You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by cw...@apache.org on 2011/08/24 06:54:40 UTC
svn commit: r1160976 -
/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java
Author: cws
Date: Wed Aug 24 04:54:40 2011
New Revision: 1160976
URL: http://svn.apache.org/viewvc?rev=1160976&view=rev
Log:
HIVE-2396. RCFileReader Buffer Reuse (Tim Armstrong via cws)
Modified:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java?rev=1160976&r1=1160975&r2=1160976&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java Wed Aug 24 04:54:40 2011
@@ -328,24 +328,26 @@ public class RCFile {
return loadedColumnsValueBuffer[index].getData();
}
- NonSyncDataOutputBuffer compressedData = loadedColumnsValueBuffer[index];
- NonSyncDataOutputBuffer decompressedData = new NonSyncDataOutputBuffer();
+ NonSyncDataOutputBuffer compressedData = compressedColumnsValueBuffer[index];
decompressBuffer.reset();
DataInputStream valueIn = new DataInputStream(deflatFilter);
deflatFilter.resetState();
decompressBuffer.reset(compressedData.getData(),
keyBuffer.eachColumnValueLen[colIndex]);
- decompressedData.write(valueIn,
+
+ NonSyncDataOutputBuffer decompressedColBuf = loadedColumnsValueBuffer[index];
+ decompressedColBuf.reset();
+ decompressedColBuf.write(valueIn,
keyBuffer.eachColumnUncompressedValueLen[colIndex]);
- loadedColumnsValueBuffer[index] = decompressedData;
decompressedFlag[index] = true;
numCompressed--;
- return decompressedData.getData();
+ return decompressedColBuf.getData();
}
}
// used to load columns' value into memory
private NonSyncDataOutputBuffer[] loadedColumnsValueBuffer = null;
+ private NonSyncDataOutputBuffer[] compressedColumnsValueBuffer = null;
private boolean[] decompressedFlag = null;
private int numCompressed;
private LazyDecompressionCallbackImpl[] lazyDecompressCallbackObjs = null;
@@ -405,6 +407,8 @@ public class RCFile {
decompressedFlag = new boolean[columnNumber - skipped];
lazyDecompressCallbackObjs = new LazyDecompressionCallbackImpl[columnNumber
- skipped];
+ compressedColumnsValueBuffer = new NonSyncDataOutputBuffer[columnNumber
+ - skipped];
this.codec = codec;
if (codec != null) {
valDecompressor = CodecPool.getDecompressor(codec);
@@ -425,6 +429,7 @@ public class RCFile {
decompressedFlag[readIndex] = false;
lazyDecompressCallbackObjs[readIndex] = new LazyDecompressionCallbackImpl(
readIndex, k);
+ compressedColumnsValueBuffer[readIndex] = new NonSyncDataOutputBuffer();
} else {
decompressedFlag[readIndex] = true;
}
@@ -454,7 +459,13 @@ public class RCFile {
skipTotal = 0;
}
- NonSyncDataOutputBuffer valBuf = loadedColumnsValueBuffer[addIndex];
+ NonSyncDataOutputBuffer valBuf;
+ if (codec != null){
+ // load into compressed buf first
+ valBuf = compressedColumnsValueBuffer[addIndex];
+ } else {
+ valBuf = loadedColumnsValueBuffer[addIndex];
+ }
valBuf.reset();
valBuf.write(in, vaRowsLen);
if (codec != null) {