You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by zs...@apache.org on 2009/08/05 10:01:46 UTC

svn commit: r801082 - in /hadoop/hive/trunk: ./ common/src/java/org/apache/hadoop/hive/common/io/ ql/src/java/org/apache/hadoop/hive/ql/io/ ql/src/test/org/apache/hadoop/hive/ql/io/ serde/src/java/org/apache/hadoop/hive/serde2/

Author: zshao
Date: Wed Aug  5 08:01:46 2009
New Revision: 801082

URL: http://svn.apache.org/viewvc?rev=801082&view=rev
Log:
HIVE-720. Improve ByteStream by removing all synchronized method calls. (Yongqiang He via zshao)

Added:
    hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/common/io/
    hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/common/io/NonSyncByteArrayInputStream.java
    hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/common/io/NonSyncByteArrayOutputStream.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/NonSyncDataInputBuffer.java   (contents, props changed)
      - copied, changed from r801079, hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveDataInputBuffer.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/NonSyncDataOutputBuffer.java   (contents, props changed)
      - copied, changed from r801079, hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveDataOutputBuffer.java
Removed:
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveDataInputBuffer.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveDataOutputBuffer.java
Modified:
    hadoop/hive/trunk/CHANGES.txt
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java
    hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveInputOutputBuffer.java
    hadoop/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/ByteStream.java

Modified: hadoop/hive/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/CHANGES.txt?rev=801082&r1=801081&r2=801082&view=diff
==============================================================================
--- hadoop/hive/trunk/CHANGES.txt (original)
+++ hadoop/hive/trunk/CHANGES.txt Wed Aug  5 08:01:46 2009
@@ -183,6 +183,9 @@
 
     HIVE-280. Reserve keywords such as database etc. (Namit Jain via zshao)
 
+    HIVE-720. Improve ByteStream by removing all synchronized method calls.
+    (Yongqiang He via zshao)
+
   OPTIMIZATIONS
 
     HIVE-279. Predicate Pushdown support (Prasad Chakka via athusoo).

