You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by cw...@apache.org on 2011/08/24 06:54:40 UTC

svn commit: r1160976 - /hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java

Author: cws
Date: Wed Aug 24 04:54:40 2011
New Revision: 1160976

URL: http://svn.apache.org/viewvc?rev=1160976&view=rev
Log:
HIVE-2396. RCFileReader Buffer Reuse (Tim Armstrong via cws)

Modified:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java?rev=1160976&r1=1160975&r2=1160976&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java Wed Aug 24 04:54:40 2011
@@ -328,24 +328,26 @@ public class RCFile {
           return loadedColumnsValueBuffer[index].getData();
         }
 
-        NonSyncDataOutputBuffer compressedData = loadedColumnsValueBuffer[index];
-        NonSyncDataOutputBuffer decompressedData = new NonSyncDataOutputBuffer();
+        NonSyncDataOutputBuffer compressedData = compressedColumnsValueBuffer[index];
         decompressBuffer.reset();
         DataInputStream valueIn = new DataInputStream(deflatFilter);
         deflatFilter.resetState();
         decompressBuffer.reset(compressedData.getData(),
             keyBuffer.eachColumnValueLen[colIndex]);
-        decompressedData.write(valueIn,
+
+        NonSyncDataOutputBuffer decompressedColBuf = loadedColumnsValueBuffer[index];
+        decompressedColBuf.reset();
+        decompressedColBuf.write(valueIn,
             keyBuffer.eachColumnUncompressedValueLen[colIndex]);
-        loadedColumnsValueBuffer[index] = decompressedData;
         decompressedFlag[index] = true;
         numCompressed--;
-        return decompressedData.getData();
+        return decompressedColBuf.getData();
       }
     }
 
     // used to load columns' value into memory
     private NonSyncDataOutputBuffer[] loadedColumnsValueBuffer = null;
+    private NonSyncDataOutputBuffer[] compressedColumnsValueBuffer = null;
     private boolean[] decompressedFlag = null;
     private int numCompressed;
     private LazyDecompressionCallbackImpl[] lazyDecompressCallbackObjs = null;
@@ -405,6 +407,8 @@ public class RCFile {
       decompressedFlag = new boolean[columnNumber - skipped];
       lazyDecompressCallbackObjs = new LazyDecompressionCallbackImpl[columnNumber
           - skipped];
+      compressedColumnsValueBuffer = new NonSyncDataOutputBuffer[columnNumber
+                                                                 - skipped];
       this.codec = codec;
       if (codec != null) {
         valDecompressor = CodecPool.getDecompressor(codec);
@@ -425,6 +429,7 @@ public class RCFile {
           decompressedFlag[readIndex] = false;
           lazyDecompressCallbackObjs[readIndex] = new LazyDecompressionCallbackImpl(
               readIndex, k);
+          compressedColumnsValueBuffer[readIndex] = new NonSyncDataOutputBuffer();
         } else {
           decompressedFlag[readIndex] = true;
         }
@@ -454,7 +459,13 @@ public class RCFile {
           skipTotal = 0;
         }
 
-        NonSyncDataOutputBuffer valBuf = loadedColumnsValueBuffer[addIndex];
+        NonSyncDataOutputBuffer valBuf;
+        if (codec != null){
+          // codec is set: load the raw bytes into the compressed buffer first
+          valBuf = compressedColumnsValueBuffer[addIndex];
+        } else {
+          valBuf = loadedColumnsValueBuffer[addIndex];
+        }
         valBuf.reset();
         valBuf.write(in, vaRowsLen);
         if (codec != null) {