Posted to commits@hive.apache.org by zs...@apache.org on 2009/08/19 21:07:34 UTC

svn commit: r805934 - in /hadoop/hive/trunk: CHANGES.txt ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarStruct.java

Author: zshao
Date: Wed Aug 19 19:07:31 2009
New Revision: 805934

URL: http://svn.apache.org/viewvc?rev=805934&view=rev
Log:
HIVE-756. Performance improvement for RCFile and ColumnarSerDe. (Ning Zhang via zshao)

Modified:
    hadoop/hive/trunk/CHANGES.txt
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java
    hadoop/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarStruct.java
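
In short, the patch replaces per-row scans over a boolean skippedColIDs[] array with a precomputed int[] of projected column IDs, so the hot loops touch only the selected columns. A minimal standalone sketch of that pattern (the class and method names here are illustrative, not part of the patch):

    public class ProjectionSketch {
      // Build the projected-column index array once, up front.
      static int[] projectedColumnIds(boolean[] skippedColIDs) {
        int count = 0;
        for (boolean skipped : skippedColIDs) {
          if (!skipped) {
            count++;
          }
        }
        int[] prjColIDs = new int[count];
        for (int i = 0, j = 0; i < skippedColIDs.length; i++) {
          if (!skippedColIDs[i]) {
            prjColIDs[j++] = i;
          }
        }
        return prjColIDs;
      }

      public static void main(String[] args) {
        // columns 0 and 2 selected, column 1 skipped
        int[] prjColIDs = projectedColumnIds(new boolean[] { false, true, false });
        System.out.println(java.util.Arrays.toString(prjColIDs)); // prints [0, 2]
      }
    }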

Modified: hadoop/hive/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/CHANGES.txt?rev=805934&r1=805933&r2=805934&view=diff
==============================================================================
--- hadoop/hive/trunk/CHANGES.txt (original)
+++ hadoop/hive/trunk/CHANGES.txt Wed Aug 19 19:07:31 2009
@@ -26,6 +26,9 @@
     HIVE-759. Add "hive.intermediate.compression.codec/type" option.
     (Yongqiang He via zshao)
 
+    HIVE-756. Performance improvement for RCFile and ColumnarSerDe.
+    (Ning Zhang via zshao)
+
   OPTIMIZATIONS
 
   BUG FIXES

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java?rev=805934&r1=805933&r2=805934&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java Wed Aug 19 19:07:31 2009
@@ -224,8 +224,14 @@
     String skips = conf.get(READ_COLUMN_IDS_CONF_STR, "");
     String[] list = StringUtils.split(skips);
     ArrayList<Integer> result = new ArrayList<Integer>(list.length);
-    for (int i = 0; i < list.length; i++) {
-      result.add(Integer.parseInt(list[i]));
+    if (list.length == 1 && list[0].equals("")) {
+      // this is the case when the conf value is an empty string "";
+      // that means all columns are selected by default
+      result.add(new Integer(-1));
+    } else {
+      for (int i = 0; i < list.length; i++) {
+        result.add(Integer.parseInt(list[i]));
+      }
     }
     return result;
   }
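
The hunk above makes an empty conf value the sentinel -1, meaning all columns are selected. A hedged sketch of that behavior outside of Hadoop (parse() is a hypothetical stand-in that takes the already-split token array instead of reading a Configuration):

    import java.util.ArrayList;

    public class ReadColumnIdsSketch {
      static ArrayList<Integer> parse(String[] list) {
        ArrayList<Integer> result = new ArrayList<Integer>(list.length);
        if (list.length == 1 && list[0].equals("")) {
          // empty string => sentinel -1, i.e. select all columns
          result.add(new Integer(-1));
        } else {
          for (int i = 0; i < list.length; i++) {
            result.add(Integer.parseInt(list[i]));
          }
        }
        return result;
      }

      public static void main(String[] args) {
        System.out.println(parse(new String[] { "" }));       // prints [-1]
        System.out.println(parse(new String[] { "0", "3" })); // prints [0, 3]
      }
    }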

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java?rev=805934&r1=805933&r2=805934&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java Wed Aug 19 19:07:31 2009
@@ -863,6 +863,8 @@
 
     private Decompressor keyDecompressor;
     NonSyncDataOutputBuffer keyDecompressedData = new NonSyncDataOutputBuffer();
+    
+    int[] prjColIDs = null; // selected column IDs
 
     /** Create a new RCFile reader. */
     public Reader(FileSystem fs, Path file, Configuration conf)
@@ -891,17 +893,29 @@
 
       java.util.ArrayList<Integer> notSkipIDs = HiveFileFormatUtils.getReadColumnIDs(conf);
       skippedColIDs = new boolean[columnNumber];
+
       if (notSkipIDs.size() > 0) {
+        
+        boolean to_skip;       // default value for skipping a column
+        
+        if (notSkipIDs.size() == 1 && notSkipIDs.get(0).intValue() == -1 ) {
+          // -1 is the sentinel for selecting all columns
+          to_skip = false;
+        } else {
+          to_skip = true;
+        }
+
         for (int i = 0; i < skippedColIDs.length; i++) {
-          skippedColIDs[i] = true;
+          skippedColIDs[i] = to_skip;
         }
         for (int read : notSkipIDs) {
-          if (read < columnNumber)
+          if (read < columnNumber) 
             skippedColIDs[read] = false;
         }
       } else {
+        // no columns are specified, e.g. in "select count(1) from tt"; skip all columns
         for (int i = 0; i < skippedColIDs.length; i++) {
-          skippedColIDs[i] = false;
+          skippedColIDs[i] = true;
         }
       }
 
@@ -913,6 +927,14 @@
         }
       }
 
+      // get list of selected column IDs
+      prjColIDs = new int[loadColumnNum];
+      for ( int i = 0, j = 0; i < columnNumber; ++i ) {
+        if (!skippedColIDs[i]) {
+          prjColIDs[j++] = i;
+        }
+      }
+
       colValLenBufferReadIn = new NonSyncDataInputBuffer[columnNumber];
       columnRowReadIndex = new int[columnNumber];
       for (int i = 0; i < columnNumber; i++) {
@@ -1128,9 +1150,8 @@
       readRowsIndexInBuffer = 0;
       recordsNumInValBuffer = currentKey.numberRows;
 
-      for (int i = 0; i < columnNumber; i++) {
-        if (skippedColIDs[i])
-          continue;
+      for (int j = 0; j < prjColIDs.length; j++) {
+        int i = prjColIDs[j];
         colValLenBufferReadIn[i].reset(currentKey.allCellValLenBuffer[i]
             .getData(), currentKey.allCellValLenBuffer[i].getLength());
         columnRowReadIndex[i] = 0;
@@ -1258,29 +1279,23 @@
 
       if (!currentValue.inited) {
         currentValueBuffer();
+        ret.resetValid(columnNumber); // do this only when not initialized
       }
 
       // we do not use BytesWritable here to avoid the byte-copy from
       // DataOutputStream to BytesWritable
 
-      ret.resetValid(columnNumber);
+      for (int j = 0; j < prjColIDs.length; ++j) {
+        int i = prjColIDs[j];
 
-      for (int i = 0, readIndex = 0; i < columnNumber; i++) {
         BytesRefWritable ref = ret.unCheckedGet(i);
 
-        if (skippedColIDs[i]) {
-          if (ref != BytesRefWritable.ZeroBytesRefWritable)
-            ret.set(i, BytesRefWritable.ZeroBytesRefWritable);
-          continue;
-        }
-
         int columnCurrentRowStart = (int) columnRowReadIndex[i];
         int length = (int) WritableUtils.readVLong(colValLenBufferReadIn[i]);
         columnRowReadIndex[i] = columnCurrentRowStart + length;
 
-        ref.set(currentValue.loadedColumnsValueBuffer[readIndex].getData(),
+        ref.set(currentValue.loadedColumnsValueBuffer[j].getData(),
             columnCurrentRowStart, length);
-        readIndex++;
       }
       rowFetched = true;
     }
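
Both loop rewrites above share one shape: instead of iterating over all columnNumber columns and branching on skippedColIDs[i], they walk the precomputed prjColIDs array. A self-contained sketch of the before/after shapes (work[] is a hypothetical stand-in for the per-column buffer resets and ref.set() calls):

    public class RowLoopSketch {
      // old shape: visit every column, branch on skippedColIDs[i]
      static void oldLoop(boolean[] skippedColIDs, int[] work) {
        for (int i = 0; i < skippedColIDs.length; i++) {
          if (skippedColIDs[i]) {
            continue;
          }
          work[i]++; // stands in for the per-column buffer work
        }
      }

      // new shape: visit only the projected columns
      static void newLoop(int[] prjColIDs, int[] work) {
        for (int j = 0; j < prjColIDs.length; j++) {
          int i = prjColIDs[j];
          work[i]++;
        }
      }

      public static void main(String[] args) {
        int[] a = new int[4];
        int[] b = new int[4];
        oldLoop(new boolean[] { false, true, false, true }, a);
        newLoop(new int[] { 0, 2 }, b);
        // both visit exactly columns 0 and 2
        System.out.println(java.util.Arrays.equals(a, b)); // prints true
      }
    }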

Modified: hadoop/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarStruct.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarStruct.java?rev=805934&r1=805933&r2=805934&view=diff
==============================================================================
--- hadoop/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarStruct.java (original)
+++ hadoop/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarStruct.java Wed Aug 19 19:07:31 2009
@@ -29,6 +29,8 @@
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.io.Text;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 /**
  * ColumnarStruct is different from LazyStruct in that ColumnarStruct's field
@@ -43,7 +45,12 @@
    * The fields of the struct.
    */
   LazyObject[] fields;
-
+  
+  private static final Log LOG = LogFactory.getLog(ColumnarStruct.class);
+  
+  boolean initialized = false;  // has init() been called yet?
+  int[]   prjColIDs   = null;   // list of projected column IDs
+  
   /**
    * Construct a ColumnarStruct object with the TypeInfo. It creates the first
    * level object at the first place
@@ -115,27 +122,59 @@
 
     return fields[fieldID].getObject();
   }
-
+  
+  /*  ============================  [PERF] ===================================
+   *  This function is called for every row. The selected/projected columns
+   *  are set up on the first call only and reused on subsequent calls.
+   *  Ideally this would be done in the constructor, so that we would not
+   *  need to branch in this function for each row.
+   *  =========================================================================
+   */
   public void init(BytesRefArrayWritable cols) {
-    int fieldIndex = 0;
-    int min = cols.size() < fields.length ? cols.size() : fields.length;
-
-    for (; fieldIndex < min; fieldIndex++) {
-      BytesRefWritable passedInField = cols.get(fieldIndex);
-      cachedByteArrayRef[fieldIndex].setData(passedInField.getData());
-      if (passedInField.length > 0) {
-        // if (fields[fieldIndex] == null)
-        // fields[fieldIndex] = LazyFactory.createLazyObject(fieldTypeInfos
-        // .get(fieldIndex));
-        fields[fieldIndex].init(cachedByteArrayRef[fieldIndex], passedInField
-            .getStart(), passedInField.getLength());
-        fieldIsNull[fieldIndex] = false;
-      } else {
+    if (initialized) { // shortcut for all calls after the first
+      for (int i = 0; i < prjColIDs.length; ++i ) {
+        int fieldIndex = prjColIDs[i];
+        BytesRefWritable passedInField = cols.unCheckedGet(fieldIndex);
+        cachedByteArrayRef[fieldIndex].setData(passedInField.getData());
+        fields[fieldIndex].init(cachedByteArrayRef[fieldIndex], 
+                                passedInField.getStart(), 
+                                passedInField.getLength());
+      }
+    } else { // first time call init()
+      int fieldIndex = 0;
+      int min = cols.size() < fields.length ? cols.size() : fields.length;
+      
+      ArrayList<Integer> tmp_sel_cols = new ArrayList<Integer>();
+
+      for (; fieldIndex < min; fieldIndex++) {
+        
+        // call the faster unCheckedGet()
+        // assert: min <= cols.size()
+        BytesRefWritable passedInField = cols.unCheckedGet(fieldIndex);
+        
+        if (passedInField.length > 0) {
+          // if (fields[fieldIndex] == null)
+          // fields[fieldIndex] = LazyFactory.createLazyObject(fieldTypeInfos
+          // .get(fieldIndex));
+          tmp_sel_cols.add(fieldIndex);
+          cachedByteArrayRef[fieldIndex].setData(passedInField.getData());
+          fields[fieldIndex].init(cachedByteArrayRef[fieldIndex], 
+                                  passedInField.getStart(), 
+                                  passedInField.getLength());
+          fieldIsNull[fieldIndex] = false;
+        } else
+          fieldIsNull[fieldIndex] = true;
+      }
+      for (; fieldIndex < fields.length; fieldIndex++)
         fieldIsNull[fieldIndex] = true;
+      
+      // maintain a list of non-NULL column IDs
+      prjColIDs = new int[tmp_sel_cols.size()];
+      for (int i = 0; i < prjColIDs.length; ++i ) {
+        prjColIDs[i] = tmp_sel_cols.get(i).intValue();
       }
+      initialized = true;
     }
-    for (; fieldIndex < fields.length; fieldIndex++)
-      fieldIsNull[fieldIndex] = true;
   }
 
   ArrayList<Object> cachedList;
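
The init() rewrite applies the same idea at the SerDe layer: the first call discovers which fields are non-empty and caches their IDs in prjColIDs; every later call loops over that array only. A minimal sketch of the control flow, with an int[] of field lengths standing in for the BytesRefArrayWritable columns (names other than prjColIDs and initialized are hypothetical):

    import java.util.ArrayList;

    public class InitOnceSketch {
      boolean initialized = false;
      int[] prjColIDs = null;
      int reinitCount = 0; // stands in for re-initializing the lazy fields

      void init(int[] fieldLengths) {
        if (initialized) { // fast path for every row after the first
          for (int i = 0; i < prjColIDs.length; ++i) {
            reinitCount++; // stands in for re-initializing field prjColIDs[i]
          }
        } else { // first call: discover the non-empty fields
          ArrayList<Integer> tmp = new ArrayList<Integer>();
          for (int fieldIndex = 0; fieldIndex < fieldLengths.length; fieldIndex++) {
            if (fieldLengths[fieldIndex] > 0) {
              tmp.add(fieldIndex);
            }
          }
          prjColIDs = new int[tmp.size()];
          for (int i = 0; i < prjColIDs.length; ++i) {
            prjColIDs[i] = tmp.get(i).intValue();
          }
          initialized = true;
        }
      }

      public static void main(String[] args) {
        InitOnceSketch s = new InitOnceSketch();
        s.init(new int[] { 4, 0, 7 }); // first row: fields 0 and 2 non-empty
        s.init(new int[] { 4, 0, 7 }); // second row: fast path
        System.out.println(java.util.Arrays.toString(s.prjColIDs)); // prints [0, 2]
        System.out.println(s.reinitCount); // prints 2
      }
    }

Note that the projected set is derived from the first row seen, so this pattern relies on a skipped column being empty in every row, which holds for the RCFile reader above.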