Added: hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/common/io/NonSyncByteArrayInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/common/io/NonSyncByteArrayInputStream.java?rev=801082&view=auto
==============================================================================
--- hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/common/io/NonSyncByteArrayInputStream.java (added)
+++ hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/common/io/NonSyncByteArrayInputStream.java Wed Aug  5 08:01:46 2009
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.common.io;
+
+import java.io.ByteArrayInputStream;
+
+/**
+ * A thread-not-safe version of ByteArrayInputStream, which removes all
+ * synchronized modifiers.
+ */
+public class NonSyncByteArrayInputStream extends ByteArrayInputStream {
+  /** Constructs an empty stream; use {@link #reset(byte[], int, int)} to supply data. */
+  public NonSyncByteArrayInputStream() {
+    super(new byte[] {});
+  }
+
+  /** Constructs a stream reading the whole of {@code bs}. */
+  public NonSyncByteArrayInputStream(byte[] bs) {
+    super(bs);
+  }
+
+  /** Constructs a stream reading {@code length} bytes of {@code buf} starting at {@code offset}. */
+  public NonSyncByteArrayInputStream(byte[] buf, int offset, int length) {
+    super(buf, offset, length);
+  }
+
+  /**
+   * Re-targets this stream at a new backing array without allocating a new
+   * stream object. The mark is set to {@code start}, so a subsequent
+   * {@code reset()} rewinds to the beginning of this window. Arguments are
+   * not validated.
+   */
+  public void reset(byte[] input, int start, int length) {
+    this.buf = input;
+    this.count = start + length;
+    this.mark = start;
+    this.pos = start;
+  }
+
+  /** Current read offset into the backing array. */
+  public int getPosition() {
+    return pos;
+  }
+
+  /** End offset of the valid data (start + length passed to the last reset). */
+  public int getLength() {
+    return count;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  public int read() {
+    // Unsynchronized copy of ByteArrayInputStream.read(): -1 at end of data.
+    return (pos < count) ? (buf[pos++] & 0xff) : -1;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  public int read(byte b[], int off, int len) {
+    if (b == null) {
+      throw new NullPointerException();
+    } else if (off < 0 || len < 0 || len > b.length - off) {
+      throw new IndexOutOfBoundsException();
+    }
+    // End of stream reports -1 rather than 0, per the superclass contract.
+    if (pos >= count) {
+      return -1;
+    }
+    // Clamp the request to the bytes remaining in the window.
+    if (pos + len > count) {
+      len = count - pos;
+    }
+    if (len <= 0) {
+      return 0;
+    }
+    System.arraycopy(buf, pos, b, off, len);
+    pos += len;
+    return len;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  public long skip(long n) {
+    // Clamp to the remaining bytes; a negative request skips nothing.
+    if (pos + n > count) {
+      n = count - pos;
+    }
+    if (n < 0) {
+      return 0;
+    }
+    pos += n;
+    return n;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  public int available() {
+    return count - pos;
+  }
+}

Added: hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/common/io/NonSyncByteArrayOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/common/io/NonSyncByteArrayOutputStream.java?rev=801082&view=auto
==============================================================================
--- hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/common/io/NonSyncByteArrayOutputStream.java (added)
+++ hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/common/io/NonSyncByteArrayOutputStream.java Wed Aug  5 08:01:46 2009
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.common.io;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * A thread-not-safe version of ByteArrayOutputStream, which removes all
+ * synchronized modifiers.
+ */
+public class NonSyncByteArrayOutputStream extends ByteArrayOutputStream {
+  /** Constructs a buffer with the given initial capacity. */
+  public NonSyncByteArrayOutputStream(int size) {
+    super(size);
+  }
+
+  /** Constructs a buffer with the default initial capacity. */
+  public NonSyncByteArrayOutputStream() {
+    super();
+  }
+
+  /**
+   * Returns the internal byte array directly (no defensive copy). Only the
+   * first {@link #getLength()} bytes are valid data, and the array may be
+   * replaced by a later write that triggers a resize.
+   */
+  public byte[] getData() {
+    return buf;
+  }
+
+  /** Number of valid bytes currently in the buffer. */
+  public int getLength() {
+    return count;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  public void reset() {
+    // Unsynchronized: just rewinds the count; the backing array is retained.
+    count = 0;
+  }
+
+  /**
+   * Reads exactly {@code length} bytes from {@code in} straight into the
+   * internal buffer, avoiding an intermediate copy.
+   *
+   * @throws IOException if {@code in} fails or ends before {@code length}
+   *           bytes are read
+   */
+  public void write(DataInput in, int length) throws IOException {
+    enLargeBuffer(length);
+    in.readFully(buf, count, length);
+    count += length;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  public void write(int b) {
+    enLargeBuffer(1);
+    buf[count] = (byte) b;
+    count += 1;
+  }
+
+  /**
+   * Ensures capacity for {@code count + increment} bytes, growing the array
+   * to the larger of double its current length or the exact size needed.
+   * NOTE(review): doubling via {@code buf.length << 1} can overflow to a
+   * negative value for buffers over 1 GB — confirm callers never reach that.
+   */
+  private int enLargeBuffer(int increment) {
+    int temp = count + increment;
+    int newLen = temp;
+    if (temp > buf.length) {
+      if ((buf.length << 1) > temp)
+        newLen = buf.length << 1;
+      byte newbuf[] = new byte[newLen];
+      System.arraycopy(buf, 0, newbuf, 0, count);
+      buf = newbuf;
+    }
+    return newLen;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  public void write(byte b[], int off, int len) {
+    if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length)
+        || ((off + len) < 0)) {
+      throw new IndexOutOfBoundsException();
+    } else if (len == 0) {
+      return;
+    }
+    enLargeBuffer(len);
+    System.arraycopy(b, off, buf, count, len);
+    count += len;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  public void writeTo(OutputStream out) throws IOException {
+    out.write(buf, 0, count);
+  }
+}

Copied: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/NonSyncDataInputBuffer.java (from r801079, hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveDataInputBuffer.java)
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/NonSyncDataInputBuffer.java?p2=hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/NonSyncDataInputBuffer.java&p1=hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveDataInputBuffer.java&r1=801079&r2=801082&rev=801082&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveDataInputBuffer.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/NonSyncDataInputBuffer.java Wed Aug  5 08:01:46 2009
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.hive.ql.io;
 
-import java.io.ByteArrayInputStream;
 import java.io.DataInput;
 import java.io.EOFException;
 import java.io.FilterInputStream;
@@ -26,82 +25,24 @@
 import java.io.PushbackInputStream;
 import java.io.UTFDataFormatException;
 
+import org.apache.hadoop.hive.common.io.NonSyncByteArrayInputStream;
+
 /**
  * A thread-not-safe version of Hadoop's DataInputBuffer, which removes all
  * synchronized modifiers.
  */
-public class HiveDataInputBuffer extends FilterInputStream implements DataInput {
-
-  private static class Buffer extends ByteArrayInputStream {
-    public Buffer() {
-      super(new byte[] {});
-    }
-
-    public void reset(byte[] input, int start, int length) {
-      this.buf = input;
-      this.count = start + length;
-      this.mark = start;
-      this.pos = start;
-    }
-
-    public int getPosition() {
-      return pos;
-    }
-
-    public int getLength() {
-      return count;
-    }
-
-    public int read() {
-      return (pos < count) ? (buf[pos++] & 0xff) : -1;
-    }
-
-    public int read(byte b[], int off, int len) {
-      if (b == null) {
-        throw new NullPointerException();
-      } else if (off < 0 || len < 0 || len > b.length - off) {
-        throw new IndexOutOfBoundsException();
-      }
-      if (pos >= count) {
-        return -1;
-      }
-      if (pos + len > count) {
-        len = count - pos;
-      }
-      if (len <= 0) {
-        return 0;
-      }
-      System.arraycopy(buf, pos, b, off, len);
-      pos += len;
-      return len;
-    }
-
-    public long skip(long n) {
-      if (pos + n > count) {
-        n = count - pos;
-      }
-      if (n < 0) {
-        return 0;
-      }
-      pos += n;
-      return n;
-    }
-
-    public int available() {
-      return count - pos;
-    }
-  }
+public class NonSyncDataInputBuffer extends FilterInputStream implements DataInput {
 
-  private Buffer buffer;
+  private NonSyncByteArrayInputStream buffer;
 
   byte[] buff;
 
   /** Constructs a new empty buffer. */
-  public HiveDataInputBuffer() {
-    this(new Buffer());
+  public NonSyncDataInputBuffer() {
+    this(new NonSyncByteArrayInputStream());
   }
 
-  private HiveDataInputBuffer(Buffer buffer) {
+  private NonSyncDataInputBuffer(NonSyncByteArrayInputStream buffer) {
     super(buffer);
     this.buffer = buffer;
   }

Propchange: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/NonSyncDataInputBuffer.java
------------------------------------------------------------------------------
    svn:mergeinfo = 

Copied: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/NonSyncDataOutputBuffer.java (from r801079, hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveDataOutputBuffer.java)
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/NonSyncDataOutputBuffer.java?p2=hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/NonSyncDataOutputBuffer.java&p1=hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveDataOutputBuffer.java&r1=801079&r2=801082&rev=801082&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveDataOutputBuffer.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/NonSyncDataOutputBuffer.java Wed Aug  5 08:01:46 2009
@@ -17,81 +17,26 @@
  */
 package org.apache.hadoop.hive.ql.io;
 
-import java.io.ByteArrayOutputStream;
 import java.io.DataInput;
 import java.io.DataOutputStream;
 import java.io.IOException;
-import java.io.OutputStream;
+
+import org.apache.hadoop.hive.common.io.NonSyncByteArrayOutputStream;
 
 /**
  * A thread-not-safe version of Hadoop's DataOutputBuffer, which removes all
  * synchronized modifiers.
  */
-public class HiveDataOutputBuffer extends DataOutputStream {
-
-  private static class Buffer extends ByteArrayOutputStream {
-    public byte[] getData() {
-      return buf;
-    }
-
-    public int getLength() {
-      return count;
-    }
-
-    public void reset() {
-      count = 0;
-    }
-
-    public void write(DataInput in, int length) throws IOException {
-      enLargeBuffer(length);
-      in.readFully(buf, count, length);
-      count += length;
-    }
-
-    public void write(int b) {
-      enLargeBuffer(1);
-      buf[count] = (byte) b;
-      count += 1;
-    }
-
-    private int enLargeBuffer(int increment) {
-      int temp = count + increment;
-      int newLen = temp;
-      if (temp > buf.length) {
-        if ((buf.length << 1) > temp)
-          newLen = buf.length << 1;
-        byte newbuf[] = new byte[newLen];
-        System.arraycopy(buf, 0, newbuf, 0, count);
-        buf = newbuf;
-      }
-      return newLen;
-    }
-
-    public void write(byte b[], int off, int len) {
-      if ((off < 0) || (off > b.length) || (len < 0)
-          || ((off + len) > b.length) || ((off + len) < 0)) {
-        throw new IndexOutOfBoundsException();
-      } else if (len == 0) {
-        return;
-      }
-      enLargeBuffer(len);
-      System.arraycopy(b, off, buf, count, len);
-      count += len;
-    }
-
-    public void writeTo(OutputStream out) throws IOException {
-      out.write(buf, 0, count);
-    }
-  }
+public class NonSyncDataOutputBuffer extends DataOutputStream {
 
-  private Buffer buffer;
+  private NonSyncByteArrayOutputStream buffer;
 
   /** Constructs a new empty buffer. */
-  public HiveDataOutputBuffer() {
-    this(new Buffer());
+  public NonSyncDataOutputBuffer() {
+    this(new NonSyncByteArrayOutputStream());
   }
 
-  private HiveDataOutputBuffer(Buffer buffer) {
+  private NonSyncDataOutputBuffer(NonSyncByteArrayOutputStream buffer) {
     super(buffer);
     this.buffer = buffer;
   }
@@ -110,7 +55,7 @@
   }
 
   /** Resets the buffer to empty. */
-  public HiveDataOutputBuffer reset() {
+  public NonSyncDataOutputBuffer reset() {
     this.written = 0;
     buffer.reset();
     return this;

Propchange: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/NonSyncDataOutputBuffer.java
------------------------------------------------------------------------------
    svn:mergeinfo = 

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java?rev=801082&r1=801081&r2=801082&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java Wed Aug  5 08:01:46 2009
@@ -194,7 +194,7 @@
     private int[] eachColumnValueLen = null;
     private int[] eachColumnUncompressedValueLen = null;
     // stores each cell's length of a column in one DataOutputBuffer element
-    private HiveDataOutputBuffer[] allCellValLenBuffer = null;
+    private NonSyncDataOutputBuffer[] allCellValLenBuffer = null;
     // how many rows in this split
     private int numberRows = 0;
     // how many columns
@@ -208,7 +208,7 @@
       columnNumber = columnNum;
       eachColumnValueLen = new int[columnNumber];
       eachColumnUncompressedValueLen = new int[columnNumber];
-      allCellValLenBuffer = new HiveDataOutputBuffer[columnNumber];
+      allCellValLenBuffer = new NonSyncDataOutputBuffer[columnNumber];
       this.numberRows = numberRows;
     }
 
@@ -220,7 +220,7 @@
      * @param colValLenBuffer
      *          each cell's length of this column's in this split
      */
-    void setColumnLenInfo(int columnValueLen, HiveDataOutputBuffer colValLenBuffer,
+    void setColumnLenInfo(int columnValueLen, NonSyncDataOutputBuffer colValLenBuffer,
         int columnUncompressedValueLen, int columnIndex) {
       eachColumnValueLen[columnIndex] = columnValueLen;
       eachColumnUncompressedValueLen[columnIndex] = columnUncompressedValueLen;
@@ -231,7 +231,7 @@
     public void readFields(DataInput in) throws IOException {
       eachColumnValueLen = new int[columnNumber];
       eachColumnUncompressedValueLen = new int[columnNumber];
-      allCellValLenBuffer = new HiveDataOutputBuffer[columnNumber];
+      allCellValLenBuffer = new NonSyncDataOutputBuffer[columnNumber];
 
       numberRows = WritableUtils.readVInt(in);
       for (int i = 0; i < columnNumber; i++) {
@@ -239,7 +239,7 @@
         eachColumnUncompressedValueLen[i] = WritableUtils.readVInt(in);
         int bufLen = WritableUtils.readVInt(in);
         if (allCellValLenBuffer[i] == null)
-          allCellValLenBuffer[i] = new HiveDataOutputBuffer();
+          allCellValLenBuffer[i] = new NonSyncDataOutputBuffer();
         else
           allCellValLenBuffer[i].reset();
         allCellValLenBuffer[i].write(in, bufLen);
@@ -253,7 +253,7 @@
       for (int i = 0; i < eachColumnValueLen.length; i++) {
         WritableUtils.writeVLong(out, eachColumnValueLen[i]);
         WritableUtils.writeVLong(out, eachColumnUncompressedValueLen[i]);
-        HiveDataOutputBuffer colRowsLenBuf = allCellValLenBuffer[i];
+        NonSyncDataOutputBuffer colRowsLenBuf = allCellValLenBuffer[i];
         int bufLen = colRowsLenBuf.getLength();
         WritableUtils.writeVLong(out, bufLen);
         out.write(colRowsLenBuf.getData(), 0, bufLen);
@@ -292,7 +292,7 @@
    */
   static class ValueBuffer implements Writable {
     // used to load columns' value into memory
-    private HiveDataOutputBuffer[] loadedColumnsValueBuffer = null;
+    private NonSyncDataOutputBuffer[] loadedColumnsValueBuffer = null;
 
     boolean inited = false;
 
@@ -306,7 +306,7 @@
     CompressionCodec codec;
 
     Decompressor valDecompressor = null;
-    HiveDataInputBuffer decompressBuffer = new HiveDataInputBuffer();
+    NonSyncDataInputBuffer decompressBuffer = new NonSyncDataInputBuffer();
     CompressionInputStream deflatFilter = null;
 
     public ValueBuffer(KeyBuffer keyBuffer) throws IOException {
@@ -338,7 +338,7 @@
           if (currentSkip)
             skipped++;
       }
-      loadedColumnsValueBuffer = new HiveDataOutputBuffer[columnNumber - skipped];
+      loadedColumnsValueBuffer = new NonSyncDataOutputBuffer[columnNumber - skipped];
       this.codec = codec;
       if (codec != null) {
         valDecompressor = CodecPool.getDecompressor(codec);
@@ -349,16 +349,16 @@
       for (int k = 0, readIndex = 0; k < columnNumber; k++) {
         if (skippedColIDs[k])
           continue;
-        loadedColumnsValueBuffer[readIndex] = new HiveDataOutputBuffer();
+        loadedColumnsValueBuffer[readIndex] = new NonSyncDataOutputBuffer();
         readIndex++;
       }
     }
 
-    public void setColumnValueBuffer(HiveDataOutputBuffer valBuffer, int addIndex) {
+    public void setColumnValueBuffer(NonSyncDataOutputBuffer valBuffer, int addIndex) {
       loadedColumnsValueBuffer[addIndex] = valBuffer;
     }
 
-    HiveDataOutputBuffer compressedData = new HiveDataOutputBuffer();
+    NonSyncDataOutputBuffer compressedData = new NonSyncDataOutputBuffer();
 
     @Override
     public void readFields(DataInput in) throws IOException {
@@ -377,7 +377,7 @@
           skipTotal = 0;
         }
 
-        HiveDataOutputBuffer valBuf = loadedColumnsValueBuffer[addIndex];
+        NonSyncDataOutputBuffer valBuf = loadedColumnsValueBuffer[addIndex];
         valBuf.reset();
         if (codec != null) {
           decompressBuffer.reset();
@@ -401,7 +401,7 @@
     @Override
     public void write(DataOutput out) throws IOException {
       for (int i = 0; i < loadedColumnsValueBuffer.length; i++) {
-        HiveDataOutputBuffer currentBuf = loadedColumnsValueBuffer[i];
+        NonSyncDataOutputBuffer currentBuf = loadedColumnsValueBuffer[i];
         out.write(currentBuf.getData(), 0, currentBuf.getLength());
       }
     }
@@ -461,12 +461,12 @@
     // how many records already buffered
     private int bufferedRecords = 0;
 
-    HiveDataOutputBuffer[] compressionBuffer;
+    NonSyncDataOutputBuffer[] compressionBuffer;
     CompressionOutputStream[] deflateFilter = null;
     DataOutputStream[] deflateOut = null;
     private ColumnBuffer[] columnBuffers;
 
-    HiveDataOutputBuffer keyCompressionBuffer;
+    NonSyncDataOutputBuffer keyCompressionBuffer;
     CompressionOutputStream keyDeflateFilter;
     DataOutputStream keyDeflateOut;
     Compressor keyCompressor;
@@ -483,13 +483,13 @@
      */
     class ColumnBuffer {
       // used for buffer a column's values
-      HiveDataOutputBuffer columnValBuffer;
+      NonSyncDataOutputBuffer columnValBuffer;
       // used to store each value's length
-      HiveDataOutputBuffer valLenBuffer;
+      NonSyncDataOutputBuffer valLenBuffer;
 
       ColumnBuffer() throws IOException {
-        columnValBuffer = new HiveDataOutputBuffer();
-        valLenBuffer = new HiveDataOutputBuffer();
+        columnValBuffer = new NonSyncDataOutputBuffer();
+        valLenBuffer = new NonSyncDataOutputBuffer();
       }
 
       public void append(BytesRefWritable data) throws IOException {
@@ -636,18 +636,18 @@
         ReflectionUtils.setConf(codec, this.conf);
         compressor = CodecPool.getCompressor(codec);
 
-        compressionBuffer = new HiveDataOutputBuffer[columnNumber];
+        compressionBuffer = new NonSyncDataOutputBuffer[columnNumber];
         deflateFilter = new CompressionOutputStream[columnNumber];
         deflateOut = new DataOutputStream[columnNumber];
         for (int i = 0; i < columnNumber; i++) {
-          compressionBuffer[i] = new HiveDataOutputBuffer();
+          compressionBuffer[i] = new NonSyncDataOutputBuffer();
           deflateFilter[i] = codec.createOutputStream(compressionBuffer[i],
               compressor);
           deflateOut[i] = new DataOutputStream(new BufferedOutputStream(
               deflateFilter[i]));
         }
         keyCompressor = CodecPool.getCompressor(codec);
-        keyCompressionBuffer = new HiveDataOutputBuffer();
+        keyCompressionBuffer = new NonSyncDataOutputBuffer();
         keyDeflateFilter = codec.createOutputStream(keyCompressionBuffer,
             keyCompressor);
         keyDeflateOut = new DataOutputStream(new BufferedOutputStream(
@@ -730,7 +730,7 @@
       for (int columnIndex = 0; columnIndex < columnNumber; columnIndex++) {
         ColumnBuffer currentBuf = columnBuffers[columnIndex];
 
-        HiveDataOutputBuffer columnValue = currentBuf.columnValBuffer;
+        NonSyncDataOutputBuffer columnValue = currentBuf.columnValBuffer;
 
         if (isCompressed()) {
           compressionBuffer[columnIndex].reset();
@@ -858,11 +858,11 @@
     private int passedRowsNum = 0;
 
     private int[] columnRowReadIndex = null;
-    private HiveDataInputBuffer[] colValLenBufferReadIn;
+    private NonSyncDataInputBuffer[] colValLenBufferReadIn;
     private boolean decompress = false;
 
     private Decompressor keyDecompressor;
-    HiveDataOutputBuffer keyDecompressedData = new HiveDataOutputBuffer();
+    NonSyncDataOutputBuffer keyDecompressedData = new NonSyncDataOutputBuffer();
 
     /** Create a new RCFile reader. */
     public Reader(FileSystem fs, Path file, Configuration conf)
@@ -913,12 +913,12 @@
         }
       }
 
-      colValLenBufferReadIn = new HiveDataInputBuffer[columnNumber];
+      colValLenBufferReadIn = new NonSyncDataInputBuffer[columnNumber];
       columnRowReadIndex = new int[columnNumber];
       for (int i = 0; i < columnNumber; i++) {
         columnRowReadIndex[i] = 0;
         if (!skippedColIDs[i])
-          colValLenBufferReadIn[i] = new HiveDataInputBuffer();
+          colValLenBufferReadIn[i] = new NonSyncDataInputBuffer();
       }
 
       currentKey = createKeyBuffer();
@@ -1090,9 +1090,9 @@
     }
 
     private int compressedKeyLen = 0;
-    HiveDataInputBuffer keyDataIn = new HiveDataInputBuffer();
-    HiveDataInputBuffer keyDecompressBuffer = new HiveDataInputBuffer();
-    HiveDataOutputBuffer keyTempBuffer = new HiveDataOutputBuffer();
+    NonSyncDataInputBuffer keyDataIn = new NonSyncDataInputBuffer();
+    NonSyncDataInputBuffer keyDecompressBuffer = new NonSyncDataInputBuffer();
+    NonSyncDataOutputBuffer keyTempBuffer = new NonSyncDataOutputBuffer();
 
     KeyBuffer currentKey = null;
     boolean keyInit = false;
@@ -1151,7 +1151,7 @@
 
     // use this buffer to hold column's cells value length for usages in
     // getColumn(), instead of using colValLenBufferReadIn directly.
-    private HiveDataInputBuffer fetchColumnTempBuf = new HiveDataInputBuffer();
+    private NonSyncDataInputBuffer fetchColumnTempBuf = new NonSyncDataInputBuffer();
 
     /**
      * Fetch all data in the buffer for a given column. This is useful for

Modified: hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveInputOutputBuffer.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveInputOutputBuffer.java?rev=801082&r1=801081&r2=801082&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveInputOutputBuffer.java (original)
+++ hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveInputOutputBuffer.java Wed Aug  5 08:01:46 2009
@@ -26,8 +26,8 @@
   public void testReadAndWrite() throws IOException {
     String testString = "test_hive_input_output_number_0";
     byte[] string_bytes = testString.getBytes();
-    HiveDataInputBuffer inBuffer = new HiveDataInputBuffer();
-    HiveDataOutputBuffer outBuffer = new HiveDataOutputBuffer();
+    NonSyncDataInputBuffer inBuffer = new NonSyncDataInputBuffer();
+    NonSyncDataOutputBuffer outBuffer = new NonSyncDataOutputBuffer();
     outBuffer.write(string_bytes);
     inBuffer.reset(outBuffer.getData(), 0, outBuffer.getLength());
     byte[] readBytes = new byte[string_bytes.length];

Modified: hadoop/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/ByteStream.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/ByteStream.java?rev=801082&r1=801081&r2=801082&view=diff
==============================================================================
--- hadoop/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/ByteStream.java (original)
+++ hadoop/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/ByteStream.java Wed Aug  5 08:01:46 2009
@@ -18,14 +18,16 @@
 
 package org.apache.hadoop.hive.serde2;
 
-import java.io.*;
+
+import org.apache.hadoop.hive.common.io.NonSyncByteArrayInputStream;
+import org.apache.hadoop.hive.common.io.NonSyncByteArrayOutputStream;
 
 /**
  * Extensions to bytearrayinput/output streams
  *
  */
 public class ByteStream {
-  public static class Input extends ByteArrayInputStream {
+  public static class Input extends NonSyncByteArrayInputStream {
     public byte[] getData() { return buf; }
     public int getCount() { return count;}
     public void reset(byte [] argBuf, int argCount) {
@@ -43,7 +45,7 @@
     }
   }
     
-  public static class Output extends ByteArrayOutputStream {
+  public static class Output extends NonSyncByteArrayOutputStream {
     public byte[] getData() { return buf; }
     public int getCount() { return count;}