You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ha...@apache.org on 2013/06/07 03:39:38 UTC

svn commit: r1490492 - in /hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql: exec/vector/ io/orc/

Author: hashutosh
Date: Fri Jun  7 01:39:37 2013
New Revision: 1490492

URL: http://svn.apache.org/r1490492
Log:
HIVE-4673 : Use VectorExpressionWriter to write column vectors into Writables. (Jitendra Nath Pandey via Ashutosh Chauhan)

Modified:
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatch.java
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSerde.java
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcSerde.java

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java?rev=1490492&r1=1490491&r2=1490492&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java Fri Jun  7 01:39:37 2013
@@ -42,6 +42,7 @@ import org.apache.hadoop.hive.ql.exec.Jo
 import org.apache.hadoop.hive.ql.exec.Stat;
 import org.apache.hadoop.hive.ql.exec.TerminalOperator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriterFactory;
 import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
 import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
@@ -331,7 +332,6 @@ public class VectorFileSinkOperator exte
       statsCollectRawDataSize = conf.isStatsCollectRawDataSize();
 
       serializer = (Serializer) conf.getTableInfo().getDeserializerClass().newInstance();
-      System.out.println("Deserializer class = "+serializer.getClass().toString());
       serializer.initialize(null, conf.getTableInfo().getProperties());
       outputClass = serializer.getSerializedClass();
 
@@ -589,6 +589,10 @@ public class VectorFileSinkOperator exte
       if (vectorizedSerde) {
         row = records[i];
       } else {
+        if (vrg.valueWriters == null) {
+          vrg.setValueWriters(VectorExpressionWriterFactory.getExpressionWriters(
+              (StructObjectInspector)inputObjInspectors[0]));
+        }
         row = new Text(vrg.toString());
       }
     /* Create list bucketing sub-directory only if stored-as-directories is on. */

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java?rev=1490492&r1=1490491&r2=1490492&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java Fri Jun  7 01:39:37 2013
@@ -19,16 +19,21 @@
 package org.apache.hadoop.hive.ql.exec.vector;
 
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriterFactory;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.SelectDesc;
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
 
 /**
  * Select operator implementation.
@@ -44,6 +49,8 @@ public class VectorSelectOperator extend
 
   private int [] projectedColumns = null;
 
+  private VectorExpressionWriter [] valueWriters = null;
+
   public VectorSelectOperator(VectorizationContext ctxt, OperatorDesc conf) {
     this.vContext = ctxt;
     this.conf = (SelectDesc) conf;
@@ -57,6 +64,8 @@ public class VectorSelectOperator extend
       return;
     }
 
+    List<ObjectInspector> objectInspectors = new ArrayList<ObjectInspector>();
+
     List<ExprNodeDesc> colList = conf.getColList();
     vContext.setOperatorType(OperatorType.SELECT);
     vExpressions = new VectorExpression[colList.size()];
@@ -66,6 +75,15 @@ public class VectorSelectOperator extend
       // Update column map with output column names
       vContext.addToColumnMap(columnName, vExpressions[i].getOutputColumn());
     }
+    valueWriters = VectorExpressionWriterFactory.getExpressionWriters(colList);
+    for (VectorExpressionWriter vew : valueWriters) {
+      objectInspectors.add(vew.getObjectInspector());
+    }
+
+    List<String> outputFieldNames = conf.getOutputColumnNames();
+    outputObjInspector = ObjectInspectorFactory.getStandardStructObjectInspector(
+        outputFieldNames, objectInspectors);
+
     initializeChildren(hconf);
     projectedColumns = new int [vExpressions.length];
     for (int i = 0; i < projectedColumns.length; i++) {
@@ -97,6 +115,8 @@ public class VectorSelectOperator extend
     }
 
     // Prepare output, set the projections
+    VectorExpressionWriter [] originalValueWriters = vrg.valueWriters;
+    vrg.setValueWriters(valueWriters);
     int[] originalProjections = vrg.projectedColumns;
     int originalProjectionSize = vrg.projectionSize;
     vrg.projectionSize = vExpressions.length;
@@ -106,6 +126,7 @@ public class VectorSelectOperator extend
     // Revert the projected columns back, because vrg will be re-used.
     vrg.projectionSize = originalProjectionSize;
     vrg.projectedColumns = originalProjections;
+    vrg.valueWriters = originalValueWriters;
   }
 
   /**

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatch.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatch.java?rev=1490492&r1=1490491&r2=1490492&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatch.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatch.java Fri Jun  7 01:39:37 2013
@@ -21,6 +21,8 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.io.Writable;
 
 /**
@@ -54,8 +56,7 @@ public class VectorizedRowBatch implemen
    */
   public static final int DEFAULT_SIZE = 1024;
 
