Posted to commits@hive.apache.org by zs...@apache.org on 2009/09/08 22:28:50 UTC

svn commit: r812678 - in /hadoop/hive/trunk: CHANGES.txt ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java ql/src/test/org/apache/hadoop/hive/ql/io/TestRCFile.java serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarStruct.java

Author: zshao
Date: Tue Sep  8 20:28:50 2009
New Revision: 812678

URL: http://svn.apache.org/viewvc?rev=812678&view=rev
Log:
HIVE-756. Performance improvement for RCFile and ColumnarSerDe in Hive. (Ning Zhang via zshao)

Modified:
    hadoop/hive/trunk/CHANGES.txt
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java
    hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/TestRCFile.java
    hadoop/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarStruct.java

Modified: hadoop/hive/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/CHANGES.txt?rev=812678&r1=812677&r2=812678&view=diff
==============================================================================
--- hadoop/hive/trunk/CHANGES.txt (original)
+++ hadoop/hive/trunk/CHANGES.txt Tue Sep  8 20:28:50 2009
@@ -56,6 +56,9 @@
     HIVE-813. Show the actual exception thrown in UDF evaluation
     (Zheng Shao via rmurthy)
 
+    HIVE-756. Performance improvement for RCFile and ColumnarSerDe in Hive.
+    (Ning Zhang via zshao)
+
   OPTIMIZATIONS
 
   BUG FIXES

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java?rev=812678&r1=812677&r2=812678&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java Tue Sep  8 20:28:50 2009
@@ -863,6 +863,8 @@
 
     private Decompressor keyDecompressor;
     NonSyncDataOutputBuffer keyDecompressedData = new NonSyncDataOutputBuffer();
+    
+    int[] prjColIDs = null; // selected column IDs
 
     /** Create a new RCFile reader. */
     public Reader(FileSystem fs, Path file, Configuration conf)
@@ -900,6 +902,9 @@
             skippedColIDs[read] = false;
         }
       } else {
+        // TODO: if no column name is specified, e.g. in "select count(1) from tt",
+        // we should skip all columns; that case needs to be distinguished
+        // from "select * from tt;".
         for (int i = 0; i < skippedColIDs.length; i++) {
           skippedColIDs[i] = false;
         }
@@ -913,6 +918,14 @@
         }
       }
 
+      // get list of selected column IDs
+      prjColIDs = new int[loadColumnNum];
+      for (int i = 0, j = 0; i < columnNumber; ++i) {
+        if (!skippedColIDs[i]) {
+          prjColIDs[j++] = i;
+        }
+      }
+
       colValLenBufferReadIn = new NonSyncDataInputBuffer[columnNumber];
       columnRowReadIndex = new int[columnNumber];
       for (int i = 0; i < columnNumber; i++) {
@@ -1128,9 +1141,8 @@
       readRowsIndexInBuffer = 0;
       recordsNumInValBuffer = currentKey.numberRows;
 
-      for (int i = 0; i < columnNumber; i++) {
-        if (skippedColIDs[i])
-          continue;
+      for (int j = 0; j < prjColIDs.length; j++) {
+        int i = prjColIDs[j];
         colValLenBufferReadIn[i].reset(currentKey.allCellValLenBuffer[i]
             .getData(), currentKey.allCellValLenBuffer[i].getLength());
         columnRowReadIndex[i] = 0;
@@ -1258,29 +1270,23 @@
 
       if (!currentValue.inited) {
         currentValueBuffer();
+        ret.resetValid(columnNumber); // do this only when not yet initialized
       }
 
       // we do not use BytesWritable here to avoid the byte-copy from
       // DataOutputStream to BytesWritable
 
-      ret.resetValid(columnNumber);
+      for (int j = 0; j < prjColIDs.length; ++j) {
+        int i = prjColIDs[j];
 
-      for (int i = 0, readIndex = 0; i < columnNumber; i++) {
         BytesRefWritable ref = ret.unCheckedGet(i);
 
-        if (skippedColIDs[i]) {
-          if (ref != BytesRefWritable.ZeroBytesRefWritable)
-            ret.set(i, BytesRefWritable.ZeroBytesRefWritable);
-          continue;
-        }
-
         int columnCurrentRowStart = (int) columnRowReadIndex[i];
         int length = (int) WritableUtils.readVLong(colValLenBufferReadIn[i]);
         columnRowReadIndex[i] = columnCurrentRowStart + length;
 
-        ref.set(currentValue.loadedColumnsValueBuffer[readIndex].getData(),
+        ref.set(currentValue.loadedColumnsValueBuffer[j].getData(),
             columnCurrentRowStart, length);
-        readIndex++;
       }
       rowFetched = true;
     }
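
The pattern in the RCFile.java hunks above is to replace the per-row scan
over the boolean skippedColIDs[] array with prjColIDs, a dense array of
selected column IDs computed once at reader setup. The per-row loops then
visit only the selected columns, and the loop counter j doubles as the
index into loadedColumnsValueBuffer, which holds one buffer per selected
column. A minimal standalone sketch of the idea, using hypothetical names
rather than the actual Hive classes:

    public class ProjectionSketch {
      public static void main(String[] args) {
        int columnNumber = 8;
        boolean[] skippedColIDs = {false, true, false, true, true, false, true, true};

        // Setup time, done once: count and collect the selected column IDs.
        int loadColumnNum = 0;
        for (boolean skipped : skippedColIDs) {
          if (!skipped) {
            loadColumnNum++;
          }
        }
        int[] prjColIDs = new int[loadColumnNum];
        for (int i = 0, j = 0; i < columnNumber; ++i) {
          if (!skippedColIDs[i]) {
            prjColIDs[j++] = i;
          }
        }

        // Per row: touch only the selected columns; no skip test needed.
        // j also serves as the index into the per-selected-column buffers.
        for (int j = 0; j < prjColIDs.length; j++) {
          int i = prjColIDs[j];
          System.out.println("column " + i + " <- loaded buffer " + j);
        }
      }
    }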

Modified: hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/TestRCFile.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/TestRCFile.java?rev=812678&r1=812677&r2=812678&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/TestRCFile.java (original)
+++ hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/TestRCFile.java Tue Sep  8 20:28:50 2009
@@ -175,7 +175,7 @@
       StructObjectInspector oi = (StructObjectInspector) serDe
           .getObjectInspector();
       List<? extends StructField> fieldRefs = oi.getAllStructFieldRefs();
-      assertEquals(8, fieldRefs.size());
+      assertEquals("Field size should be 8", 8, fieldRefs.size());
       for (int j = 0; j < fieldRefs.size(); j++) {
         Object fieldData = oi.getStructFieldData(row, fieldRefs.get(j));
         Object standardWritableData = ObjectInspectorUtils.copyToStandardObject(fieldData, 
@@ -301,7 +301,7 @@
       StructObjectInspector oi = (StructObjectInspector) serDe
           .getObjectInspector();
       List<? extends StructField> fieldRefs = oi.getAllStructFieldRefs();
-      assertEquals(8, fieldRefs.size());
+      assertEquals("Field size should be 8", 8, fieldRefs.size());
       for (int i = 0; i < fieldRefs.size(); i++) {
         Object fieldData = oi.getStructFieldData(row, fieldRefs.get(i));
         Object standardWritableData = ObjectInspectorUtils.copyToStandardObject(fieldData, 
@@ -309,7 +309,7 @@
         assertEquals("Field " + i, standardWritableData, expectedFieldsData[i]);
       }
       // Serialize
-      assertEquals(BytesRefArrayWritable.class, serDe.getSerializedClass());
+      assertEquals("Class of the serialized object should be BytesRefArrayWritable", BytesRefArrayWritable.class, serDe.getSerializedClass());
       BytesRefArrayWritable serializedText = (BytesRefArrayWritable) serDe
           .serialize(row, oi);
       assertEquals("Serialized data", s, serializedText);
@@ -334,6 +334,7 @@
 
     LongWritable rowID = new LongWritable();
     BytesRefArrayWritable cols = new BytesRefArrayWritable();
+    
     while (reader.next(rowID)) {
       reader.getCurrentRow(cols);
       Object row = serDe.deserialize(cols);
@@ -341,16 +342,16 @@
       StructObjectInspector oi = (StructObjectInspector) serDe
           .getObjectInspector();
       List<? extends StructField> fieldRefs = oi.getAllStructFieldRefs();
-      assertEquals(8, fieldRefs.size());
+      assertEquals("Field size should be 8", 8, fieldRefs.size());
 
-      for (int i = 0; i < fieldRefs.size(); i++) {
+      for (int i : readCols) {
         Object fieldData = oi.getStructFieldData(row, fieldRefs.get(i));
         Object standardWritableData = ObjectInspectorUtils.copyToStandardObject(fieldData, 
             fieldRefs.get(i).getFieldObjectInspector(), ObjectInspectorCopyOption.WRITABLE);
         assertEquals("Field " + i, standardWritableData, expectedPartitalFieldsData[i]);
       }
 
-      assertEquals(BytesRefArrayWritable.class, serDe.getSerializedClass());
+      assertEquals("Class of the serialized object should be BytesRefArrayWritable", BytesRefArrayWritable.class, serDe.getSerializedClass());
       BytesRefArrayWritable serializedBytes = (BytesRefArrayWritable) serDe
           .serialize(row, oi);
       assertEquals("Serialized data", patialS, serializedBytes);

Modified: hadoop/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarStruct.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarStruct.java?rev=812678&r1=812677&r2=812678&view=diff
==============================================================================
--- hadoop/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarStruct.java (original)
+++ hadoop/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarStruct.java Tue Sep  8 20:28:50 2009
@@ -29,6 +29,8 @@
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.io.Text;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 /**
  * ColumnarStruct is different from LazyStruct in that ColumnarStruct's field
@@ -43,7 +45,12 @@
    * The fields of the struct.
    */
   LazyObject[] fields;
-
+  
+  private static final Log LOG = LogFactory.getLog(ColumnarStruct.class);
+  
+  boolean initialized = false;  // whether init() has been called
+  int[]   prjColIDs   = null;   // list of projected column IDs
+  
   /**
    * Construct a ColumnarStruct object with the TypeInfo. It creates the first
    * level object at the first place
@@ -115,27 +122,59 @@
 
     return fields[fieldID].getObject();
   }
-
+  
+  /*  ============================  [PERF] ===================================
+   *  This function is called for every row. The selected/projected columns
+   *  are set up on the first call only and reused for all subsequent calls.
+   *  Ideally this would be done in the constructor, so the function would
+   *  not need to branch on every row.
+   *  =========================================================================
+   */
   public void init(BytesRefArrayWritable cols) {
-    int fieldIndex = 0;
-    int min = cols.size() < fields.length ? cols.size() : fields.length;
-
-    for (; fieldIndex < min; fieldIndex++) {
-      BytesRefWritable passedInField = cols.get(fieldIndex);
-      cachedByteArrayRef[fieldIndex].setData(passedInField.getData());
-      if (passedInField.length > 0) {
-        // if (fields[fieldIndex] == null)
-        // fields[fieldIndex] = LazyFactory.createLazyObject(fieldTypeInfos
-        // .get(fieldIndex));
-        fields[fieldIndex].init(cachedByteArrayRef[fieldIndex], passedInField
-            .getStart(), passedInField.getLength());
-        fieldIsNull[fieldIndex] = false;
-      } else {
+    if (initialized) { // shortcut for all calls after the first
+      for (int i = 0; i < prjColIDs.length; ++i) {
+        int fieldIndex = prjColIDs[i];
+        BytesRefWritable passedInField = cols.unCheckedGet(fieldIndex);
+        cachedByteArrayRef[fieldIndex].setData(passedInField.getData());
+        fields[fieldIndex].init(cachedByteArrayRef[fieldIndex], 
+                                passedInField.getStart(), 
+                                passedInField.getLength());
+      }
+    } else { // first call to init()
+      int fieldIndex = 0;
+      int min = cols.size() < fields.length ? cols.size() : fields.length;
+      
+      ArrayList<Integer> tmp_sel_cols = new ArrayList<Integer>();
+
+      for (; fieldIndex < min; fieldIndex++) {
+        
+        // call the faster unCheckedGet();
+        // assert: min <= cols.size()
+        BytesRefWritable passedInField = cols.unCheckedGet(fieldIndex);
+        
+        if (passedInField.length > 0) {
+          // if (fields[fieldIndex] == null)
+          // fields[fieldIndex] = LazyFactory.createLazyObject(fieldTypeInfos
+          // .get(fieldIndex));
+          tmp_sel_cols.add(fieldIndex);
+          cachedByteArrayRef[fieldIndex].setData(passedInField.getData());
+          fields[fieldIndex].init(cachedByteArrayRef[fieldIndex], 
+                                  passedInField.getStart(), 
+                                  passedInField.getLength());
+          fieldIsNull[fieldIndex] = false;
+        } else
+          fieldIsNull[fieldIndex] = true;
+      }
+      for (; fieldIndex < fields.length; fieldIndex++)
         fieldIsNull[fieldIndex] = true;
+      
+      // record the list of non-NULL (projected) column IDs
+      prjColIDs = new int[tmp_sel_cols.size()];
+      for (int i = 0; i < prjColIDs.length; ++i) {
+        prjColIDs[i] = tmp_sel_cols.get(i).intValue();
       }
+      initialized = true;
     }
-    for (; fieldIndex < fields.length; fieldIndex++)
-      fieldIsNull[fieldIndex] = true;
   }
 
   ArrayList<Object> cachedList;
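
The ColumnarStruct change above applies the same idea as a first-call
memoization: the first call to init() walks all fields, records the IDs
of the non-empty (projected) fields in prjColIDs, and every later call
iterates only that list via the faster unCheckedGet() path. The sketch
below shows the shape of this pattern with hypothetical names; like the
patch, it assumes the fields seen as non-empty on the first row remain
the projection for all later rows:

    import java.util.ArrayList;

    public class InitSketch {
      private boolean initialized = false;
      private int[] prjColIDs = null;
      private final int[] fieldLengths; // stand-in for per-field byte lengths

      public InitSketch(int[] fieldLengths) {
        this.fieldLengths = fieldLengths;
      }

      public void init() {
        if (initialized) { // fast path: every row after the first
          for (int fieldIndex : prjColIDs) {
            initField(fieldIndex);
          }
          return;
        }
        // slow path, first row only: discover the projected fields
        ArrayList<Integer> tmpSelCols = new ArrayList<Integer>();
        for (int i = 0; i < fieldLengths.length; i++) {
          if (fieldLengths[i] > 0) {
            tmpSelCols.add(i);
            initField(i);
          }
        }
        prjColIDs = new int[tmpSelCols.size()];
        for (int i = 0; i < prjColIDs.length; i++) {
          prjColIDs[i] = tmpSelCols.get(i).intValue();
        }
        initialized = true;
      }

      private void initField(int fieldIndex) {
        System.out.println("init field " + fieldIndex);
      }

      public static void main(String[] args) {
        InitSketch s = new InitSketch(new int[] {4, 0, 7});
        s.init(); // first call: scans all fields, records fields 0 and 2
        s.init(); // later calls touch only fields 0 and 2
      }
    }

As the [PERF] comment notes, doing this in the constructor would remove
the branch from the per-row path entirely; the patch keeps it in init(),
presumably because which fields are non-empty is only known once the
first row's BytesRefArrayWritable arrives.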