You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by cw...@apache.org on 2011/08/23 04:25:31 UTC

svn commit: r1160525 - /hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java

Author: cws
Date: Tue Aug 23 02:25:30 2011
New Revision: 1160525

URL: http://svn.apache.org/viewvc?rev=1160525&view=rev
Log:
HIVE-2350. Improve RCFile Read Speed (Tim Armstrong via cws)

Modified:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java?rev=1160525&r1=1160524&r2=1160525&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java Tue Aug 23 02:25:30 2011
@@ -339,6 +339,7 @@ public class RCFile {
             keyBuffer.eachColumnUncompressedValueLen[colIndex]);
         loadedColumnsValueBuffer[index] = decompressedData;
         decompressedFlag[index] = true;
+        numCompressed--;
         return decompressedData.getData();
       }
     }
@@ -346,6 +347,7 @@ public class RCFile {
     // used to load columns' value into memory
     private NonSyncDataOutputBuffer[] loadedColumnsValueBuffer = null;
     private boolean[] decompressedFlag = null;
+    private int numCompressed;
     private LazyDecompressionCallbackImpl[] lazyDecompressCallbackObjs = null;
 
     boolean inited = false;
@@ -409,7 +411,11 @@ public class RCFile {
         deflatFilter = codec.createInputStream(decompressBuffer,
             valDecompressor);
       }
-
+      if (codec != null) {
+        numCompressed = decompressedFlag.length;
+      } else {
+        numCompressed = 0;
+      }
       for (int k = 0, readIndex = 0; k < columnNumber; k++) {
         if (skippedColIDs[k]) {
           continue;
@@ -456,6 +462,9 @@ public class RCFile {
         }
         addIndex++;
       }
+      if (codec != null) {
+        numCompressed = decompressedFlag.length;
+      }
 
       if (skipTotal != 0) {
         in.skipBytes(skipTotal);
@@ -964,7 +973,12 @@ public class RCFile {
    * 
    */
   public static class Reader {
-
+    private static class SelectedColumn {
+      public int colIndex;
+      public int rowReadIndex;
+      public int runLength;
+      public int prvLength;
+    }
     private final Path file;
     private final FSDataInputStream in;
 
@@ -999,16 +1013,21 @@ public class RCFile {
 
     private int passedRowsNum = 0;
 
-    private int[] columnRowReadIndex = null;
-    private final NonSyncDataInputBuffer[] colValLenBufferReadIn;
-    private final int[] columnRunLength;
-    private final int[] columnPrvLength;
+
     private boolean decompress = false;
 
     private Decompressor keyDecompressor;
     NonSyncDataOutputBuffer keyDecompressedData = new NonSyncDataOutputBuffer();
 
-    int[] prjColIDs = null; // selected column IDs
+    // Current state of each selected column, e.g. current run length, etc.
+    // The size of the array is equal to the number of selected columns.
+    private final SelectedColumn[] selectedColumns;
+
+    // map of original column id -> index among selected columns
+    private final int[] revPrjColIDs;
+
+    // column value lengths for each of the selected columns
+    private final NonSyncDataInputBuffer[] colValLenBufferReadIn;
 
     /** Create a new RCFile reader. */
     public Reader(FileSystem fs, Path file, Configuration conf) throws IOException {
@@ -1053,7 +1072,7 @@ public class RCFile {
 
       java.util.ArrayList<Integer> notSkipIDs = ColumnProjectionUtils
           .getReadColumnIDs(conf);
-      skippedColIDs = new boolean[columnNumber];
+      boolean[] skippedColIDs = new boolean[columnNumber];
       if (notSkipIDs.size() > 0) {
         for (int i = 0; i < skippedColIDs.length; i++) {
           skippedColIDs[i] = true;
@@ -1081,31 +1100,31 @@ public class RCFile {
         }
       }
 
+
+      revPrjColIDs = new int[columnNumber];
       // get list of selected column IDs
-      prjColIDs = new int[loadColumnNum];
+      selectedColumns = new SelectedColumn[loadColumnNum];
+      colValLenBufferReadIn = new NonSyncDataInputBuffer[loadColumnNum];
       for (int i = 0, j = 0; i < columnNumber; ++i) {
         if (!skippedColIDs[i]) {
-          prjColIDs[j++] = i;
-        }
-      }
-
-      colValLenBufferReadIn = new NonSyncDataInputBuffer[columnNumber];
-      columnRunLength = new int[columnNumber];
-      columnPrvLength = new int[columnNumber];
-      columnRowReadIndex = new int[columnNumber];
-      for (int i = 0; i < columnNumber; i++) {
-        columnRowReadIndex[i] = 0;
-        if (!skippedColIDs[i]) {
-          colValLenBufferReadIn[i] = new NonSyncDataInputBuffer();
+          SelectedColumn col = new SelectedColumn();
+          col.colIndex = i;
+          col.runLength = 0;
+          col.prvLength = -1;
+          col.rowReadIndex = 0;
+          selectedColumns[j] = col;
+          colValLenBufferReadIn[j] = new NonSyncDataInputBuffer();
+          revPrjColIDs[i] = j;
+          j++;
+        } else {
+          revPrjColIDs[i] = -1;
         }
-        columnRunLength[i] = 0;
-        columnPrvLength[i] = -1;
       }
 
       currentKey = createKeyBuffer();
       currentValue = new ValueBuffer(null, columnNumber, skippedColIDs, codec);
     }
-    
+
     /**
      * Override this method to specialize the type of
      * {@link FSDataInputStream} returned.
@@ -1334,13 +1353,14 @@ public class RCFile {
       readRowsIndexInBuffer = 0;
       recordsNumInValBuffer = currentKey.numberRows;
 
-      for (int prjColID : prjColIDs) {
-        int i = prjColID;
-        colValLenBufferReadIn[i].reset(currentKey.allCellValLenBuffer[i]
-            .getData(), currentKey.allCellValLenBuffer[i].getLength());
-        columnRowReadIndex[i] = 0;
-        columnRunLength[i] = 0;
-        columnPrvLength[i] = -1;
+      for (int selIx = 0; selIx < selectedColumns.length; selIx++) {
+        SelectedColumn col = selectedColumns[selIx];
+        int colIx = col.colIndex;
+        NonSyncDataOutputBuffer buf = currentKey.allCellValLenBuffer[colIx];
+        colValLenBufferReadIn[selIx].reset(buf.getData(), buf.getLength());
+        col.rowReadIndex = 0;
+        col.runLength = 0;
+        col.prvLength = -1;
       }
 
       return currentKeyLength;
@@ -1384,8 +1404,8 @@ public class RCFile {
      */
     public BytesRefArrayWritable getColumn(int columnID,
         BytesRefArrayWritable rest) throws IOException {
-
-      if (skippedColIDs[columnID]) {
+      int selColIdx = revPrjColIDs[columnID];
+      if (selColIdx == -1) {
         return null;
       }
 
@@ -1402,16 +1422,26 @@ public class RCFile {
       int columnNextRowStart = 0;
       fetchColumnTempBuf.reset(currentKey.allCellValLenBuffer[columnID]
           .getData(), currentKey.allCellValLenBuffer[columnID].getLength());
+      SelectedColumn selCol = selectedColumns[selColIdx];
+      byte[] uncompData = null;
+      ValueBuffer.LazyDecompressionCallbackImpl decompCallBack = null;
+      boolean decompressed = currentValue.decompressedFlag[selColIdx];
+      if (decompressed) {
+        uncompData = 
+              currentValue.loadedColumnsValueBuffer[selColIdx].getData();
+      } else {
+        decompCallBack = currentValue.lazyDecompressCallbackObjs[selColIdx];
+      }
       for (int i = 0; i < recordsNumInValBuffer; i++) {
-        int length = getColumnNextValueLength(columnID);
+        colAdvanceRow(selColIdx, selCol);
+        int length = selCol.prvLength;
 
         BytesRefWritable currentCell = rest.get(i);
-        if (currentValue.decompressedFlag[columnID]) {
-          currentCell.set(currentValue.loadedColumnsValueBuffer[columnID]
-              .getData(), columnNextRowStart, length);
+
+        if (decompressed) {
+          currentCell.set(uncompData, columnNextRowStart, length);
         } else {
-          currentCell.set(currentValue.lazyDecompressCallbackObjs[columnID],
-              columnNextRowStart, length);
+          currentCell.set(decompCallBack, columnNextRowStart, length);
         }
         columnNextRowStart = columnNextRowStart + length;
       }
@@ -1490,44 +1520,62 @@ public class RCFile {
 
       // we do not use BytesWritable here to avoid the byte-copy from
       // DataOutputStream to BytesWritable
-
-      for (int j = 0; j < prjColIDs.length; ++j) {
-        int i = prjColIDs[j];
-
-        BytesRefWritable ref = ret.unCheckedGet(i);
-
-        int columnCurrentRowStart = columnRowReadIndex[i];
-        int length = getColumnNextValueLength(i);
-        columnRowReadIndex[i] = columnCurrentRowStart + length;
-
-        if (currentValue.decompressedFlag[j]) {
+      if (currentValue.numCompressed > 0) {
+        for (int j = 0; j < selectedColumns.length; ++j) {
+          SelectedColumn col = selectedColumns[j];
+          int i = col.colIndex;
+  
+          BytesRefWritable ref = ret.unCheckedGet(i);
+  
+          colAdvanceRow(j, col);
+  
+          if (currentValue.decompressedFlag[j]) {
+            ref.set(currentValue.loadedColumnsValueBuffer[j].getData(),
+                col.rowReadIndex, col.prvLength);
+          } else {
+            ref.set(currentValue.lazyDecompressCallbackObjs[j],
+                col.rowReadIndex, col.prvLength);
+          }
+          col.rowReadIndex += col.prvLength;
+        }
+      } else {
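+        // No column is still awaiting lazy decompression, so every value can be
+        // read straight from loadedColumnsValueBuffer. This version of the loop
+        // eliminates a condition check and branch and is measurably faster
+        // (20% or so).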
+        for (int j = 0; j < selectedColumns.length; ++j) {
+          SelectedColumn col = selectedColumns[j];
+          int i = col.colIndex;
+  
+          BytesRefWritable ref = ret.unCheckedGet(i);
+  
+          colAdvanceRow(j, col);
           ref.set(currentValue.loadedColumnsValueBuffer[j].getData(),
-              columnCurrentRowStart, length);
-        } else {
-          ref.set(currentValue.lazyDecompressCallbackObjs[j],
-              columnCurrentRowStart, length);
+                col.rowReadIndex, col.prvLength);
+          col.rowReadIndex += col.prvLength;
         }
       }
       rowFetched = true;
     }
 
-    private int getColumnNextValueLength(int i) throws IOException {
-      if (columnRunLength[i] > 0) {
-        --columnRunLength[i];
-        return columnPrvLength[i];
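+    /**
+     * Advance column state to the next row: update run lengths and the
+     * current value length.
+     * @param selCol - index among selectedColumns
+     * @param col - column object to update the state of. prvLength will be
+     *        set to the length of the value in the new row; the caller is
+     *        responsible for advancing rowReadIndex.
+     * @throws IOException if reading the column value-length stream fails
+     */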
+    private void colAdvanceRow(int selCol, SelectedColumn col) throws IOException {
+      if (col.runLength > 0) {
+        --col.runLength;
       } else {
-        int length = (int) WritableUtils.readVLong(colValLenBufferReadIn[i]);
+        int length = (int) WritableUtils.readVLong(colValLenBufferReadIn[selCol]);
         if (length < 0) {
           // we reach a runlength here, use the previous length and reset
           // runlength
-          columnRunLength[i] = ~length;
-          columnRunLength[i]--;
-          length = columnPrvLength[i];
+          col.runLength = (~length) - 1;
         } else {
-          columnPrvLength[i] = length;
-          columnRunLength[i] = 0;
+          col.prvLength = length;
+          col.runLength = 0;
         }
-        return length;
       }
     }