-  private final Writable[] writableRow;
-  private int rowIteratorIndex = 0;
+  public VectorExpressionWriter[] valueWriters = null;
 
   /**
    * Return a batch with the specified number of columns.
@@ -81,7 +82,6 @@ public class VectorizedRowBatch implemen
     selected = new int[size];
     selectedInUse = false;
     this.cols = new ColumnVector[numCols];
-    writableRow = new Writable[numCols];
     projectedColumns = new int[numCols];
 
     // Initially all columns are projected and in the same order
@@ -91,30 +91,6 @@ public class VectorizedRowBatch implemen
     }
   }
 
-  public void initRowIterator(){
-    this.rowIteratorIndex = 0;
-  }
-
-  public Writable [] getNextRow() {
-    if (rowIteratorIndex >= size) {
-      return null;
-    }
-    if (selectedInUse) {
-      int i = selected[rowIteratorIndex];
-      for (int k = 0; k < projectionSize; k++) {
-        int c = this.projectedColumns[k];
-        writableRow[c] = cols[c].getWritableObject(i);
-      }
-    } else {
-      int i = rowIteratorIndex;
-      for (int k = 0; k < projectionSize; k++) {
-        int c = this.projectedColumns[k];
-        writableRow[c] = cols[c].getWritableObject(i);
-      }
-    }
-    return writableRow;
-  }
-
   /**
    * Return count of qualifying rows.
    *
@@ -130,45 +106,51 @@ public class VectorizedRowBatch implemen
       return "";
     }
     StringBuilder b = new StringBuilder();
-    if (this.selectedInUse) {
-      for (int j = 0; j < size; j++) {
-        int i = selected[j];
-        int colIndex = 0;
-        for (int k = 0; k < projectionSize; k++) {
-          ColumnVector cv = cols[this.projectedColumns[k]];
-          if (cv.isRepeating) {
-            b.append(cv.getWritableObject(0).toString());
-          } else {
-            b.append(cv.getWritableObject(i).toString());
+    try {
+      if (this.selectedInUse) {
+        for (int j = 0; j < size; j++) {
+          int i = selected[j];
+          int colIndex = 0;
+          for (int k = 0; k < projectionSize; k++) {
+            int projIndex = projectedColumns[k];
+            ColumnVector cv = cols[projIndex];
+            if (cv.isRepeating) {
+              b.append(valueWriters[k].writeValue(cv, 0).toString());
+            } else {
+              b.append(valueWriters[k].writeValue(cv, i).toString());
+            }
+            colIndex++;
+            if (colIndex < cols.length) {
+              b.append('\u0001');
+            }
           }
-          colIndex++;
-          if (colIndex < cols.length) {
-            b.append('\u0001');
+          if (j < size - 1) {
+            b.append('\n');
           }
         }
-        if (j < size-1) {
-          b.append('\n');
-        }
-      }
-    } else {
-      for (int i = 0; i < size; i++) {
-        int colIndex = 0;
-        for (int k = 0; k < projectionSize; k++) {
-          ColumnVector cv = cols[this.projectedColumns[k]];
-          if (cv.isRepeating) {
-            b.append(cv.getWritableObject(0).toString());
-          } else {
-            b.append(cv.getWritableObject(i).toString());
+      } else {
+        for (int i = 0; i < size; i++) {
+          int colIndex = 0;
+          for (int k = 0; k < projectionSize; k++) {
+            int projIndex = projectedColumns[k];
+            ColumnVector cv = cols[projIndex];
+            if (cv.isRepeating) {
+              b.append(valueWriters[k].writeValue(cv, 0).toString());
+            } else {
+              b.append(valueWriters[k].writeValue(cv, i).toString());
+            }
+            colIndex++;
+            if (colIndex < cols.length) {
+              b.append('\u0001');
+            }
           }
-          colIndex++;
-          if (colIndex < cols.length) {
-            b.append('\u0001');
+          if (i < size - 1) {
+            b.append('\n');
           }
         }
-        if (i < size-1) {
-          b.append('\n');
-        }
       }
+    } catch (HiveException ex) {
+      throw new RuntimeException(ex);
     }
     return b.toString();
   }
@@ -182,5 +164,9 @@ public class VectorizedRowBatch implemen
   public void write(DataOutput arg0) throws IOException {
     throw new UnsupportedOperationException("Don't call me");
   }
+
+  public void setValueWriters(VectorExpressionWriter[] valueWriters) {
+    this.valueWriters = valueWriters;
+  }
 }
 

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSerde.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSerde.java?rev=1490492&r1=1490491&r2=1490492&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSerde.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSerde.java Fri Jun  7 01:39:37 2013
@@ -139,7 +139,7 @@ public class OrcSerde implements SerDe, 
   public Writable serializeVector(VectorizedRowBatch vrg, ObjectInspector objInspector)
       throws SerDeException {
     if (vos == null) {
-      vos = new VectorizedOrcSerde();
+      vos = new VectorizedOrcSerde(objInspector);
     }
     return vos.serialize(vrg, objInspector);
   }

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java?rev=1490492&r1=1490491&r2=1490492&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java Fri Jun  7 01:39:37 2013
@@ -552,7 +552,7 @@ class RecordReaderImpl implements Record
       // Read value entries based on isNull entries
       for (int i = 0; i < batchSize; i++) {
         if (!result.isNull[i]) {
-          result.vector[i] = SerializationUtils.readDouble(stream);
+          result.vector[i] = SerializationUtils.readFloat(stream);
         } else {
 
           // If the value is not present then set NaN

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcSerde.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcSerde.java?rev=1490492&r1=1490491&r2=1490492&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcSerde.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcSerde.java Fri Jun  7 01:39:37 2013
@@ -18,7 +18,11 @@
 package org.apache.hadoop.hive.ql.io.orc;
 
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriterFactory;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.io.ObjectWritable;
 import org.apache.hadoop.io.Writable;
 
@@ -31,43 +35,54 @@ public class VectorizedOrcSerde extends 
   private final Writable [] orcRowArray = new Writable [VectorizedRowBatch.DEFAULT_SIZE];
   private final ObjectWritable ow = new ObjectWritable();
   private final ObjectInspector inspector = null;
+  private final VectorExpressionWriter [] valueWriters;
 
-  public VectorizedOrcSerde() {
+  public VectorizedOrcSerde(ObjectInspector objInspector) {
     super();
     for (int i = 0; i < orcStructArray.length; i++) {
       orcRowArray[i] = new OrcSerdeRow();
     }
+    try {
+      valueWriters = VectorExpressionWriterFactory
+          .getExpressionWriters((StructObjectInspector) objInspector);
+    } catch (HiveException e) {
+      throw new RuntimeException(e);
+    }
   }
 
 
   @Override
   public Writable serialize(Object obj, ObjectInspector inspector) {
-    VectorizedRowBatch batch = (VectorizedRowBatch)obj;
-    for (int i = 0; i < batch.size; i++) {
-      OrcStruct ost = orcStructArray[i];
-      if (ost == null) {
-        ost = new OrcStruct(batch.numCols);
-        orcStructArray[i] = ost;
-      }
-      int index = 0;
-      if (batch.selectedInUse) {
-        index = batch.selected[i];
-      } else {
-        index = i;
-      }
-      for (int p = 0; p < batch.projectionSize; p++) {
-        int k = batch.projectedColumns[p];
-        Writable w;
-        if (batch.cols[k].isRepeating) {
-          w = batch.cols[k].getWritableObject(0);
+    VectorizedRowBatch batch = (VectorizedRowBatch) obj;
+    try {
+      for (int i = 0; i < batch.size; i++) {
+        OrcStruct ost = orcStructArray[i];
+        if (ost == null) {
+          ost = new OrcStruct(batch.numCols);
+          orcStructArray[i] = ost;
+        }
+        int index = 0;
+        if (batch.selectedInUse) {
+          index = batch.selected[i];
         } else {
-          w = batch.cols[k].getWritableObject(index);
+          index = i;
+        }
+        for (int p = 0; p < batch.projectionSize; p++) {
+          int k = batch.projectedColumns[p];
+          Writable w;
+          if (batch.cols[k].isRepeating) {
+            w = (Writable) valueWriters[p].writeValue(batch.cols[k], 0);
+          } else {
+            w = (Writable) valueWriters[p].writeValue(batch.cols[k], index);
+          }
+          ost.setFieldValue(k, w);
         }
-        ost.setFieldValue(k, w);
+        OrcSerdeRow row = (OrcSerdeRow) orcRowArray[i];
+        row.realRow = ost;
+        row.inspector = inspector;
       }
-      OrcSerdeRow row = (OrcSerdeRow) orcRowArray[i];
-      row.realRow = ost;
-      row.inspector = inspector;
+    } catch (HiveException ex) {
+      throw new RuntimeException(ex);
     }
     ow.set(orcRowArray);
     return ow;