You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2014/12/12 09:22:27 UTC

[13/45] tajo git commit: TAJO-1233: Merge hbase_storage branch to the master branch. (Hyoungjun Kim via hyunsik)

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/BytesRefArrayWritable.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/BytesRefArrayWritable.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/BytesRefArrayWritable.java
new file mode 100644
index 0000000..5e200a0
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/BytesRefArrayWritable.java
@@ -0,0 +1,261 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.rcfile;
+
+import com.google.common.base.Objects;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableFactories;
+import org.apache.hadoop.io.WritableFactory;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Arrays;
+
+/**
+ * <tt>BytesRefArrayWritable</tt> holds an array reference to BytesRefWritable,
+ * and is able to resize without recreating new array if not necessary.
+ * <p>
+ *
+ * Each <tt>BytesRefArrayWritable</tt> instance has a <i>valid</i> field,
+ * which is the number of <tt>BytesRefWritable</tt> elements it currently
+ * exposes. <tt>resetValid</tt> changes that count, but it does not touch the
+ * contents of the underlying BytesRefWritable objects.
+ */
+
+public class BytesRefArrayWritable implements Writable,
+    Comparable<BytesRefArrayWritable> {
+
+  private BytesRefWritable[] bytesRefWritables = null;
+
+  private int valid = 0;
+
+  /**
+   * Constructs an empty array with the specified capacity.
+   *
+   * @param capacity
+   *          initial capacity
+   * @exception IllegalArgumentException
+   *              if the specified initial capacity is negative
+   */
+  public BytesRefArrayWritable(int capacity) {
+    if (capacity < 0) {
+      throw new IllegalArgumentException("Capacity can not be negative.");
+    }
+    bytesRefWritables = new BytesRefWritable[0];
+    ensureCapacity(capacity);
+  }
+
+  /**
+   * Constructs an empty array with a capacity of ten.
+   */
+  public BytesRefArrayWritable() {
+    this(10);
+  }
+
+  /**
+   * Returns the number of valid elements.
+   *
+   * @return the number of valid elements
+   */
+  public int size() {
+    return valid;
+  }
+
+  /**
+   * Gets the BytesRefWritable at the specified position. Make sure the position
+   * is valid by first call resetValid.
+   *
+   * @param index
+   *          the position index, starting from zero
+   * @throws IndexOutOfBoundsException
+   */
+  public BytesRefWritable get(int index) {
+    if (index >= valid) {
+      throw new IndexOutOfBoundsException(
+          "This BytesRefArrayWritable only has " + valid + " valid values.");
+    }
+    return bytesRefWritables[index];
+  }
+
+  /**
+   * Gets the BytesRefWritable at the specified position without checking.
+   *
+   * @param index
+   *          the position index, starting from zero
+   * @throws IndexOutOfBoundsException
+   */
+  public BytesRefWritable unCheckedGet(int index) {
+    return bytesRefWritables[index];
+  }
+
+  /**
+   * Set the BytesRefWritable at the specified position with the specified
+   * BytesRefWritable.
+   *
+   * @param index
+   *          index position
+   * @param bytesRefWritable
+   *          the new element
+   * @throws IllegalArgumentException
+   *           if the specified new element is null
+   */
+  public void set(int index, BytesRefWritable bytesRefWritable) {
+    if (bytesRefWritable == null) {
+      throw new IllegalArgumentException("Can not assign null.");
+    }
+    ensureCapacity(index + 1);
+    bytesRefWritables[index] = bytesRefWritable;
+    if (valid <= index) {
+      valid = index + 1;
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public int compareTo(BytesRefArrayWritable other) {
+    if (other == null) {
+      throw new IllegalArgumentException("Argument can not be null.");
+    }
+    if (this == other) {
+      return 0;
+    }
+    int sizeDiff = valid - other.valid;
+    if (sizeDiff != 0) {
+      return sizeDiff;
+    }
+    for (int i = 0; i < valid; i++) {
+      if (other.contains(bytesRefWritables[i])) {
+        continue;
+      } else {
+        return 1;
+      }
+    }
+    return 0;
+  }
+
+  @Override
+  public int hashCode(){
+    return Objects.hashCode(bytesRefWritables);
+  }
+  /**
+   * Returns <tt>true</tt> if this instance contains one or more the specified
+   * BytesRefWritable.
+   *
+   * @param bytesRefWritable
+   *          BytesRefWritable element to be tested
+   * @return <tt>true</tt> if contains the specified element
+   * @throws IllegalArgumentException
+   *           if the specified element is null
+   */
+  public boolean contains(BytesRefWritable bytesRefWritable) {
+    if (bytesRefWritable == null) {
+      throw new IllegalArgumentException("Argument can not be null.");
+    }
+    for (int i = 0; i < valid; i++) {
+      if (bytesRefWritables[i].equals(bytesRefWritable)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (o == null || !(o instanceof BytesRefArrayWritable)) {
+      return false;
+    }
+    return compareTo((BytesRefArrayWritable) o) == 0;
+  }
+
+  /**
+   * Removes all elements.
+   */
+  public void clear() {
+    valid = 0;
+  }
+
+  /**
+   * enlarge the capacity if necessary, to ensure that it can hold the number of
+   * elements specified by newValidCapacity argument. It will also narrow the
+   * valid capacity when needed. Notice: it only enlarge or narrow the valid
+   * capacity with no care of the already stored invalid BytesRefWritable.
+   *
+   * @param newValidCapacity
+   *          the desired capacity
+   */
+  public void resetValid(int newValidCapacity) {
+    ensureCapacity(newValidCapacity);
+    valid = newValidCapacity;
+  }
+
+  protected void ensureCapacity(int newCapacity) {
+    int size = bytesRefWritables.length;
+    if (size < newCapacity) {
+      bytesRefWritables = Arrays.copyOf(bytesRefWritables, newCapacity);
+      while (size < newCapacity) {
+        bytesRefWritables[size] = new BytesRefWritable();
+        size++;
+      }
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    int count = in.readInt();
+    ensureCapacity(count);
+    for (int i = 0; i < count; i++) {
+      bytesRefWritables[i].readFields(in);
+    }
+    valid = count;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(valid);
+
+    for (int i = 0; i < valid; i++) {
+      BytesRefWritable cu = bytesRefWritables[i];
+      cu.write(out);
+    }
+  }
+
+  static {
+    WritableFactories.setFactory(BytesRefArrayWritable.class,
+        new WritableFactory() {
+
+          @Override
+          public Writable newInstance() {
+            return new BytesRefArrayWritable();
+          }
+
+        });
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/BytesRefWritable.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/BytesRefWritable.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/BytesRefWritable.java
new file mode 100644
index 0000000..158c740
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/BytesRefWritable.java
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.rcfile;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.WritableFactories;
+import org.apache.hadoop.io.WritableFactory;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * <tt>BytesRefWritable</tt> references a section of a byte array. It can be
+ * used to avoid unnecessary byte copies.
+ */
+public class BytesRefWritable implements Writable, Comparable<BytesRefWritable> {
+
+  private static final byte[] EMPTY_BYTES = new byte[0];
+  public static final BytesRefWritable ZeroBytesRefWritable = new BytesRefWritable();
+
+  /** Offset of the first referenced byte within {@link #bytes}. */
+  int start = 0;
+  /** Number of referenced bytes. */
+  int length = 0;
+  /** Referenced array; null until decompressed when a lazy callback is used. */
+  byte[] bytes = null;
+
+  /** Optional source of the bytes, consulted only while {@link #bytes} is null. */
+  LazyDecompressionCallback lazyDecompressObj;
+
+  /**
+   * Create a zero-size bytes.
+   */
+  public BytesRefWritable() {
+    this(EMPTY_BYTES);
+  }
+
+  /**
+   * Create a BytesRefWritable with <tt>length</tt> bytes.
+   */
+  public BytesRefWritable(int length) {
+    assert length > 0;
+    this.length = length;
+    bytes = new byte[this.length];
+    start = 0;
+  }
+
+  /**
+   * Create a BytesRefWritable referenced to the given bytes.
+   */
+  public BytesRefWritable(byte[] bytes) {
+    this.bytes = bytes;
+    length = bytes.length;
+    start = 0;
+  }
+
+  /**
+   * Create a BytesRefWritable referenced to one section of the given bytes. The
+   * section is determined by argument <tt>offset</tt> and <tt>len</tt>.
+   */
+  public BytesRefWritable(byte[] data, int offset, int len) {
+    bytes = data;
+    start = offset;
+    length = len;
+  }
+
+  /**
+   * Create a BytesRefWritable referenced to one section of the given bytes. The
+   * argument <tt>lazyDecompressData</tt> refers to a LazyDecompressionCallback
+   * object. The arguments <tt>offset</tt> and <tt>len</tt> are referred to
+   * uncompressed bytes of <tt>lazyDecompressData</tt>. Use <tt>offset</tt> and
+   * <tt>len</tt> after uncompressing the data.
+   */
+  public BytesRefWritable(LazyDecompressionCallback lazyDecompressData,
+                          int offset, int len) {
+    lazyDecompressObj = lazyDecompressData;
+    start = offset;
+    length = len;
+  }
+
+  /**
+   * Materializes the bytes from the lazy callback if they are not present yet.
+   */
+  private void lazyDecompress() throws IOException {
+    if (bytes == null && lazyDecompressObj != null) {
+      bytes = lazyDecompressObj.decompress();
+    }
+  }
+
+  /**
+   * Returns a copy of the underlying bytes referenced by this instance.
+   *
+   * @return a new copied byte array
+   * @throws java.io.IOException
+   */
+  public byte[] getBytesCopy() throws IOException {
+    lazyDecompress();
+    byte[] bb = new byte[length];
+    System.arraycopy(bytes, start, bb, 0, length);
+    return bb;
+  }
+
+  /**
+   * Returns the underlying bytes (the whole array, not only the referenced
+   * section).
+   *
+   * @throws java.io.IOException
+   */
+  public byte[] getData() throws IOException {
+    lazyDecompress();
+    return bytes;
+  }
+
+  /**
+   * readFields() will corrupt the array. So use the set method whenever
+   * possible.
+   *
+   * @see #readFields(java.io.DataInput)
+   */
+  public void set(byte[] newData, int offset, int len) {
+    bytes = newData;
+    start = offset;
+    length = len;
+    lazyDecompressObj = null;
+  }
+
+  /**
+   * readFields() will corrupt the array. So use the set method whenever
+   * possible.
+   *
+   * @see #readFields(java.io.DataInput)
+   */
+  public void set(LazyDecompressionCallback newData, int offset, int len) {
+    bytes = null;
+    start = offset;
+    length = len;
+    lazyDecompressObj = newData;
+  }
+
+  /**
+   * Writes only the referenced bytes, without a length prefix.
+   */
+  public void writeDataTo(DataOutput out) throws IOException {
+    lazyDecompress();
+    out.write(bytes, start, length);
+  }
+
+  /**
+   * Always reuse the bytes array if length of bytes array is equal or greater
+   * to the current record, otherwise create a new one. readFields will corrupt
+   * the array. Please use set() whenever possible.
+   *
+   * @throws IOException
+   *           if reading fails or the stored length is negative
+   * @see #set(byte[], int, int)
+   */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    int len = in.readInt();
+    if (len < 0) {
+      throw new IOException("Negative record length: " + len);
+    }
+    // bytes is null for an instance built around a lazy callback that has not
+    // been decompressed yet; allocate instead of dereferencing null.
+    if (bytes == null || len > bytes.length) {
+      bytes = new byte[len];
+    }
+    // The content now comes from the stream, so the callback is obsolete.
+    lazyDecompressObj = null;
+    start = 0;
+    length = len;
+    in.readFully(bytes, start, length);
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    lazyDecompress();
+    out.writeInt(length);
+    out.write(bytes, start, length);
+  }
+
+  /**
+   * Returns a hash code computed from the referenced bytes, so that instances
+   * that are equal according to {@link #equals(Object)} share a hash code.
+   *
+   * @throws RuntimeException
+   *           if lazy decompression fails
+   */
+  @Override
+  public int hashCode() {
+    final byte[] data;
+    try {
+      data = getData();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    int hash = 1;
+    final int end = start + length;
+    for (int idx = start; idx < end; idx++) {
+      hash = 31 * hash + data[idx];
+    }
+    return hash;
+  }
+
+  /**
+   * Returns the referenced bytes as space separated two-digit hex values.
+   *
+   * @throws RuntimeException
+   *           if lazy decompression fails
+   */
+  @Override
+  public String toString() {
+    final byte[] data;
+    try {
+      data = getData();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    StringBuilder sb = new StringBuilder(Math.max(0, 3 * length));
+    // The referenced section is [start, start + length), not [start, length).
+    final int end = start + length;
+    for (int idx = start; idx < end; idx++) {
+      // if not the first, put a blank separator in
+      if (idx != start) {
+        sb.append(' ');
+      }
+      String num = Integer.toHexString(0xff & data[idx]);
+      // if it is only one digit, add a leading 0.
+      if (num.length() < 2) {
+        sb.append('0');
+      }
+      sb.append(num);
+    }
+    return sb.toString();
+  }
+
+  /**
+   * Compares the referenced byte sections with
+   * <tt>WritableComparator.compareBytes</tt>.
+   *
+   * @throws IllegalArgumentException
+   *           if <tt>other</tt> is null
+   * @throws RuntimeException
+   *           if lazy decompression of either side fails
+   */
+  @Override
+  public int compareTo(BytesRefWritable other) {
+    if (other == null) {
+      throw new IllegalArgumentException("Argument can not be null.");
+    }
+    if (this == other) {
+      return 0;
+    }
+    try {
+      return WritableComparator.compareBytes(getData(), start, getLength(),
+          other.getData(), other.start, other.getLength());
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public boolean equals(Object right_obj) {
+    // instanceof is false for null, so no separate null check is required.
+    if (!(right_obj instanceof BytesRefWritable)) {
+      return false;
+    }
+    return compareTo((BytesRefWritable) right_obj) == 0;
+  }
+
+  // Registers a factory with WritableFactories that creates empty instances.
+  static {
+    WritableFactories.setFactory(BytesRefWritable.class, new WritableFactory() {
+
+      @Override
+      public Writable newInstance() {
+        return new BytesRefWritable();
+      }
+
+    });
+  }
+
+  public int getLength() {
+    return length;
+  }
+
+  public int getStart() {
+    return start;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/ColumnProjectionUtils.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/ColumnProjectionUtils.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/ColumnProjectionUtils.java
new file mode 100644
index 0000000..352776f
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/ColumnProjectionUtils.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.rcfile;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.StringUtils;
+
+import java.util.ArrayList;
+
+/**
+ * ColumnProjectionUtils.
+ *
+ */
+public final class ColumnProjectionUtils {
+
+  public static final String READ_COLUMN_IDS_CONF_STR = "hive.io.file.readcolumn.ids";
+
+  private ColumnProjectionUtils() {
+    // prevent instantiation
+  }
+
+  /**
+   * Replaces the read column ids (zero based) stored in <tt>conf</tt> with the
+   * given list. A null or empty list stores an empty string.
+   */
+  public static void setReadColumnIDs(Configuration conf, ArrayList<Integer> ids) {
+    setReadColumnIDConf(conf, toReadColumnIDString(ids));
+  }
+
+  /**
+   * Adds the given read column ids (zero based) to the ones already stored in
+   * <tt>conf</tt>. The new ids are placed in front of the existing value. A
+   * null or empty list leaves <tt>conf</tt> untouched.
+   */
+  public static void appendReadColumnIDs(Configuration conf,
+                                         ArrayList<Integer> ids) {
+    final String joined = toReadColumnIDString(ids);
+    if (joined == null) {
+      return;
+    }
+    final String existing = conf.get(READ_COLUMN_IDS_CONF_STR, null);
+    final String merged =
+        (existing == null) ? joined : joined + StringUtils.COMMA_STR + existing;
+    setReadColumnIDConf(conf, merged);
+  }
+
+  /**
+   * Clears the read column ids set in the conf, and will read all columns.
+   */
+  public static void setFullyReadColumns(Configuration conf) {
+    conf.set(READ_COLUMN_IDS_CONF_STR, "");
+  }
+
+  /**
+   * Returns the distinct column ids (zero based) stored in <tt>conf</tt>, in
+   * the order of their first occurrence. A null <tt>conf</tt> yields an empty
+   * list.
+   */
+  public static ArrayList<Integer> getReadColumnIDs(Configuration conf) {
+    if (conf == null) {
+      return new ArrayList<Integer>(0);
+    }
+    final String[] tokens = StringUtils.split(conf.get(READ_COLUMN_IDS_CONF_STR, ""));
+    final ArrayList<Integer> columnIds = new ArrayList<Integer>(tokens.length);
+    for (int i = 0; i < tokens.length; i++) {
+      Integer columnId = Integer.parseInt(tokens[i]);
+      // the stored value may contain duplicates; keep the first occurrence only
+      if (columnIds.contains(columnId)) {
+        continue;
+      }
+      columnIds.add(columnId);
+    }
+    return columnIds;
+  }
+
+  /** Stores <tt>id</tt> under the read-column key, mapping null to "". */
+  private static void setReadColumnIDConf(Configuration conf, String id) {
+    final boolean blank = (id == null) || id.length() <= 0;
+    conf.set(READ_COLUMN_IDS_CONF_STR, blank ? "" : id);
+  }
+
+  /**
+   * Joins the ids with commas; returns null for a null or empty list.
+   */
+  private static String toReadColumnIDString(ArrayList<Integer> ids) {
+    if (ids == null) {
+      return null;
+    }
+    StringBuilder joined = null;
+    for (int i = 0; i < ids.size(); i++) {
+      if (joined == null) {
+        joined = new StringBuilder();
+      } else {
+        joined.append(StringUtils.COMMA_STR);
+      }
+      joined.append(ids.get(i));
+    }
+    return (joined == null) ? null : joined.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/LazyDecompressionCallback.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/LazyDecompressionCallback.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/LazyDecompressionCallback.java
new file mode 100644
index 0000000..eab2356
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/LazyDecompressionCallback.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.rcfile;
+
+import java.io.IOException;
+
+/**
+ * Used to call back lazy decompression process.
+ *
+ * @see org.apache.tajo.storage.rcfile.BytesRefWritable
+ */
+public interface LazyDecompressionCallback {
+
+  /**
+   * Produces the uncompressed bytes on demand. BytesRefWritable calls this
+   * only while it holds no byte array of its own and then keeps the returned
+   * array.
+   *
+   * @return the uncompressed bytes
+   * @throws IOException
+   *           if decompression fails
+   */
+  byte[] decompress() throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/NonSyncByteArrayInputStream.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/NonSyncByteArrayInputStream.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/NonSyncByteArrayInputStream.java
new file mode 100644
index 0000000..bb6af22
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/NonSyncByteArrayInputStream.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.rcfile;
+
+import java.io.ByteArrayInputStream;
+
+/**
+ * A thread-not-safe version of ByteArrayInputStream, which removes all
+ * synchronized modifiers.
+ */
+public class NonSyncByteArrayInputStream extends ByteArrayInputStream {
+  public NonSyncByteArrayInputStream() {
+    super(new byte[] {});
+  }
+
+  public NonSyncByteArrayInputStream(byte[] bs) {
+    super(bs);
+  }
+
+  public NonSyncByteArrayInputStream(byte[] buf, int offset, int length) {
+    super(buf, offset, length);
+  }
+
+  public void reset(byte[] input, int start, int length) {
+    buf = input;
+    count = start + length;
+    mark = start;
+    pos = start;
+  }
+
+  public int getPosition() {
+    return pos;
+  }
+
+  public int getLength() {
+    return count;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public int read() {
+    return (pos < count) ? (buf[pos++] & 0xff) : -1;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public int read(byte b[], int off, int len) {
+    if (b == null) {
+      throw new NullPointerException();
+    } else if (off < 0 || len < 0 || len > b.length - off) {
+      throw new IndexOutOfBoundsException();
+    }
+    if (pos >= count) {
+      return -1;
+    }
+    if (pos + len > count) {
+      len = count - pos;
+    }
+    if (len <= 0) {
+      return 0;
+    }
+    System.arraycopy(buf, pos, b, off, len);
+    pos += len;
+    return len;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public long skip(long n) {
+    if (pos + n > count) {
+      n = count - pos;
+    }
+    if (n < 0) {
+      return 0;
+    }
+    pos += n;
+    return n;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public int available() {
+    return count - pos;
+  }
+
+  public void seek(int pos) {
+    this.pos = pos;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/NonSyncByteArrayOutputStream.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/NonSyncByteArrayOutputStream.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/NonSyncByteArrayOutputStream.java
new file mode 100644
index 0000000..53a3dca
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/NonSyncByteArrayOutputStream.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.rcfile;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * A thread-not-safe version of ByteArrayOutputStream, which removes all
+ * synchronized modifiers.
+ */
+public class NonSyncByteArrayOutputStream extends ByteArrayOutputStream {
+
+  /** Scratch space for {@link #writeVLong(long)}; an encoded long needs at most 9 bytes. */
+  private byte[] vLongBytes = new byte[9];
+
+  /** Creates a stream with the given initial buffer size. */
+  public NonSyncByteArrayOutputStream(int size) {
+    super(size);
+  }
+
+  /** Creates a stream with a 64 KiB initial buffer. */
+  public NonSyncByteArrayOutputStream() {
+    super(64 * 1024);
+  }
+
+  /** Returns the internal buffer itself (not a copy); only the first getLength() bytes are data. */
+  public byte[] getData() {
+    return this.buf;
+  }
+
+  /** Returns the number of bytes written so far. */
+  public int getLength() {
+    return this.count;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void reset() {
+    this.count = 0;
+  }
+
+  /**
+   * Appends <tt>length</tt> bytes read from <tt>in</tt>.
+   */
+  public void write(DataInput in, int length) throws IOException {
+    enLargeBuffer(length);
+    in.readFully(buf, count, length);
+    count += length;
+  }
+
+  /**
+   * Encodes <tt>l</tt> in the variable-length format into <tt>bytes</tt>
+   * starting at <tt>offset</tt>: values in [-112, 127] take one byte, other
+   * values take a marker byte followed by the big-endian magnitude bytes.
+   *
+   * @return the number of bytes written
+   */
+  public int writeVLongToByteArray(byte[] bytes, int offset, long l) {
+    if (l >= -112 && l <= 127) {
+      bytes[offset] = (byte) l;
+      return 1;
+    }
+
+    int marker = -112;
+    long magnitude = l;
+    if (magnitude < 0) {
+      magnitude = ~magnitude; // take one's complement
+      marker = -120;
+    }
+
+    // one step down in the marker for every byte the magnitude occupies
+    for (long rest = magnitude; rest != 0; rest = rest >> 8) {
+      marker--;
+    }
+
+    int cursor = offset;
+    bytes[cursor++] = (byte) marker;
+    final int payload = (marker < -120) ? -(marker + 120) : -(marker + 112);
+
+    for (int remaining = payload; remaining != 0; remaining--) {
+      int shift = (remaining - 1) * 8;
+      bytes[cursor++] = (byte) (magnitude >>> shift);
+    }
+    return 1 + payload;
+  }
+
+  /**
+   * Appends <tt>l</tt> in the variable-length format.
+   *
+   * @return the number of bytes appended
+   */
+  public int writeVLong(long l) {
+    final int written = writeVLongToByteArray(vLongBytes, 0, l);
+    write(vLongBytes, 0, written);
+    return written;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void write(int b) {
+    enLargeBuffer(1);
+    buf[count] = (byte) b;
+    count++;
+  }
+
+  /**
+   * Makes room for <tt>increment</tt> more bytes, at least doubling the buffer
+   * whenever it has to grow.
+   *
+   * @return the required size, or the new buffer size if the buffer grew
+   */
+  private int enLargeBuffer(int increment) {
+    final int required = count + increment;
+    if (required <= buf.length) {
+      return required;
+    }
+    final int doubled = buf.length << 1;
+    final int newLen = (doubled > required) ? doubled : required;
+    final byte[] grown = new byte[newLen];
+    System.arraycopy(buf, 0, grown, 0, count);
+    buf = grown;
+    return newLen;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void write(byte b[], int off, int len) {
+    final boolean outOfRange = (off < 0) || (off > b.length) || (len < 0)
+        || ((off + len) > b.length) || ((off + len) < 0);
+    if (outOfRange) {
+      throw new IndexOutOfBoundsException();
+    }
+    if (len == 0) {
+      return;
+    }
+    enLargeBuffer(len);
+    System.arraycopy(b, off, buf, count, len);
+    count += len;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void writeTo(OutputStream out) throws IOException {
+    out.write(buf, 0, count);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/NonSyncDataInputBuffer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/NonSyncDataInputBuffer.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/NonSyncDataInputBuffer.java
new file mode 100644
index 0000000..46745ab
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/NonSyncDataInputBuffer.java
@@ -0,0 +1,507 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.rcfile;
+
+import org.apache.hadoop.fs.Seekable;
+
+import java.io.*;
+
+/**
+ * A non-thread-safe version of Hadoop's DataInputBuffer, which removes all
+ * synchronized modifiers. It reads {@link DataInput} primitives from a byte
+ * array held by a {@link NonSyncByteArrayInputStream}.
+ */
+public class NonSyncDataInputBuffer extends FilterInputStream implements
+    DataInput, Seekable {
+
+  private final NonSyncByteArrayInputStream buffer;
+
+  /** Scratch space that the fixed-width primitive readers decode from. */
+  byte[] buff = new byte[16];
+
+  /** Constructs a new empty buffer. */
+  public NonSyncDataInputBuffer() {
+    this(new NonSyncByteArrayInputStream());
+  }
+
+  private NonSyncDataInputBuffer(NonSyncByteArrayInputStream buffer) {
+    super(buffer);
+    this.buffer = buffer;
+  }
+
+  /**
+   * Resets the data that the buffer reads, starting at index 0 of
+   * <code>input</code>.
+   */
+  public void reset(byte[] input, int length) {
+    buffer.reset(input, 0, length);
+  }
+
+  /** Resets the data that the buffer reads. */
+  public void reset(byte[] input, int start, int length) {
+    buffer.reset(input, start, length);
+  }
+
+  /** Returns the current position in the input. */
+  public int getPosition() {
+    return buffer.getPosition();
+  }
+
+  /** Returns the length of the input. */
+  public int getLength() {
+    return buffer.getLength();
+  }
+
+  /**
+   * Reads bytes from the source stream into the byte array <code>buffer</code>.
+   * The number of bytes actually read is returned.
+   *
+   * @param buffer
+   *          the buffer to read bytes into
+   * @return the number of bytes actually read or -1 if end of stream.
+   *
+   * @throws java.io.IOException
+   *           If a problem occurs reading from this DataInputStream.
+   *
+   */
+  @Override
+  public final int read(byte[] buffer) throws IOException {
+    return in.read(buffer, 0, buffer.length);
+  }
+
+  /**
+   * Read at most <code>length</code> bytes from this DataInputStream and stores
+   * them in byte array <code>buffer</code> starting at <code>offset</code>.
+   * Answer the number of bytes actually read or -1 if no bytes were read and
+   * end of stream was encountered.
+   *
+   * @param buffer
+   *          the byte array in which to store the read bytes.
+   * @param offset
+   *          the offset in <code>buffer</code> to store the read bytes.
+   * @param length
+   *          the maximum number of bytes to store in <code>buffer</code>.
+   * @return the number of bytes actually read or -1 if end of stream.
+   *
+   * @throws java.io.IOException
+   *           If a problem occurs reading from this DataInputStream.
+   *
+   */
+  @Deprecated
+  @Override
+  public final int read(byte[] buffer, int offset, int length)
+      throws IOException {
+    return in.read(buffer, offset, length);
+  }
+
+  /**
+   * Reads a boolean from this stream. Any non-zero byte is read as
+   * <code>true</code>.
+   *
+   * @return the next boolean value from the source stream.
+   *
+   * @throws java.io.EOFException
+   *           If the end of the stream is reached before a byte is read.
+   * @throws java.io.IOException
+   *           If a problem occurs reading from this DataInputStream.
+   *
+   */
+  public final boolean readBoolean() throws IOException {
+    int temp = in.read();
+    if (temp < 0) {
+      throw new EOFException();
+    }
+    return temp != 0;
+  }
+
+  /**
+   * Reads an 8-bit byte value from this stream.
+   *
+   * @return the next byte value from the source stream.
+   *
+   * @throws java.io.EOFException
+   *           If the end of the stream is reached before a byte is read.
+   * @throws java.io.IOException
+   *           If a problem occurs reading from this DataInputStream.
+   *
+   */
+  public final byte readByte() throws IOException {
+    int temp = in.read();
+    if (temp < 0) {
+      throw new EOFException();
+    }
+    return (byte) temp;
+  }
+
+  /**
+   * Fills the first <code>count</code> bytes of the scratch array
+   * <code>buff</code> from the underlying stream, looping until all of them
+   * have been read.
+   *
+   * @param count
+   *          the number of bytes to read into <code>buff</code>
+   * @return <code>count</code> on success, or -1 if the end of the stream was
+   *         reached first
+   *
+   * @throws java.io.IOException
+   *           If a problem occurs reading from the underlying stream.
+   */
+  private int readToBuff(int count) throws IOException {
+    int offset = 0;
+
+    while (offset < count) {
+      int bytesRead = in.read(buff, offset, count - offset);
+      if (bytesRead == -1) {
+        return bytesRead;
+      }
+      offset += bytesRead;
+    }
+    return offset;
+  }
+
+  /**
+   * Reads a 16-bit character value from this stream (high byte first).
+   *
+   * @return the next <code>char</code> value from the source stream.
+   *
+   * @throws java.io.EOFException
+   *           If the end of the stream is reached before two bytes are read.
+   * @throws java.io.IOException
+   *           If a problem occurs reading from this DataInputStream.
+   *
+   */
+  public final char readChar() throws IOException {
+    if (readToBuff(2) < 0) {
+      throw new EOFException();
+    }
+    return (char) (((buff[0] & 0xff) << 8) | (buff[1] & 0xff));
+  }
+
+  /**
+   * Reads a 64-bit <code>double</code> value from this stream.
+   *
+   * @return the next <code>double</code> value from the source stream.
+   *
+   * @throws java.io.IOException
+   *           If a problem occurs reading from this DataInputStream.
+   *
+   */
+  public final double readDouble() throws IOException {
+    return Double.longBitsToDouble(readLong());
+  }
+
+  /**
+   * Reads a 32-bit <code>float</code> value from this stream.
+   *
+   * @return the next <code>float</code> value from the source stream.
+   *
+   * @throws java.io.IOException
+   *           If a problem occurs reading from this DataInputStream.
+   *
+   */
+  public final float readFloat() throws IOException {
+    return Float.intBitsToFloat(readInt());
+  }
+
+  /**
+   * Reads bytes from this stream into the byte array <code>buffer</code>. This
+   * method will block until <code>buffer.length</code> number of bytes have
+   * been read.
+   *
+   * @param buffer
+   *          to read bytes into
+   *
+   * @throws java.io.IOException
+   *           If a problem occurs reading from this DataInputStream.
+   *
+   */
+  public final void readFully(byte[] buffer) throws IOException {
+    readFully(buffer, 0, buffer.length);
+  }
+
+  /**
+   * Reads bytes from this stream and stores them in the byte array
+   * <code>buffer</code> starting at the position <code>offset</code>. This
+   * method blocks until <code>length</code> bytes have been read.
+   *
+   * @param buffer
+   *          the byte array into which the data is read
+   * @param offset
+   *          the offset the operation start at
+   * @param length
+   *          the number of bytes to read
+   *
+   * @throws java.io.IOException
+   *           if a problem occurs while reading from this stream
+   * @throws java.io.EOFException
+   *           if reaches the end of the stream before enough bytes have been
+   *           read
+   */
+  public final void readFully(byte[] buffer, int offset, int length)
+      throws IOException {
+    if (length < 0) {
+      throw new IndexOutOfBoundsException();
+    }
+    // A zero-length request returns before the null and offset checks below.
+    if (length == 0) {
+      return;
+    }
+    if (in == null || buffer == null) {
+      throw new NullPointerException("Null Pointer to underlying input stream");
+    }
+
+    // Written as a subtraction so that offset + length cannot overflow.
+    if (offset < 0 || offset > buffer.length - length) {
+      throw new IndexOutOfBoundsException();
+    }
+    while (length > 0) {
+      int result = in.read(buffer, offset, length);
+      if (result < 0) {
+        throw new EOFException();
+      }
+      offset += result;
+      length -= result;
+    }
+  }
+
+  /**
+   * Reads a 32-bit integer value from this stream (high byte first).
+   *
+   * @return the next <code>int</code> value from the source stream.
+   *
+   * @throws java.io.EOFException
+   *           If the end of the stream is reached before four bytes are read.
+   * @throws java.io.IOException
+   *           If a problem occurs reading from this DataInputStream.
+   *
+   */
+  public final int readInt() throws IOException {
+    if (readToBuff(4) < 0) {
+      throw new EOFException();
+    }
+    return ((buff[0] & 0xff) << 24) | ((buff[1] & 0xff) << 16)
+        | ((buff[2] & 0xff) << 8) | (buff[3] & 0xff);
+  }
+
+  /**
+   * Answers a <code>String</code> representing the next line of text available
+   * in this stream. A line is represented by 0 or more characters followed by
+   * <code>'\n'</code>, <code>'\r'</code>, <code>"\r\n"</code> or end of stream.
+   * The <code>String</code> does not include the newline sequence. Each byte
+   * read is converted to a <code>char</code> by a plain cast.
+   * <p>
+   * The first time a <code>'\r'</code> is seen, the underlying stream is
+   * wrapped in a {@link PushbackInputStream} so that the byte following it can
+   * be pushed back when it is not <code>'\n'</code>.
+   *
+   * @return the contents of the line or null if no characters were read before
+   *         end of stream.
+   *
+   * @throws java.io.IOException
+   *           If the DataInputStream is already closed or some other IO error
+   *           occurs.
+   *
+   * @deprecated Use BufferedReader
+   */
+  @Deprecated
+  public final String readLine() throws IOException {
+    StringBuilder line = new StringBuilder(80); // Typical line length
+    boolean foundTerminator = false;
+    while (true) {
+      int nextByte = in.read();
+      switch (nextByte) {
+        case -1:
+          if (line.length() == 0 && !foundTerminator) {
+            return null;
+          }
+          return line.toString();
+        case (byte) '\r':
+          if (foundTerminator) {
+            // Second '\r' in a row: it belongs to the next line.
+            ((PushbackInputStream) in).unread(nextByte);
+            return line.toString();
+          }
+          foundTerminator = true;
+        /* Have to be able to peek ahead one byte */
+          if (!(in.getClass() == PushbackInputStream.class)) {
+            in = new PushbackInputStream(in);
+          }
+          break;
+        case (byte) '\n':
+          return line.toString();
+        default:
+          if (foundTerminator) {
+            // A lone '\r' ended the line; give this byte back to the stream.
+            ((PushbackInputStream) in).unread(nextByte);
+            return line.toString();
+          }
+          line.append((char) nextByte);
+      }
+    }
+  }
+
+  /**
+   * Reads a 64-bit <code>long</code> value from this stream (high byte first).
+   *
+   * @return the next <code>long</code> value from the source stream.
+   *
+   * @throws java.io.EOFException
+   *           If the end of the stream is reached before eight bytes are read.
+   * @throws java.io.IOException
+   *           If a problem occurs reading from this DataInputStream.
+   *
+   */
+  public final long readLong() throws IOException {
+    if (readToBuff(8) < 0) {
+      throw new EOFException();
+    }
+    int i1 = ((buff[0] & 0xff) << 24) | ((buff[1] & 0xff) << 16)
+        | ((buff[2] & 0xff) << 8) | (buff[3] & 0xff);
+    int i2 = ((buff[4] & 0xff) << 24) | ((buff[5] & 0xff) << 16)
+        | ((buff[6] & 0xff) << 8) | (buff[7] & 0xff);
+
+    // Mask both halves so that sign extension cannot leak into the result.
+    return ((i1 & 0xffffffffL) << 32) | (i2 & 0xffffffffL);
+  }
+
+  /**
+   * Reads a 16-bit <code>short</code> value from this stream (high byte first).
+   *
+   * @return the next <code>short</code> value from the source stream.
+   *
+   * @throws java.io.EOFException
+   *           If the end of the stream is reached before two bytes are read.
+   * @throws java.io.IOException
+   *           If a problem occurs reading from this DataInputStream.
+   *
+   */
+  public final short readShort() throws IOException {
+    if (readToBuff(2) < 0) {
+      throw new EOFException();
+    }
+    return (short) (((buff[0] & 0xff) << 8) | (buff[1] & 0xff));
+  }
+
+  /**
+   * Reads an unsigned 8-bit <code>byte</code> value from this stream and
+   * returns it as an int.
+   *
+   * @return the next unsigned byte value from the source stream.
+   *
+   * @throws java.io.EOFException
+   *           If the end of the stream is reached before a byte is read.
+   * @throws java.io.IOException
+   *           If a problem occurs reading from this DataInputStream.
+   *
+   */
+  public final int readUnsignedByte() throws IOException {
+    int temp = in.read();
+    if (temp < 0) {
+      throw new EOFException();
+    }
+    return temp;
+  }
+
+  /**
+   * Reads a 16-bit unsigned <code>short</code> value from this stream and
+   * returns it as an int.
+   *
+   * @return the next unsigned <code>short</code> value from the source stream.
+   *
+   * @throws java.io.EOFException
+   *           If the end of the stream is reached before two bytes are read.
+   * @throws java.io.IOException
+   *           If a problem occurs reading from this DataInputStream.
+   *
+   */
+  public final int readUnsignedShort() throws IOException {
+    if (readToBuff(2) < 0) {
+      throw new EOFException();
+    }
+    // The char cast keeps the value in the range 0..65535 when widened to int.
+    return (char) (((buff[0] & 0xff) << 8) | (buff[1] & 0xff));
+  }
+
+  /**
+   * Reads a UTF format String from this Stream: an unsigned 16-bit byte count
+   * followed by that many encoded bytes.
+   *
+   * @return the next UTF String from the source stream.
+   *
+   * @throws java.io.IOException
+   *           If a problem occurs reading from this DataInputStream.
+   *
+   */
+  public final String readUTF() throws IOException {
+    return decodeUTF(readUnsignedShort());
+  }
+
+  /** Reads <code>utfSize</code> encoded bytes from this stream and decodes them. */
+  String decodeUTF(int utfSize) throws IOException {
+    return decodeUTF(utfSize, this);
+  }
+
+  /**
+   * Reads exactly <code>utfSize</code> bytes from <code>in</code> and decodes
+   * them with {@link #convertUTF8WithBuf(byte[], char[], int, int)}.
+   */
+  private static String decodeUTF(int utfSize, DataInput in) throws IOException {
+    byte[] buf = new byte[utfSize];
+    // One char per input byte is always enough: no sequence expands.
+    char[] out = new char[utfSize];
+    in.readFully(buf, 0, utfSize);
+
+    return convertUTF8WithBuf(buf, out, 0, utfSize);
+  }
+
+  /**
+   * Reads a UTF format String from the DataInput Stream <code>in</code>.
+   *
+   * @param in
+   *          the input stream to read from
+   * @return the next UTF String from the source stream.
+   *
+   * @throws java.io.IOException
+   *           If a problem occurs reading from this DataInputStream.
+   *
+   */
+  public static final String readUTF(DataInput in) throws IOException {
+    return decodeUTF(in.readUnsignedShort(), in);
+  }
+
+  /**
+   * Skips <code>count</code> number of bytes in this stream. Subsequent
+   * <code>read()</code>'s will not return these bytes unless
+   * <code>reset()</code> is used. Skipping stops early as soon as the
+   * underlying stream reports that it skipped nothing.
+   *
+   * @param count
+   *          the number of bytes to skip.
+   * @return the number of bytes actually skipped.
+   *
+   * @throws java.io.IOException
+   *           If the stream is already closed or another IOException occurs.
+   */
+  public final int skipBytes(int count) throws IOException {
+    int skipped = 0;
+    long skip;
+    while (skipped < count && (skip = in.skip(count - skipped)) != 0) {
+      skipped += skip;
+    }
+    if (skipped < 0) {
+      throw new EOFException();
+    }
+    return skipped;
+  }
+
+  /**
+   * Decodes <code>utfSize</code> bytes of <code>buf</code>, starting at
+   * <code>offset</code>, into a String. One-, two- and three-byte sequences
+   * are accepted; any other lead byte is rejected.
+   *
+   * @param buf
+   *          the encoded bytes
+   * @param out
+   *          scratch array that receives the decoded chars; it needs room for
+   *          at least <code>utfSize</code> chars
+   * @param offset
+   *          index in <code>buf</code> of the first encoded byte
+   * @param utfSize
+   *          the number of encoded bytes to decode
+   * @return the decoded String
+   *
+   * @throws java.io.UTFDataFormatException
+   *           if a sequence is truncated, a continuation byte does not have
+   *           the form 10xxxxxx, or a lead byte starts an unsupported sequence
+   */
+  public static String convertUTF8WithBuf(byte[] buf, char[] out, int offset,
+                                          int utfSize) throws UTFDataFormatException {
+    // count indexes the input relative to offset; s indexes the output.
+    int count = 0, s = 0, a;
+    while (count < utfSize) {
+      // Bytes with the high bit set are negative, so the cast yields a char
+      // of 0x80 or above and the multi-byte branches handle it.
+      if ((out[s] = (char) buf[offset + count++]) < '\u0080') {
+        s++;
+      } else if (((a = out[s]) & 0xe0) == 0xc0) {
+        if (count >= utfSize) {
+          throw new UTFDataFormatException();
+        }
+        // Continuation bytes must honour offset, exactly like the lead byte.
+        int b = buf[offset + count++];
+        if ((b & 0xC0) != 0x80) {
+          throw new UTFDataFormatException();
+        }
+        out[s++] = (char) (((a & 0x1F) << 6) | (b & 0x3F));
+      } else if ((a & 0xf0) == 0xe0) {
+        if (count + 1 >= utfSize) {
+          throw new UTFDataFormatException();
+        }
+        int b = buf[offset + count++];
+        int c = buf[offset + count++];
+        if (((b & 0xC0) != 0x80) || ((c & 0xC0) != 0x80)) {
+          throw new UTFDataFormatException();
+        }
+        out[s++] = (char) (((a & 0x0F) << 12) | ((b & 0x3F) << 6) | (c & 0x3F));
+      } else {
+        throw new UTFDataFormatException();
+      }
+    }
+    return new String(out, 0, s);
+  }
+
+  /**
+   * Moves the read position of the underlying buffer to <code>pos</code>. The
+   * position is narrowed to an <code>int</code> before it is applied.
+   */
+  @Override
+  public void seek(long pos) throws IOException {
+    buffer.seek((int)pos);
+  }
+
+  /** Returns the current read position of the underlying buffer. */
+  @Override
+  public long getPos() throws IOException {
+    return buffer.getPosition();
+  }
+
+  /** Always returns <code>false</code>; <code>targetPos</code> is ignored. */
+  @Override
+  public boolean seekToNewSource(long targetPos) throws IOException {
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/NonSyncDataOutputBuffer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/NonSyncDataOutputBuffer.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/NonSyncDataOutputBuffer.java
new file mode 100644
index 0000000..3944f38
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/NonSyncDataOutputBuffer.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.rcfile;
+
+import java.io.DataInput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+/**
+ * A non-thread-safe counterpart of Hadoop's DataOutputBuffer: the same kind of
+ * in-memory data output buffer with every synchronized modifier removed.
+ */
+public class NonSyncDataOutputBuffer extends DataOutputStream {
+
+  private final NonSyncByteArrayOutputStream buffer;
+
+  /** Creates a buffer that initially holds no data. */
+  public NonSyncDataOutputBuffer() {
+    this(new NonSyncByteArrayOutputStream());
+  }
+
+  private NonSyncDataOutputBuffer(NonSyncByteArrayOutputStream buffer) {
+    super(buffer);
+    this.buffer = buffer;
+  }
+
+  /**
+   * Exposes the backing array of the buffer. Only the first
+   * {@link #getLength()} bytes of the returned array hold valid data.
+   */
+  public byte[] getData() {
+    return buffer.getData();
+  }
+
+  /** Reports how many bytes of valid data the buffer currently holds. */
+  public int getLength() {
+    return buffer.getLength();
+  }
+
+  /**
+   * Empties the buffer and zeroes the inherited {@code written} counter.
+   *
+   * @return this buffer, to allow call chaining
+   */
+  public NonSyncDataOutputBuffer reset() {
+    written = 0;
+    buffer.reset();
+    return this;
+  }
+
+  /**
+   * Copies {@code length} bytes from {@code in} straight into the underlying
+   * buffer. The inherited {@code written} counter is not touched here.
+   */
+  public void write(DataInput in, int length) throws IOException {
+    buffer.write(in, length);
+  }
+
+  /** Appends a single byte to the buffer and counts it. */
+  @Override
+  public void write(int b) throws IOException {
+    buffer.write(b);
+    incCount(1);
+  }
+
+  /** Appends {@code len} bytes of {@code b}, starting at {@code off}, and counts them. */
+  @Override
+  public void write(byte b[], int off, int len) throws IOException {
+    buffer.write(b, off, len);
+    incCount(len);
+  }
+
+  /** Writes the buffered bytes to {@code out}. */
+  public void writeTo(DataOutputStream out) throws IOException {
+    buffer.writeTo(out);
+  }
+
+  /**
+   * Adds {@code value} to the inherited {@code written} counter, pinning it at
+   * {@link Integer#MAX_VALUE} when the sum wraps around to a negative number.
+   */
+  private void incCount(int value) {
+    final int updated = written + value;
+    written = (updated < 0) ? Integer.MAX_VALUE : updated;
+  }
+}