You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by zs...@apache.org on 2009/08/05 10:01:46 UTC
svn commit: r801082 - in /hadoop/hive/trunk: ./
common/src/java/org/apache/hadoop/hive/common/io/
ql/src/java/org/apache/hadoop/hive/ql/io/
ql/src/test/org/apache/hadoop/hive/ql/io/
serde/src/java/org/apache/hadoop/hive/serde2/
Author: zshao
Date: Wed Aug 5 08:01:46 2009
New Revision: 801082
URL: http://svn.apache.org/viewvc?rev=801082&view=rev
Log:
HIVE-720. Improve ByteStream by removing all synchronized method calls. (Yongqiang He via zshao)
Added:
hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/common/io/
hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/common/io/NonSyncByteArrayInputStream.java
hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/common/io/NonSyncByteArrayOutputStream.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/NonSyncDataInputBuffer.java (contents, props changed)
- copied, changed from r801079, hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveDataInputBuffer.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/NonSyncDataOutputBuffer.java (contents, props changed)
- copied, changed from r801079, hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveDataOutputBuffer.java
Removed:
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveDataInputBuffer.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveDataOutputBuffer.java
Modified:
hadoop/hive/trunk/CHANGES.txt
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java
hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveInputOutputBuffer.java
hadoop/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/ByteStream.java
Modified: hadoop/hive/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/CHANGES.txt?rev=801082&r1=801081&r2=801082&view=diff
==============================================================================
--- hadoop/hive/trunk/CHANGES.txt (original)
+++ hadoop/hive/trunk/CHANGES.txt Wed Aug 5 08:01:46 2009
@@ -183,6 +183,9 @@
HIVE-280. Reserve keywords such as database etc. (Namit Jain via zshao)
+ HIVE-720. Improve ByteStream by removing all synchronized method calls.
+ (Yongqiang He via zshao)
+
OPTIMIZATIONS
HIVE-279. Predicate Pushdown support (Prasad Chakka via athusoo).
Added: hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/common/io/NonSyncByteArrayInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/common/io/NonSyncByteArrayInputStream.java?rev=801082&view=auto
==============================================================================
--- hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/common/io/NonSyncByteArrayInputStream.java (added)
+++ hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/common/io/NonSyncByteArrayInputStream.java Wed Aug 5 08:01:46 2009
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.common.io;
+
+import java.io.ByteArrayInputStream;
+
+/**
+ * A non-thread-safe variant of {@link ByteArrayInputStream}: every read-path
+ * method that the parent declares {@code synchronized} is overridden here
+ * without locking. Instances must be confined to a single thread or be
+ * synchronized externally by the caller.
+ */
+public class NonSyncByteArrayInputStream extends ByteArrayInputStream {
+
+  /**
+   * Creates a stream over an empty array. Call
+   * {@link #reset(byte[], int, int)} to supply data to read.
+   */
+  public NonSyncByteArrayInputStream() {
+    super(new byte[] {});
+  }
+
+  /**
+   * Creates a stream that reads the whole of {@code bs}.
+   *
+   * @param bs the backing array; it is used directly, not copied
+   */
+  public NonSyncByteArrayInputStream(byte[] bs) {
+    super(bs);
+  }
+
+  /**
+   * Creates a stream that reads a slice of {@code buf}.
+   *
+   * @param buf the backing array; it is used directly, not copied
+   * @param offset index of the first byte to read
+   * @param length maximum number of bytes to read
+   */
+  public NonSyncByteArrayInputStream(byte[] buf, int offset, int length) {
+    super(buf, offset, length);
+  }
+
+  /**
+   * Re-targets this stream at a new backing array so the instance can be
+   * reused without allocation. The arguments are not validated.
+   *
+   * @param input the new backing array; it is used directly, not copied
+   * @param start index of the first byte to read; also becomes the mark
+   * @param length number of readable bytes starting at {@code start}
+   */
+  public void reset(byte[] input, int start, int length) {
+    this.buf = input;
+    this.count = start + length;
+    this.mark = start;
+    this.pos = start;
+  }
+
+  /**
+   * Unsynchronized replacement for the inherited {@code reset()}: moves the
+   * read position back to the current mark.
+   */
+  @Override
+  public void reset() {
+    pos = mark;
+  }
+
+  /**
+   * @return the index in the backing array of the next byte to be read
+   */
+  public int getPosition() {
+    return pos;
+  }
+
+  /**
+   * @return the end index of the readable region (one past the last readable
+   *         byte). This equals the number of readable bytes only when the
+   *         region starts at index 0.
+   */
+  public int getLength() {
+    return count;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public int read() {
+    return (pos < count) ? (buf[pos++] & 0xff) : -1;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public int read(byte b[], int off, int len) {
+    if (b == null) {
+      throw new NullPointerException();
+    } else if (off < 0 || len < 0 || len > b.length - off) {
+      throw new IndexOutOfBoundsException();
+    }
+    if (pos >= count) {
+      return -1;
+    }
+    // Compare against the remaining byte count instead of computing
+    // pos + len, which could overflow int for very large arrays.
+    int remaining = count - pos;
+    if (len > remaining) {
+      len = remaining;
+    }
+    if (len <= 0) {
+      return 0;
+    }
+    System.arraycopy(buf, pos, b, off, len);
+    pos += len;
+    return len;
+  }
+
+  /**
+   * Skips up to {@code n} bytes, stopping at the end of the readable region.
+   *
+   * @param n the number of bytes to skip; values &lt;= 0 skip nothing
+   * @return the number of bytes actually skipped, never negative
+   */
+  @Override
+  public long skip(long n) {
+    if (n <= 0) {
+      return 0;
+    }
+    int remaining = count - pos;
+    if (remaining <= 0) {
+      return 0;
+    }
+    // Clamp before touching pos: the previous "pos + n" could wrap around for
+    // n close to Long.MAX_VALUE and then corrupt pos via a narrowing "pos += n".
+    int skipped = (n < remaining) ? (int) n : remaining;
+    pos += skipped;
+    return skipped;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public int available() {
+    return count - pos;
+  }
+}
Added: hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/common/io/NonSyncByteArrayOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/common/io/NonSyncByteArrayOutputStream.java?rev=801082&view=auto
==============================================================================
--- hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/common/io/NonSyncByteArrayOutputStream.java (added)
+++ hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/common/io/NonSyncByteArrayOutputStream.java Wed Aug 5 08:01:46 2009
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.common.io;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * A non-thread-safe variant of {@link ByteArrayOutputStream}: the write-path
+ * methods, {@code reset()}, {@code size()}, {@code toByteArray()} and
+ * {@code writeTo()} are overridden here without locking. The
+ * {@code toString} variants are inherited unchanged. Instances must be
+ * confined to a single thread or be synchronized externally by the caller.
+ */
+public class NonSyncByteArrayOutputStream extends ByteArrayOutputStream {
+
+  /**
+   * Creates a stream with the given initial capacity.
+   *
+   * @param size the initial capacity in bytes
+   */
+  public NonSyncByteArrayOutputStream(int size) {
+    super(size);
+  }
+
+  /** Creates a stream with the parent class's default initial capacity. */
+  public NonSyncByteArrayOutputStream() {
+    super();
+  }
+
+  /**
+   * @return the internal buffer itself, not a copy; only the first
+   *         {@link #getLength()} bytes are valid, and the reference may be
+   *         replaced by a later write that grows the buffer
+   */
+  public byte[] getData() {
+    return buf;
+  }
+
+  /**
+   * @return the number of valid bytes currently held in the buffer
+   */
+  public int getLength() {
+    return count;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void reset() {
+    count = 0;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public int size() {
+    return count;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public byte[] toByteArray() {
+    byte[] copy = new byte[count];
+    System.arraycopy(buf, 0, copy, 0, count);
+    return copy;
+  }
+
+  /**
+   * Appends {@code length} bytes read from {@code in} directly into the
+   * internal buffer, avoiding an intermediate copy.
+   *
+   * @param in the source to read from
+   * @param length the number of bytes to read and append
+   * @throws IOException if {@code in.readFully} fails; the valid length of
+   *         this stream is left unchanged in that case
+   */
+  public void write(DataInput in, int length) throws IOException {
+    enLargeBuffer(length);
+    in.readFully(buf, count, length);
+    count += length;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void write(int b) {
+    enLargeBuffer(1);
+    buf[count] = (byte) b;
+    count += 1;
+  }
+
+  /**
+   * Ensures the buffer can hold {@code increment} more bytes beyond the
+   * current valid length, growing it to at least double its size if needed.
+   *
+   * @param increment the number of additional bytes about to be written
+   * @return the buffer capacity after the call
+   * @throws OutOfMemoryError if the required size exceeds the maximum int
+   */
+  private int enLargeBuffer(int increment) {
+    // Do the arithmetic in long: "count + increment" and "buf.length << 1"
+    // can both overflow int, which previously either skipped the growth or
+    // silently disabled doubling for buffers of 1 GiB and larger.
+    long required = (long) count + increment;
+    if (required > Integer.MAX_VALUE) {
+      throw new OutOfMemoryError("Required buffer size " + required
+          + " exceeds the maximum array length");
+    }
+    if (required <= buf.length) {
+      return buf.length;
+    }
+    long doubled = Math.min((long) buf.length << 1, (long) Integer.MAX_VALUE);
+    int newLen = (int) Math.max(required, doubled);
+    byte[] newbuf = new byte[newLen];
+    System.arraycopy(buf, 0, newbuf, 0, count);
+    buf = newbuf;
+    return newLen;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void write(byte b[], int off, int len) {
+    if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length)
+        || ((off + len) < 0)) {
+      throw new IndexOutOfBoundsException();
+    } else if (len == 0) {
+      return;
+    }
+    enLargeBuffer(len);
+    System.arraycopy(b, off, buf, count, len);
+    count += len;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void writeTo(OutputStream out) throws IOException {
+    out.write(buf, 0, count);
+  }
+}
Copied: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/NonSyncDataInputBuffer.java (from r801079, hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveDataInputBuffer.java)
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/NonSyncDataInputBuffer.java?p2=hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/NonSyncDataInputBuffer.java&p1=hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveDataInputBuffer.java&r1=801079&r2=801082&rev=801082&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveDataInputBuffer.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/NonSyncDataInputBuffer.java Wed Aug 5 08:01:46 2009
@@ -18,7 +18,6 @@
package org.apache.hadoop.hive.ql.io;
-import java.io.ByteArrayInputStream;
import java.io.DataInput;
import java.io.EOFException;
import java.io.FilterInputStream;
@@ -26,82 +25,24 @@
import java.io.PushbackInputStream;
import java.io.UTFDataFormatException;
+import org.apache.hadoop.hive.common.io.NonSyncByteArrayInputStream;
+
/**
* A thread-not-safe version of Hadoop's DataInputBuffer, which removes all
* synchronized modifiers.
*/
-public class HiveDataInputBuffer extends FilterInputStream implements DataInput {
-
- private static class Buffer extends ByteArrayInputStream {
- public Buffer() {
- super(new byte[] {});
- }
-
- public void reset(byte[] input, int start, int length) {
- this.buf = input;
- this.count = start + length;
- this.mark = start;
- this.pos = start;
- }
-
- public int getPosition() {
- return pos;
- }
-
- public int getLength() {
- return count;
- }
-
- public int read() {
- return (pos < count) ? (buf[pos++] & 0xff) : -1;
- }
-
- public int read(byte b[], int off, int len) {
- if (b == null) {
- throw new NullPointerException();
- } else if (off < 0 || len < 0 || len > b.length - off) {
- throw new IndexOutOfBoundsException();
- }
- if (pos >= count) {
- return -1;
- }
- if (pos + len > count) {
- len = count - pos;
- }
- if (len <= 0) {
- return 0;
- }
- System.arraycopy(buf, pos, b, off, len);
- pos += len;
- return len;
- }
-
- public long skip(long n) {
- if (pos + n > count) {
- n = count - pos;
- }
- if (n < 0) {
- return 0;
- }
- pos += n;
- return n;
- }
-
- public int available() {
- return count - pos;
- }
- }
+public class NonSyncDataInputBuffer extends FilterInputStream implements DataInput {
- private Buffer buffer;
+ private NonSyncByteArrayInputStream buffer;
byte[] buff;
/** Constructs a new empty buffer. */
- public HiveDataInputBuffer() {
- this(new Buffer());
+ public NonSyncDataInputBuffer() {
+ this(new NonSyncByteArrayInputStream());
}
- private HiveDataInputBuffer(Buffer buffer) {
+ private NonSyncDataInputBuffer(NonSyncByteArrayInputStream buffer) {
super(buffer);
this.buffer = buffer;
}
Propchange: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/NonSyncDataInputBuffer.java
------------------------------------------------------------------------------
svn:mergeinfo =
Copied: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/NonSyncDataOutputBuffer.java (from r801079, hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveDataOutputBuffer.java)
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/NonSyncDataOutputBuffer.java?p2=hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/NonSyncDataOutputBuffer.java&p1=hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveDataOutputBuffer.java&r1=801079&r2=801082&rev=801082&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveDataOutputBuffer.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/NonSyncDataOutputBuffer.java Wed Aug 5 08:01:46 2009
@@ -17,81 +17,26 @@
*/
package org.apache.hadoop.hive.ql.io;
-import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataOutputStream;
import java.io.IOException;
-import java.io.OutputStream;
+
+import org.apache.hadoop.hive.common.io.NonSyncByteArrayOutputStream;
/**
* A thread-not-safe version of Hadoop's DataOutputBuffer, which removes all
* synchronized modifiers.
*/
-public class HiveDataOutputBuffer extends DataOutputStream {
-
- private static class Buffer extends ByteArrayOutputStream {
- public byte[] getData() {
- return buf;
- }
-
- public int getLength() {
- return count;
- }
-
- public void reset() {
- count = 0;
- }
-
- public void write(DataInput in, int length) throws IOException {
- enLargeBuffer(length);
- in.readFully(buf, count, length);
- count += length;
- }
-
- public void write(int b) {
- enLargeBuffer(1);
- buf[count] = (byte) b;
- count += 1;
- }
-
- private int enLargeBuffer(int increment) {
- int temp = count + increment;
- int newLen = temp;
- if (temp > buf.length) {
- if ((buf.length << 1) > temp)
- newLen = buf.length << 1;
- byte newbuf[] = new byte[newLen];
- System.arraycopy(buf, 0, newbuf, 0, count);
- buf = newbuf;
- }
- return newLen;
- }
-
- public void write(byte b[], int off, int len) {
- if ((off < 0) || (off > b.length) || (len < 0)
- || ((off + len) > b.length) || ((off + len) < 0)) {
- throw new IndexOutOfBoundsException();
- } else if (len == 0) {
- return;
- }
- enLargeBuffer(len);
- System.arraycopy(b, off, buf, count, len);
- count += len;
- }
-
- public void writeTo(OutputStream out) throws IOException {
- out.write(buf, 0, count);
- }
- }
+public class NonSyncDataOutputBuffer extends DataOutputStream {
- private Buffer buffer;
+ private NonSyncByteArrayOutputStream buffer;
/** Constructs a new empty buffer. */
- public HiveDataOutputBuffer() {
- this(new Buffer());
+ public NonSyncDataOutputBuffer() {
+ this(new NonSyncByteArrayOutputStream());
}
- private HiveDataOutputBuffer(Buffer buffer) {
+ private NonSyncDataOutputBuffer(NonSyncByteArrayOutputStream buffer) {
super(buffer);
this.buffer = buffer;
}
@@ -110,7 +55,7 @@
}
/** Resets the buffer to empty. */
- public HiveDataOutputBuffer reset() {
+ public NonSyncDataOutputBuffer reset() {
this.written = 0;
buffer.reset();
return this;
Propchange: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/NonSyncDataOutputBuffer.java
------------------------------------------------------------------------------
svn:mergeinfo =
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java?rev=801082&r1=801081&r2=801082&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java Wed Aug 5 08:01:46 2009
@@ -194,7 +194,7 @@
private int[] eachColumnValueLen = null;
private int[] eachColumnUncompressedValueLen = null;
// stores each cell's length of a column in one DataOutputBuffer element
- private HiveDataOutputBuffer[] allCellValLenBuffer = null;
+ private NonSyncDataOutputBuffer[] allCellValLenBuffer = null;
// how many rows in this split
private int numberRows = 0;
// how many columns
@@ -208,7 +208,7 @@
columnNumber = columnNum;
eachColumnValueLen = new int[columnNumber];
eachColumnUncompressedValueLen = new int[columnNumber];
- allCellValLenBuffer = new HiveDataOutputBuffer[columnNumber];
+ allCellValLenBuffer = new NonSyncDataOutputBuffer[columnNumber];
this.numberRows = numberRows;
}
@@ -220,7 +220,7 @@
* @param colValLenBuffer
* each cell's length of this column's in this split
*/
- void setColumnLenInfo(int columnValueLen, HiveDataOutputBuffer colValLenBuffer,
+ void setColumnLenInfo(int columnValueLen, NonSyncDataOutputBuffer colValLenBuffer,
int columnUncompressedValueLen, int columnIndex) {
eachColumnValueLen[columnIndex] = columnValueLen;
eachColumnUncompressedValueLen[columnIndex] = columnUncompressedValueLen;
@@ -231,7 +231,7 @@
public void readFields(DataInput in) throws IOException {
eachColumnValueLen = new int[columnNumber];
eachColumnUncompressedValueLen = new int[columnNumber];
- allCellValLenBuffer = new HiveDataOutputBuffer[columnNumber];
+ allCellValLenBuffer = new NonSyncDataOutputBuffer[columnNumber];
numberRows = WritableUtils.readVInt(in);
for (int i = 0; i < columnNumber; i++) {
@@ -239,7 +239,7 @@
eachColumnUncompressedValueLen[i] = WritableUtils.readVInt(in);
int bufLen = WritableUtils.readVInt(in);
if (allCellValLenBuffer[i] == null)
- allCellValLenBuffer[i] = new HiveDataOutputBuffer();
+ allCellValLenBuffer[i] = new NonSyncDataOutputBuffer();
else
allCellValLenBuffer[i].reset();
allCellValLenBuffer[i].write(in, bufLen);
@@ -253,7 +253,7 @@
for (int i = 0; i < eachColumnValueLen.length; i++) {
WritableUtils.writeVLong(out, eachColumnValueLen[i]);
WritableUtils.writeVLong(out, eachColumnUncompressedValueLen[i]);
- HiveDataOutputBuffer colRowsLenBuf = allCellValLenBuffer[i];
+ NonSyncDataOutputBuffer colRowsLenBuf = allCellValLenBuffer[i];
int bufLen = colRowsLenBuf.getLength();
WritableUtils.writeVLong(out, bufLen);
out.write(colRowsLenBuf.getData(), 0, bufLen);
@@ -292,7 +292,7 @@
*/
static class ValueBuffer implements Writable {
// used to load columns' value into memory
- private HiveDataOutputBuffer[] loadedColumnsValueBuffer = null;
+ private NonSyncDataOutputBuffer[] loadedColumnsValueBuffer = null;
boolean inited = false;
@@ -306,7 +306,7 @@
CompressionCodec codec;
Decompressor valDecompressor = null;
- HiveDataInputBuffer decompressBuffer = new HiveDataInputBuffer();
+ NonSyncDataInputBuffer decompressBuffer = new NonSyncDataInputBuffer();
CompressionInputStream deflatFilter = null;
public ValueBuffer(KeyBuffer keyBuffer) throws IOException {
@@ -338,7 +338,7 @@
if (currentSkip)
skipped++;
}
- loadedColumnsValueBuffer = new HiveDataOutputBuffer[columnNumber - skipped];
+ loadedColumnsValueBuffer = new NonSyncDataOutputBuffer[columnNumber - skipped];
this.codec = codec;
if (codec != null) {
valDecompressor = CodecPool.getDecompressor(codec);
@@ -349,16 +349,16 @@
for (int k = 0, readIndex = 0; k < columnNumber; k++) {
if (skippedColIDs[k])
continue;
- loadedColumnsValueBuffer[readIndex] = new HiveDataOutputBuffer();
+ loadedColumnsValueBuffer[readIndex] = new NonSyncDataOutputBuffer();
readIndex++;
}
}
- public void setColumnValueBuffer(HiveDataOutputBuffer valBuffer, int addIndex) {
+ public void setColumnValueBuffer(NonSyncDataOutputBuffer valBuffer, int addIndex) {
loadedColumnsValueBuffer[addIndex] = valBuffer;
}
- HiveDataOutputBuffer compressedData = new HiveDataOutputBuffer();
+ NonSyncDataOutputBuffer compressedData = new NonSyncDataOutputBuffer();
@Override
public void readFields(DataInput in) throws IOException {
@@ -377,7 +377,7 @@
skipTotal = 0;
}
- HiveDataOutputBuffer valBuf = loadedColumnsValueBuffer[addIndex];
+ NonSyncDataOutputBuffer valBuf = loadedColumnsValueBuffer[addIndex];
valBuf.reset();
if (codec != null) {
decompressBuffer.reset();
@@ -401,7 +401,7 @@
@Override
public void write(DataOutput out) throws IOException {
for (int i = 0; i < loadedColumnsValueBuffer.length; i++) {
- HiveDataOutputBuffer currentBuf = loadedColumnsValueBuffer[i];
+ NonSyncDataOutputBuffer currentBuf = loadedColumnsValueBuffer[i];
out.write(currentBuf.getData(), 0, currentBuf.getLength());
}
}
@@ -461,12 +461,12 @@
// how many records already buffered
private int bufferedRecords = 0;
- HiveDataOutputBuffer[] compressionBuffer;
+ NonSyncDataOutputBuffer[] compressionBuffer;
CompressionOutputStream[] deflateFilter = null;
DataOutputStream[] deflateOut = null;
private ColumnBuffer[] columnBuffers;
- HiveDataOutputBuffer keyCompressionBuffer;
+ NonSyncDataOutputBuffer keyCompressionBuffer;
CompressionOutputStream keyDeflateFilter;
DataOutputStream keyDeflateOut;
Compressor keyCompressor;
@@ -483,13 +483,13 @@
*/
class ColumnBuffer {
// used for buffer a column's values
- HiveDataOutputBuffer columnValBuffer;
+ NonSyncDataOutputBuffer columnValBuffer;
// used to store each value's length
- HiveDataOutputBuffer valLenBuffer;
+ NonSyncDataOutputBuffer valLenBuffer;
ColumnBuffer() throws IOException {
- columnValBuffer = new HiveDataOutputBuffer();
- valLenBuffer = new HiveDataOutputBuffer();
+ columnValBuffer = new NonSyncDataOutputBuffer();
+ valLenBuffer = new NonSyncDataOutputBuffer();
}
public void append(BytesRefWritable data) throws IOException {
@@ -636,18 +636,18 @@
ReflectionUtils.setConf(codec, this.conf);
compressor = CodecPool.getCompressor(codec);
- compressionBuffer = new HiveDataOutputBuffer[columnNumber];
+ compressionBuffer = new NonSyncDataOutputBuffer[columnNumber];
deflateFilter = new CompressionOutputStream[columnNumber];
deflateOut = new DataOutputStream[columnNumber];
for (int i = 0; i < columnNumber; i++) {
- compressionBuffer[i] = new HiveDataOutputBuffer();
+ compressionBuffer[i] = new NonSyncDataOutputBuffer();
deflateFilter[i] = codec.createOutputStream(compressionBuffer[i],
compressor);
deflateOut[i] = new DataOutputStream(new BufferedOutputStream(
deflateFilter[i]));
}
keyCompressor = CodecPool.getCompressor(codec);
- keyCompressionBuffer = new HiveDataOutputBuffer();
+ keyCompressionBuffer = new NonSyncDataOutputBuffer();
keyDeflateFilter = codec.createOutputStream(keyCompressionBuffer,
keyCompressor);
keyDeflateOut = new DataOutputStream(new BufferedOutputStream(
@@ -730,7 +730,7 @@
for (int columnIndex = 0; columnIndex < columnNumber; columnIndex++) {
ColumnBuffer currentBuf = columnBuffers[columnIndex];
- HiveDataOutputBuffer columnValue = currentBuf.columnValBuffer;
+ NonSyncDataOutputBuffer columnValue = currentBuf.columnValBuffer;
if (isCompressed()) {
compressionBuffer[columnIndex].reset();
@@ -858,11 +858,11 @@
private int passedRowsNum = 0;
private int[] columnRowReadIndex = null;
- private HiveDataInputBuffer[] colValLenBufferReadIn;
+ private NonSyncDataInputBuffer[] colValLenBufferReadIn;
private boolean decompress = false;
private Decompressor keyDecompressor;
- HiveDataOutputBuffer keyDecompressedData = new HiveDataOutputBuffer();
+ NonSyncDataOutputBuffer keyDecompressedData = new NonSyncDataOutputBuffer();
/** Create a new RCFile reader. */
public Reader(FileSystem fs, Path file, Configuration conf)
@@ -913,12 +913,12 @@
}
}
- colValLenBufferReadIn = new HiveDataInputBuffer[columnNumber];
+ colValLenBufferReadIn = new NonSyncDataInputBuffer[columnNumber];
columnRowReadIndex = new int[columnNumber];
for (int i = 0; i < columnNumber; i++) {
columnRowReadIndex[i] = 0;
if (!skippedColIDs[i])
- colValLenBufferReadIn[i] = new HiveDataInputBuffer();
+ colValLenBufferReadIn[i] = new NonSyncDataInputBuffer();
}
currentKey = createKeyBuffer();
@@ -1090,9 +1090,9 @@
}
private int compressedKeyLen = 0;
- HiveDataInputBuffer keyDataIn = new HiveDataInputBuffer();
- HiveDataInputBuffer keyDecompressBuffer = new HiveDataInputBuffer();
- HiveDataOutputBuffer keyTempBuffer = new HiveDataOutputBuffer();
+ NonSyncDataInputBuffer keyDataIn = new NonSyncDataInputBuffer();
+ NonSyncDataInputBuffer keyDecompressBuffer = new NonSyncDataInputBuffer();
+ NonSyncDataOutputBuffer keyTempBuffer = new NonSyncDataOutputBuffer();
KeyBuffer currentKey = null;
boolean keyInit = false;
@@ -1151,7 +1151,7 @@
// use this buffer to hold column's cells value length for usages in
// getColumn(), instead of using colValLenBufferReadIn directly.
- private HiveDataInputBuffer fetchColumnTempBuf = new HiveDataInputBuffer();
+ private NonSyncDataInputBuffer fetchColumnTempBuf = new NonSyncDataInputBuffer();
/**
* Fetch all data in the buffer for a given column. This is useful for
Modified: hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveInputOutputBuffer.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveInputOutputBuffer.java?rev=801082&r1=801081&r2=801082&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveInputOutputBuffer.java (original)
+++ hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveInputOutputBuffer.java Wed Aug 5 08:01:46 2009
@@ -26,8 +26,8 @@
public void testReadAndWrite() throws IOException {
String testString = "test_hive_input_output_number_0";
byte[] string_bytes = testString.getBytes();
- HiveDataInputBuffer inBuffer = new HiveDataInputBuffer();
- HiveDataOutputBuffer outBuffer = new HiveDataOutputBuffer();
+ NonSyncDataInputBuffer inBuffer = new NonSyncDataInputBuffer();
+ NonSyncDataOutputBuffer outBuffer = new NonSyncDataOutputBuffer();
outBuffer.write(string_bytes);
inBuffer.reset(outBuffer.getData(), 0, outBuffer.getLength());
byte[] readBytes = new byte[string_bytes.length];
Modified: hadoop/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/ByteStream.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/ByteStream.java?rev=801082&r1=801081&r2=801082&view=diff
==============================================================================
--- hadoop/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/ByteStream.java (original)
+++ hadoop/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/ByteStream.java Wed Aug 5 08:01:46 2009
@@ -18,14 +18,16 @@
package org.apache.hadoop.hive.serde2;
-import java.io.*;
+
+import org.apache.hadoop.hive.common.io.NonSyncByteArrayInputStream;
+import org.apache.hadoop.hive.common.io.NonSyncByteArrayOutputStream;
/**
* Extensions to bytearrayinput/output streams
*
*/
public class ByteStream {
- public static class Input extends ByteArrayInputStream {
+ public static class Input extends NonSyncByteArrayInputStream {
public byte[] getData() { return buf; }
public int getCount() { return count;}
public void reset(byte [] argBuf, int argCount) {
@@ -43,7 +45,7 @@
}
}
- public static class Output extends ByteArrayOutputStream {
+ public static class Output extends NonSyncByteArrayOutputStream {
public byte[] getData() { return buf; }
public int getCount() { return count;}