You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2014/12/12 06:59:40 UTC
[06/32] tajo git commit: TAJO-1233: Merge hbase_storage branch to the
master branch. (Hyoungjun Kim via hyunsik)
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/BytesRefArrayWritable.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/BytesRefArrayWritable.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/BytesRefArrayWritable.java
new file mode 100644
index 0000000..5e200a0
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/BytesRefArrayWritable.java
@@ -0,0 +1,261 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.rcfile;
+
+import com.google.common.base.Objects;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableFactories;
+import org.apache.hadoop.io.WritableFactory;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Arrays;
+
+/**
+ * <tt>BytesRefArrayWritable</tt> holds an array reference to BytesRefWritable,
+ * and is able to resize without recreating new array if not necessary.
+ * <p>
+ *
+ * Each <tt>BytesRefArrayWritable holds</tt> instance has a <i>valid</i> field,
+ * which is the desired valid number of <tt>BytesRefWritable</tt> it holds.
+ * <tt>resetValid</tt> can reset the valid, but it will not care the underlying
+ * BytesRefWritable.
+ */
+
+public class BytesRefArrayWritable implements Writable,
+ Comparable<BytesRefArrayWritable> {
+
+ private BytesRefWritable[] bytesRefWritables = null;
+
+ private int valid = 0;
+
+ /**
+ * Constructs an empty array with the specified capacity.
+ *
+ * @param capacity
+ * initial capacity
+ * @exception IllegalArgumentException
+ * if the specified initial capacity is negative
+ */
+ public BytesRefArrayWritable(int capacity) {
+ if (capacity < 0) {
+ throw new IllegalArgumentException("Capacity can not be negative.");
+ }
+ bytesRefWritables = new BytesRefWritable[0];
+ ensureCapacity(capacity);
+ }
+
+ /**
+ * Constructs an empty array with a capacity of ten.
+ */
+ public BytesRefArrayWritable() {
+ this(10);
+ }
+
+ /**
+ * Returns the number of valid elements.
+ *
+ * @return the number of valid elements
+ */
+ public int size() {
+ return valid;
+ }
+
+ /**
+ * Gets the BytesRefWritable at the specified position. Make sure the position
+ * is valid by first call resetValid.
+ *
+ * @param index
+ * the position index, starting from zero
+ * @throws IndexOutOfBoundsException
+ */
+ public BytesRefWritable get(int index) {
+ if (index >= valid) {
+ throw new IndexOutOfBoundsException(
+ "This BytesRefArrayWritable only has " + valid + " valid values.");
+ }
+ return bytesRefWritables[index];
+ }
+
+ /**
+ * Gets the BytesRefWritable at the specified position without checking.
+ *
+ * @param index
+ * the position index, starting from zero
+ * @throws IndexOutOfBoundsException
+ */
+ public BytesRefWritable unCheckedGet(int index) {
+ return bytesRefWritables[index];
+ }
+
+ /**
+ * Set the BytesRefWritable at the specified position with the specified
+ * BytesRefWritable.
+ *
+ * @param index
+ * index position
+ * @param bytesRefWritable
+ * the new element
+ * @throws IllegalArgumentException
+ * if the specified new element is null
+ */
+ public void set(int index, BytesRefWritable bytesRefWritable) {
+ if (bytesRefWritable == null) {
+ throw new IllegalArgumentException("Can not assign null.");
+ }
+ ensureCapacity(index + 1);
+ bytesRefWritables[index] = bytesRefWritable;
+ if (valid <= index) {
+ valid = index + 1;
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public int compareTo(BytesRefArrayWritable other) {
+ if (other == null) {
+ throw new IllegalArgumentException("Argument can not be null.");
+ }
+ if (this == other) {
+ return 0;
+ }
+ int sizeDiff = valid - other.valid;
+ if (sizeDiff != 0) {
+ return sizeDiff;
+ }
+ for (int i = 0; i < valid; i++) {
+ if (other.contains(bytesRefWritables[i])) {
+ continue;
+ } else {
+ return 1;
+ }
+ }
+ return 0;
+ }
+
+ @Override
+ public int hashCode(){
+ return Objects.hashCode(bytesRefWritables);
+ }
+ /**
+ * Returns <tt>true</tt> if this instance contains one or more the specified
+ * BytesRefWritable.
+ *
+ * @param bytesRefWritable
+ * BytesRefWritable element to be tested
+ * @return <tt>true</tt> if contains the specified element
+ * @throws IllegalArgumentException
+ * if the specified element is null
+ */
+ public boolean contains(BytesRefWritable bytesRefWritable) {
+ if (bytesRefWritable == null) {
+ throw new IllegalArgumentException("Argument can not be null.");
+ }
+ for (int i = 0; i < valid; i++) {
+ if (bytesRefWritables[i].equals(bytesRefWritable)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public boolean equals(Object o) {
+ if (o == null || !(o instanceof BytesRefArrayWritable)) {
+ return false;
+ }
+ return compareTo((BytesRefArrayWritable) o) == 0;
+ }
+
+ /**
+ * Removes all elements.
+ */
+ public void clear() {
+ valid = 0;
+ }
+
+ /**
+ * enlarge the capacity if necessary, to ensure that it can hold the number of
+ * elements specified by newValidCapacity argument. It will also narrow the
+ * valid capacity when needed. Notice: it only enlarge or narrow the valid
+ * capacity with no care of the already stored invalid BytesRefWritable.
+ *
+ * @param newValidCapacity
+ * the desired capacity
+ */
+ public void resetValid(int newValidCapacity) {
+ ensureCapacity(newValidCapacity);
+ valid = newValidCapacity;
+ }
+
+ protected void ensureCapacity(int newCapacity) {
+ int size = bytesRefWritables.length;
+ if (size < newCapacity) {
+ bytesRefWritables = Arrays.copyOf(bytesRefWritables, newCapacity);
+ while (size < newCapacity) {
+ bytesRefWritables[size] = new BytesRefWritable();
+ size++;
+ }
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ int count = in.readInt();
+ ensureCapacity(count);
+ for (int i = 0; i < count; i++) {
+ bytesRefWritables[i].readFields(in);
+ }
+ valid = count;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeInt(valid);
+
+ for (int i = 0; i < valid; i++) {
+ BytesRefWritable cu = bytesRefWritables[i];
+ cu.write(out);
+ }
+ }
+
+ static {
+ WritableFactories.setFactory(BytesRefArrayWritable.class,
+ new WritableFactory() {
+
+ @Override
+ public Writable newInstance() {
+ return new BytesRefArrayWritable();
+ }
+
+ });
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/BytesRefWritable.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/BytesRefWritable.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/BytesRefWritable.java
new file mode 100644
index 0000000..158c740
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/BytesRefWritable.java
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.rcfile;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.WritableFactories;
+import org.apache.hadoop.io.WritableFactory;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * <tt>BytesRefWritable</tt> referenced a section of byte array. It can be used
+ * to avoid unnecessary byte copy.
+ */
+public class BytesRefWritable implements Writable, Comparable<BytesRefWritable> {
+
+ private static final byte[] EMPTY_BYTES = new byte[0];
+ public static final BytesRefWritable ZeroBytesRefWritable = new BytesRefWritable();
+
+ int start = 0;
+ int length = 0;
+ byte[] bytes = null;
+
+ LazyDecompressionCallback lazyDecompressObj;
+
+ /**
+ * Create a zero-size bytes.
+ */
+ public BytesRefWritable() {
+ this(EMPTY_BYTES);
+ }
+
+ /**
+ * Create a BytesRefWritable with <tt>length</tt> bytes.
+ */
+ public BytesRefWritable(int length) {
+ assert length > 0;
+ this.length = length;
+ bytes = new byte[this.length];
+ start = 0;
+ }
+
+ /**
+ * Create a BytesRefWritable referenced to the given bytes.
+ */
+ public BytesRefWritable(byte[] bytes) {
+ this.bytes = bytes;
+ length = bytes.length;
+ start = 0;
+ }
+
+ /**
+ * Create a BytesRefWritable referenced to one section of the given bytes. The
+ * section is determined by argument <tt>offset</tt> and <tt>len</tt>.
+ */
+ public BytesRefWritable(byte[] data, int offset, int len) {
+ bytes = data;
+ start = offset;
+ length = len;
+ }
+
+ /**
+ * Create a BytesRefWritable referenced to one section of the given bytes. The
+ * argument <tt>lazyDecompressData</tt> refers to a LazyDecompressionCallback
+ * object. The arguments <tt>offset</tt> and <tt>len</tt> are referred to
+ * uncompressed bytes of <tt>lazyDecompressData</tt>. Use <tt>offset</tt> and
+ * <tt>len</tt> after uncompressing the data.
+ */
+ public BytesRefWritable(LazyDecompressionCallback lazyDecompressData,
+ int offset, int len) {
+ lazyDecompressObj = lazyDecompressData;
+ start = offset;
+ length = len;
+ }
+
+ private void lazyDecompress() throws IOException {
+ if (bytes == null && lazyDecompressObj != null) {
+ bytes = lazyDecompressObj.decompress();
+ }
+ }
+
+ /**
+ * Returns a copy of the underlying bytes referenced by this instance.
+ *
+ * @return a new copied byte array
+ * @throws java.io.IOException
+ */
+ public byte[] getBytesCopy() throws IOException {
+ lazyDecompress();
+ byte[] bb = new byte[length];
+ System.arraycopy(bytes, start, bb, 0, length);
+ return bb;
+ }
+
+ /**
+ * Returns the underlying bytes.
+ *
+ * @throws java.io.IOException
+ */
+ public byte[] getData() throws IOException {
+ lazyDecompress();
+ return bytes;
+ }
+
+ /**
+ * readFields() will corrupt the array. So use the set method whenever
+ * possible.
+ *
+ * @see #readFields(java.io.DataInput)
+ */
+ public void set(byte[] newData, int offset, int len) {
+ bytes = newData;
+ start = offset;
+ length = len;
+ lazyDecompressObj = null;
+ }
+
+ /**
+ * readFields() will corrupt the array. So use the set method whenever
+ * possible.
+ *
+ * @see #readFields(java.io.DataInput)
+ */
+ public void set(LazyDecompressionCallback newData, int offset, int len) {
+ bytes = null;
+ start = offset;
+ length = len;
+ lazyDecompressObj = newData;
+ }
+
+ public void writeDataTo(DataOutput out) throws IOException {
+ lazyDecompress();
+ out.write(bytes, start, length);
+ }
+
+ /**
+ * Always reuse the bytes array if length of bytes array is equal or greater
+ * to the current record, otherwise create a new one. readFields will corrupt
+ * the array. Please use set() whenever possible.
+ *
+ * @see #set(byte[], int, int)
+ */
+ public void readFields(DataInput in) throws IOException {
+ int len = in.readInt();
+ if (len > bytes.length) {
+ bytes = new byte[len];
+ }
+ start = 0;
+ length = len;
+ in.readFully(bytes, start, length);
+ }
+
+ /** {@inheritDoc} */
+ public void write(DataOutput out) throws IOException {
+ lazyDecompress();
+ out.writeInt(length);
+ out.write(bytes, start, length);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public int hashCode() {
+ return super.hashCode();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder(3 * length);
+ for (int idx = start; idx < length; idx++) {
+ // if not the first, put a blank separator in
+ if (idx != 0) {
+ sb.append(' ');
+ }
+ String num = Integer.toHexString(0xff & bytes[idx]);
+ // if it is only one digit, add a leading 0.
+ if (num.length() < 2) {
+ sb.append('0');
+ }
+ sb.append(num);
+ }
+ return sb.toString();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public int compareTo(BytesRefWritable other) {
+ if (other == null) {
+ throw new IllegalArgumentException("Argument can not be null.");
+ }
+ if (this == other) {
+ return 0;
+ }
+ try {
+ return WritableComparator.compareBytes(getData(), start, getLength(),
+ other.getData(), other.start, other.getLength());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean equals(Object right_obj) {
+ if (right_obj == null || !(right_obj instanceof BytesRefWritable)) {
+ return false;
+ }
+ return compareTo((BytesRefWritable) right_obj) == 0;
+ }
+
+ static {
+ WritableFactories.setFactory(BytesRefWritable.class, new WritableFactory() {
+
+ @Override
+ public Writable newInstance() {
+ return new BytesRefWritable();
+ }
+
+ });
+ }
+
+ public int getLength() {
+ return length;
+ }
+
+ public int getStart() {
+ return start;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/ColumnProjectionUtils.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/ColumnProjectionUtils.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/ColumnProjectionUtils.java
new file mode 100644
index 0000000..352776f
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/ColumnProjectionUtils.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.rcfile;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.StringUtils;
+
+import java.util.ArrayList;
+
+/**
+ * ColumnProjectionUtils.
+ *
+ */
+public final class ColumnProjectionUtils {
+
+ public static final String READ_COLUMN_IDS_CONF_STR = "hive.io.file.readcolumn.ids";
+
+ /**
+ * Sets read columns' ids(start from zero) for RCFile's Reader. Once a column
+ * is included in the list, RCFile's reader will not skip its value.
+ *
+ */
+ public static void setReadColumnIDs(Configuration conf, ArrayList<Integer> ids) {
+ String id = toReadColumnIDString(ids);
+ setReadColumnIDConf(conf, id);
+ }
+
+ /**
+ * Sets read columns' ids(start from zero) for RCFile's Reader. Once a column
+ * is included in the list, RCFile's reader will not skip its value.
+ *
+ */
+ public static void appendReadColumnIDs(Configuration conf,
+ ArrayList<Integer> ids) {
+ String id = toReadColumnIDString(ids);
+ if (id != null) {
+ String old = conf.get(READ_COLUMN_IDS_CONF_STR, null);
+ String newConfStr = id;
+ if (old != null) {
+ newConfStr = newConfStr + StringUtils.COMMA_STR + old;
+ }
+
+ setReadColumnIDConf(conf, newConfStr);
+ }
+ }
+
+ private static void setReadColumnIDConf(Configuration conf, String id) {
+ if (id == null || id.length() <= 0) {
+ conf.set(READ_COLUMN_IDS_CONF_STR, "");
+ return;
+ }
+
+ conf.set(READ_COLUMN_IDS_CONF_STR, id);
+ }
+
+ private static String toReadColumnIDString(ArrayList<Integer> ids) {
+ String id = null;
+ if (ids != null) {
+ for (int i = 0; i < ids.size(); i++) {
+ if (i == 0) {
+ id = "" + ids.get(i);
+ } else {
+ id = id + StringUtils.COMMA_STR + ids.get(i);
+ }
+ }
+ }
+ return id;
+ }
+
+ /**
+ * Returns an array of column ids(start from zero) which is set in the given
+ * parameter <tt>conf</tt>.
+ */
+ public static ArrayList<Integer> getReadColumnIDs(Configuration conf) {
+ if (conf == null) {
+ return new ArrayList<Integer>(0);
+ }
+ String skips = conf.get(READ_COLUMN_IDS_CONF_STR, "");
+ String[] list = StringUtils.split(skips);
+ ArrayList<Integer> result = new ArrayList<Integer>(list.length);
+ for (String element : list) {
+ // it may contain duplicates, remove duplicates
+ Integer toAdd = Integer.parseInt(element);
+ if (!result.contains(toAdd)) {
+ result.add(toAdd);
+ }
+ }
+ return result;
+ }
+
+ /**
+ * Clears the read column ids set in the conf, and will read all columns.
+ */
+ public static void setFullyReadColumns(Configuration conf) {
+ conf.set(READ_COLUMN_IDS_CONF_STR, "");
+ }
+
+ private ColumnProjectionUtils() {
+ // prevent instantiation
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/LazyDecompressionCallback.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/LazyDecompressionCallback.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/LazyDecompressionCallback.java
new file mode 100644
index 0000000..eab2356
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/LazyDecompressionCallback.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.rcfile;
+
+import java.io.IOException;
+
+/**
+ * Used to call back lazy decompression process.
+ *
+ * @see org.apache.tajo.storage.rcfile.BytesRefWritable
+ */
+public interface LazyDecompressionCallback {
+
+ byte[] decompress() throws IOException;
+
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/NonSyncByteArrayInputStream.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/NonSyncByteArrayInputStream.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/NonSyncByteArrayInputStream.java
new file mode 100644
index 0000000..bb6af22
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/NonSyncByteArrayInputStream.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.rcfile;
+
+import java.io.ByteArrayInputStream;
+
+/**
+ * A thread-not-safe version of ByteArrayInputStream, which removes all
+ * synchronized modifiers.
+ */
+public class NonSyncByteArrayInputStream extends ByteArrayInputStream {
+ public NonSyncByteArrayInputStream() {
+ super(new byte[] {});
+ }
+
+ public NonSyncByteArrayInputStream(byte[] bs) {
+ super(bs);
+ }
+
+ public NonSyncByteArrayInputStream(byte[] buf, int offset, int length) {
+ super(buf, offset, length);
+ }
+
+ public void reset(byte[] input, int start, int length) {
+ buf = input;
+ count = start + length;
+ mark = start;
+ pos = start;
+ }
+
+ public int getPosition() {
+ return pos;
+ }
+
+ public int getLength() {
+ return count;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public int read() {
+ return (pos < count) ? (buf[pos++] & 0xff) : -1;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public int read(byte b[], int off, int len) {
+ if (b == null) {
+ throw new NullPointerException();
+ } else if (off < 0 || len < 0 || len > b.length - off) {
+ throw new IndexOutOfBoundsException();
+ }
+ if (pos >= count) {
+ return -1;
+ }
+ if (pos + len > count) {
+ len = count - pos;
+ }
+ if (len <= 0) {
+ return 0;
+ }
+ System.arraycopy(buf, pos, b, off, len);
+ pos += len;
+ return len;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public long skip(long n) {
+ if (pos + n > count) {
+ n = count - pos;
+ }
+ if (n < 0) {
+ return 0;
+ }
+ pos += n;
+ return n;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public int available() {
+ return count - pos;
+ }
+
+ public void seek(int pos) {
+ this.pos = pos;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/NonSyncByteArrayOutputStream.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/NonSyncByteArrayOutputStream.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/NonSyncByteArrayOutputStream.java
new file mode 100644
index 0000000..53a3dca
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/NonSyncByteArrayOutputStream.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.rcfile;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * A thread-not-safe version of ByteArrayOutputStream, which removes all
+ * synchronized modifiers.
+ */
+public class NonSyncByteArrayOutputStream extends ByteArrayOutputStream {
+ public NonSyncByteArrayOutputStream(int size) {
+ super(size);
+ }
+
+ public NonSyncByteArrayOutputStream() {
+ super(64 * 1024);
+ }
+
+ public byte[] getData() {
+ return buf;
+ }
+
+ public int getLength() {
+ return count;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void reset() {
+ count = 0;
+ }
+
+ public void write(DataInput in, int length) throws IOException {
+ enLargeBuffer(length);
+ in.readFully(buf, count, length);
+ count += length;
+ }
+
+ private byte[] vLongBytes = new byte[9];
+
+ public int writeVLongToByteArray(byte[] bytes, int offset, long l) {
+ if (l >= -112 && l <= 127) {
+ bytes[offset] = (byte) l;
+ return 1;
+ }
+
+ int len = -112;
+ if (l < 0) {
+ l ^= -1L; // take one's complement'
+ len = -120;
+ }
+
+ long tmp = l;
+ while (tmp != 0) {
+ tmp = tmp >> 8;
+ len--;
+ }
+
+ bytes[offset++] = (byte) len;
+ len = (len < -120) ? -(len + 120) : -(len + 112);
+
+ for (int idx = len; idx != 0; idx--) {
+ int shiftbits = (idx - 1) * 8;
+ bytes[offset++] = (byte) ((l & (0xFFL << shiftbits)) >> shiftbits);
+ }
+ return 1 + len;
+ }
+
+ public int writeVLong(long l) {
+ int len = writeVLongToByteArray(vLongBytes, 0, l);
+ write(vLongBytes, 0, len);
+ return len;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void write(int b) {
+ enLargeBuffer(1);
+ buf[count] = (byte) b;
+ count += 1;
+ }
+
+ private int enLargeBuffer(int increment) {
+ int temp = count + increment;
+ int newLen = temp;
+ if (temp > buf.length) {
+ if ((buf.length << 1) > temp) {
+ newLen = buf.length << 1;
+ }
+ byte newbuf[] = new byte[newLen];
+ System.arraycopy(buf, 0, newbuf, 0, count);
+ buf = newbuf;
+ }
+ return newLen;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void write(byte b[], int off, int len) {
+ if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length)
+ || ((off + len) < 0)) {
+ throw new IndexOutOfBoundsException();
+ } else if (len == 0) {
+ return;
+ }
+ enLargeBuffer(len);
+ System.arraycopy(b, off, buf, count, len);
+ count += len;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void writeTo(OutputStream out) throws IOException {
+ out.write(buf, 0, count);
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/NonSyncDataInputBuffer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/NonSyncDataInputBuffer.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/NonSyncDataInputBuffer.java
new file mode 100644
index 0000000..46745ab
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/NonSyncDataInputBuffer.java
@@ -0,0 +1,507 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.rcfile;
+
+import org.apache.hadoop.fs.Seekable;
+
+import java.io.*;
+
+/**
+ * A thread-not-safe version of Hadoop's DataInputBuffer, which removes all
+ * synchronized modifiers.
+ */
+public class NonSyncDataInputBuffer extends FilterInputStream implements
+ DataInput, Seekable {
+
+ private final NonSyncByteArrayInputStream buffer;
+
+ byte[] buff = new byte[16];
+
+ /** Constructs a new empty buffer. */
+ public NonSyncDataInputBuffer() {
+ this(new NonSyncByteArrayInputStream());
+ }
+
+ private NonSyncDataInputBuffer(NonSyncByteArrayInputStream buffer) {
+ super(buffer);
+ this.buffer = buffer;
+ }
+
+ /** Resets the data that the buffer reads. */
+ public void reset(byte[] input, int length) {
+ buffer.reset(input, 0, length);
+ }
+
+ /** Resets the data that the buffer reads. */
+ public void reset(byte[] input, int start, int length) {
+ buffer.reset(input, start, length);
+ }
+
+ /** Returns the current position in the input. */
+ public int getPosition() {
+ return buffer.getPosition();
+ }
+
+ /** Returns the length of the input. */
+ public int getLength() {
+ return buffer.getLength();
+ }
+
+ /**
+ * Reads bytes from the source stream into the byte array <code>buffer</code>.
+ * The number of bytes actually read is returned.
+ *
+ * @param buffer
+ * the buffer to read bytes into
+ * @return the number of bytes actually read or -1 if end of stream.
+ *
+ * @throws java.io.IOException
+ * If a problem occurs reading from this DataInputStream.
+ *
+ */
+ @Override
+ public final int read(byte[] buffer) throws IOException {
+ return in.read(buffer, 0, buffer.length);
+ }
+
+ /**
+ * Read at most <code>length</code> bytes from this DataInputStream and stores
+ * them in byte array <code>buffer</code> starting at <code>offset</code>.
+ * Answer the number of bytes actually read or -1 if no bytes were read and
+ * end of stream was encountered.
+ *
+ * @param buffer
+ * the byte array in which to store the read bytes.
+ * @param offset
+ * the offset in <code>buffer</code> to store the read bytes.
+ * @param length
+ * the maximum number of bytes to store in <code>buffer</code>.
+ * @return the number of bytes actually read or -1 if end of stream.
+ *
+ * @throws java.io.IOException
+ * If a problem occurs reading from this DataInputStream.
+ *
+ */
+ @Deprecated
+ @Override
+ public final int read(byte[] buffer, int offset, int length)
+ throws IOException {
+ return in.read(buffer, offset, length);
+ }
+
+ /**
+ * Reads a boolean from this stream.
+ *
+ * @return the next boolean value from the source stream.
+ *
+ * @throws java.io.IOException
+ * If a problem occurs reading from this DataInputStream.
+ *
+ */
+ public final boolean readBoolean() throws IOException {
+ int temp = in.read();
+ if (temp < 0) {
+ throw new EOFException();
+ }
+ return temp != 0;
+ }
+
+ /**
+ * Reads an 8-bit byte value from this stream.
+ *
+ * @return the next byte value from the source stream.
+ *
+ * @throws java.io.IOException
+ * If a problem occurs reading from this DataInputStream.
+ *
+ */
+ public final byte readByte() throws IOException {
+ int temp = in.read();
+ if (temp < 0) {
+ throw new EOFException();
+ }
+ return (byte) temp;
+ }
+
+ /**
+ * Reads a 16-bit character value from this stream.
+ *
+ * @return the next <code>char</code> value from the source stream.
+ *
+ * @throws java.io.IOException
+ * If a problem occurs reading from this DataInputStream.
+ *
+ */
+ private int readToBuff(int count) throws IOException {
+ int offset = 0;
+
+ while (offset < count) {
+ int bytesRead = in.read(buff, offset, count - offset);
+ if (bytesRead == -1) {
+ return bytesRead;
+ }
+ offset += bytesRead;
+ }
+ return offset;
+ }
+
+ public final char readChar() throws IOException {
+ if (readToBuff(2) < 0) {
+ throw new EOFException();
+ }
+ return (char) (((buff[0] & 0xff) << 8) | (buff[1] & 0xff));
+
+ }
+
+ /**
+ * Reads a 64-bit <code>double</code> value from this stream.
+ *
+ * @return the next <code>double</code> value from the source stream.
+ *
+ * @throws java.io.IOException
+ * If a problem occurs reading from this DataInputStream.
+ *
+ */
+ public final double readDouble() throws IOException {
+ return Double.longBitsToDouble(readLong());
+ }
+
+ /**
+ * Reads a 32-bit <code>float</code> value from this stream.
+ *
+ * @return the next <code>float</code> value from the source stream.
+ *
+ * @throws java.io.IOException
+ * If a problem occurs reading from this DataInputStream.
+ *
+ */
+ public final float readFloat() throws IOException {
+ return Float.intBitsToFloat(readInt());
+ }
+
+ /**
+ * Reads bytes from this stream into the byte array <code>buffer</code>. This
+ * method will block until <code>buffer.length</code> number of bytes have
+ * been read.
+ *
+ * @param buffer
+ * to read bytes into
+ *
+ * @throws java.io.IOException
+ * If a problem occurs reading from this DataInputStream.
+ *
+ */
+ public final void readFully(byte[] buffer) throws IOException {
+ readFully(buffer, 0, buffer.length);
+ }
+
+ /**
+ * Reads bytes from this stream and stores them in the byte array
+ * <code>buffer</code> starting at the position <code>offset</code>. This
+ * method blocks until <code>count</code> bytes have been read.
+ *
+ * @param buffer
+ * the byte array into which the data is read
+ * @param offset
+ * the offset the operation start at
+ * @param length
+ * the maximum number of bytes to read
+ *
+ * @throws java.io.IOException
+ * if a problem occurs while reading from this stream
+ * @throws java.io.EOFException
+ * if reaches the end of the stream before enough bytes have been
+ * read
+ */
+ public final void readFully(byte[] buffer, int offset, int length)
+ throws IOException {
+ if (length < 0) {
+ throw new IndexOutOfBoundsException();
+ }
+ if (length == 0) {
+ return;
+ }
+ if (in == null || buffer == null) {
+ throw new NullPointerException("Null Pointer to underlying input stream");
+ }
+
+ if (offset < 0 || offset > buffer.length - length) {
+ throw new IndexOutOfBoundsException();
+ }
+ while (length > 0) {
+ int result = in.read(buffer, offset, length);
+ if (result < 0) {
+ throw new EOFException();
+ }
+ offset += result;
+ length -= result;
+ }
+ }
+
+ /**
+ * Reads a 32-bit integer value from this stream.
+ *
+ * @return the next <code>int</code> value from the source stream.
+ *
+ * @throws java.io.IOException
+ * If a problem occurs reading from this DataInputStream.
+ *
+ */
+ public final int readInt() throws IOException {
+ if (readToBuff(4) < 0) {
+ throw new EOFException();
+ }
+ return ((buff[0] & 0xff) << 24) | ((buff[1] & 0xff) << 16)
+ | ((buff[2] & 0xff) << 8) | (buff[3] & 0xff);
+ }
+
+ /**
+ * Answers a <code>String</code> representing the next line of text available
+ * in this BufferedReader. A line is represented by 0 or more characters
+ * followed by <code>'\n'</code>, <code>'\r'</code>, <code>"\n\r"</code> or
+ * end of stream. The <code>String</code> does not include the newline
+ * sequence.
+ *
+ * @return the contents of the line or null if no characters were read before
+ * end of stream.
+ *
+ * @throws java.io.IOException
+ * If the DataInputStream is already closed or some other IO error
+ * occurs.
+ *
+ * @deprecated Use BufferedReader
+ */
+ @Deprecated
+ public final String readLine() throws IOException {
+ StringBuilder line = new StringBuilder(80); // Typical line length
+ boolean foundTerminator = false;
+ while (true) {
+ int nextByte = in.read();
+ switch (nextByte) {
+ case -1:
+ if (line.length() == 0 && !foundTerminator) {
+ return null;
+ }
+ return line.toString();
+ case (byte) '\r':
+ if (foundTerminator) {
+ ((PushbackInputStream) in).unread(nextByte);
+ return line.toString();
+ }
+ foundTerminator = true;
+ /* Have to be able to peek ahead one byte */
+ if (!(in.getClass() == PushbackInputStream.class)) {
+ in = new PushbackInputStream(in);
+ }
+ break;
+ case (byte) '\n':
+ return line.toString();
+ default:
+ if (foundTerminator) {
+ ((PushbackInputStream) in).unread(nextByte);
+ return line.toString();
+ }
+ line.append((char) nextByte);
+ }
+ }
+ }
+
+ /**
+ * Reads a 64-bit <code>long</code> value from this stream.
+ *
+ * @return the next <code>long</code> value from the source stream.
+ *
+ * @throws java.io.IOException
+ * If a problem occurs reading from this DataInputStream.
+ *
+ */
+ public final long readLong() throws IOException {
+ if (readToBuff(8) < 0) {
+ throw new EOFException();
+ }
+ int i1 = ((buff[0] & 0xff) << 24) | ((buff[1] & 0xff) << 16)
+ | ((buff[2] & 0xff) << 8) | (buff[3] & 0xff);
+ int i2 = ((buff[4] & 0xff) << 24) | ((buff[5] & 0xff) << 16)
+ | ((buff[6] & 0xff) << 8) | (buff[7] & 0xff);
+
+ return ((i1 & 0xffffffffL) << 32) | (i2 & 0xffffffffL);
+ }
+
+ /**
+ * Reads a 16-bit <code>short</code> value from this stream.
+ *
+ * @return the next <code>short</code> value from the source stream.
+ *
+ * @throws java.io.IOException
+ * If a problem occurs reading from this DataInputStream.
+ *
+ */
+ public final short readShort() throws IOException {
+ if (readToBuff(2) < 0) {
+ throw new EOFException();
+ }
+ return (short) (((buff[0] & 0xff) << 8) | (buff[1] & 0xff));
+ }
+
+ /**
+ * Reads an unsigned 8-bit <code>byte</code> value from this stream and
+ * returns it as an int.
+ *
+ * @return the next unsigned byte value from the source stream.
+ *
+ * @throws java.io.IOException
+ * If a problem occurs reading from this DataInputStream.
+ *
+ */
+ public final int readUnsignedByte() throws IOException {
+ int temp = in.read();
+ if (temp < 0) {
+ throw new EOFException();
+ }
+ return temp;
+ }
+
+ /**
+ * Reads a 16-bit unsigned <code>short</code> value from this stream and
+ * returns it as an int.
+ *
+ * @return the next unsigned <code>short</code> value from the source stream.
+ *
+ * @throws java.io.IOException
+ * If a problem occurs reading from this DataInputStream.
+ *
+ */
+ public final int readUnsignedShort() throws IOException {
+ if (readToBuff(2) < 0) {
+ throw new EOFException();
+ }
+ return (char) (((buff[0] & 0xff) << 8) | (buff[1] & 0xff));
+ }
+
+ /**
+ * Reads a UTF format String from this Stream.
+ *
+ * @return the next UTF String from the source stream.
+ *
+ * @throws java.io.IOException
+ * If a problem occurs reading from this DataInputStream.
+ *
+ */
+ public final String readUTF() throws IOException {
+ return decodeUTF(readUnsignedShort());
+ }
+
+ String decodeUTF(int utfSize) throws IOException {
+ return decodeUTF(utfSize, this);
+ }
+
+ private static String decodeUTF(int utfSize, DataInput in) throws IOException {
+ byte[] buf = new byte[utfSize];
+ char[] out = new char[utfSize];
+ in.readFully(buf, 0, utfSize);
+
+ return convertUTF8WithBuf(buf, out, 0, utfSize);
+ }
+
+ /**
+ * Reads a UTF format String from the DataInput Stream <code>in</code>.
+ *
+ * @param in
+ * the input stream to read from
+ * @return the next UTF String from the source stream.
+ *
+ * @throws java.io.IOException
+ * If a problem occurs reading from this DataInputStream.
+ *
+ */
+ public static final String readUTF(DataInput in) throws IOException {
+ return decodeUTF(in.readUnsignedShort(), in);
+ }
+
+ /**
+ * Skips <code>count</code> number of bytes in this stream. Subsequent
+ * <code>read()</code>'s will not return these bytes unless
+ * <code>reset()</code> is used.
+ *
+ * @param count
+ * the number of bytes to skip.
+ * @return the number of bytes actually skipped.
+ *
+ * @throws java.io.IOException
+ * If the stream is already closed or another IOException occurs.
+ */
+ public final int skipBytes(int count) throws IOException {
+ int skipped = 0;
+ long skip;
+ while (skipped < count && (skip = in.skip(count - skipped)) != 0) {
+ skipped += skip;
+ }
+ if (skipped < 0) {
+ throw new EOFException();
+ }
+ return skipped;
+ }
+
+ public static String convertUTF8WithBuf(byte[] buf, char[] out, int offset,
+ int utfSize) throws UTFDataFormatException {
+ int count = 0, s = 0, a;
+ while (count < utfSize) {
+ if ((out[s] = (char) buf[offset + count++]) < '\u0080') {
+ s++;
+ } else if (((a = out[s]) & 0xe0) == 0xc0) {
+ if (count >= utfSize) {
+ throw new UTFDataFormatException();
+ }
+ int b = buf[count++];
+ if ((b & 0xC0) != 0x80) {
+ throw new UTFDataFormatException();
+ }
+ out[s++] = (char) (((a & 0x1F) << 6) | (b & 0x3F));
+ } else if ((a & 0xf0) == 0xe0) {
+ if (count + 1 >= utfSize) {
+ throw new UTFDataFormatException();
+ }
+ int b = buf[count++];
+ int c = buf[count++];
+ if (((b & 0xC0) != 0x80) || ((c & 0xC0) != 0x80)) {
+ throw new UTFDataFormatException();
+ }
+ out[s++] = (char) (((a & 0x0F) << 12) | ((b & 0x3F) << 6) | (c & 0x3F));
+ } else {
+ throw new UTFDataFormatException();
+ }
+ }
+ return new String(out, 0, s);
+ }
+
+ @Override
+ public void seek(long pos) throws IOException {
+ buffer.seek((int)pos);
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ return buffer.getPosition();
+ }
+
+ @Override
+ public boolean seekToNewSource(long targetPos) throws IOException {
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/NonSyncDataOutputBuffer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/NonSyncDataOutputBuffer.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/NonSyncDataOutputBuffer.java
new file mode 100644
index 0000000..3944f38
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/NonSyncDataOutputBuffer.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.rcfile;
+
+import java.io.DataInput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+/**
+ * A thread-not-safe version of Hadoop's DataOutputBuffer, which removes all
+ * synchronized modifiers.
+ */
+public class NonSyncDataOutputBuffer extends DataOutputStream {
+
+ private final NonSyncByteArrayOutputStream buffer;
+
+ /** Constructs a new empty buffer. */
+ public NonSyncDataOutputBuffer() {
+ this(new NonSyncByteArrayOutputStream());
+ }
+
+ private NonSyncDataOutputBuffer(NonSyncByteArrayOutputStream buffer) {
+ super(buffer);
+ this.buffer = buffer;
+ }
+
+ /**
+ * Returns the current contents of the buffer. Data is only valid to
+ * {@link #getLength()}.
+ */
+ public byte[] getData() {
+ return buffer.getData();
+ }
+
+ /** Returns the length of the valid data currently in the buffer. */
+ public int getLength() {
+ return buffer.getLength();
+ }
+
+ /** Resets the buffer to empty. */
+ public NonSyncDataOutputBuffer reset() {
+ written = 0;
+ buffer.reset();
+ return this;
+ }
+
+ /** Writes bytes from a DataInput directly into the buffer. */
+ public void write(DataInput in, int length) throws IOException {
+ buffer.write(in, length);
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ buffer.write(b);
+ incCount(1);
+ }
+
+ @Override
+ public void write(byte b[], int off, int len) throws IOException {
+ buffer.write(b, off, len);
+ incCount(len);
+ }
+
+ public void writeTo(DataOutputStream out) throws IOException {
+ buffer.writeTo(out);
+ }
+
+ private void incCount(int value) {
+ if (written + value < 0) {
+ written = Integer.MAX_VALUE;
+ } else {
+ written += value;
+ }
+ }
+}