Posted to commits@hive.apache.org by ha...@apache.org on 2013/09/03 20:33:14 UTC

svn commit: r1519788 [2/2] - in /hive/branches/vectorization/ql/src: java/org/apache/hadoop/hive/ql/exec/ java/org/apache/hadoop/hive/ql/exec/mr/ java/org/apache/hadoop/hive/ql/exec/vector/ java/org/apache/hadoop/hive/ql/exec/vector/expressions/ java/o...

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java?rev=1519788&r1=1519787&r2=1519788&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java Tue Sep  3 18:33:13 2013
@@ -33,6 +33,7 @@ import org.apache.hadoop.hive.ql.exec.Ex
 import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluatorFactory;
 import org.apache.hadoop.hive.ql.exec.UDF;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.ConstantVectorExpression;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.FilterConstantBooleanVectorExpression;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.FilterExprAndExpr;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.FilterExprOrExpr;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.FilterStringColLikeStringScalar;
@@ -157,7 +158,8 @@ public class VectorizationContext {
     private final Set<Integer> usedOutputColumns = new HashSet<Integer>();
 
     int allocateOutputColumn(String columnType) {
-      return initialOutputCol + allocateOutputColumnInternal(columnType);
+      int relativeCol = allocateOutputColumnInternal(columnType);
+      return initialOutputCol + relativeCol;
     }
 
     private int allocateOutputColumnInternal(String columnType) {
@@ -192,14 +194,6 @@ public class VectorizationContext {
         usedOutputColumns.remove(index-initialOutputCol);
       }
     }
-
-    String getOutputColumnType(int index) {
-      return outputColumnsTypes[index-initialOutputCol];
-    }
-
-    int getNumOfOutputColumn() {
-      return outputColCount;
-    }
   }
 
   public void setOperatorType(OperatorType opType) {
@@ -311,8 +305,22 @@ public class VectorizationContext {
       return new ConstantVectorExpression(outCol, ((Number) exprDesc.getValue()).doubleValue());
     } else if (type.equalsIgnoreCase("string")) {
       return new ConstantVectorExpression(outCol, ((String) exprDesc.getValue()).getBytes());
+    } else if (type.equalsIgnoreCase("boolean")) {
+      if (this.opType == OperatorType.FILTER) {
+        if (((Boolean) exprDesc.getValue()).booleanValue()) {
+          return new FilterConstantBooleanVectorExpression(1);
+        } else {
+          return new FilterConstantBooleanVectorExpression(0);
+        }
+      } else {
+        if (((Boolean) exprDesc.getValue()).booleanValue()) {
+          return new ConstantVectorExpression(outCol, 1);
+        } else {
+          return new ConstantVectorExpression(outCol, 0);
+        }
+      }
     } else {
-      throw new HiveException("Unsupported constant type");
+      throw new HiveException("Unsupported constant type: "+type.toString());
     }
   }
 
@@ -339,8 +347,7 @@ public class VectorizationContext {
        + outputColumnType + "ColUnaryMinus";
     VectorExpression expr;
     try {
-      expr = (VectorExpression) Class.forName(className).
-          getDeclaredConstructors()[0].newInstance(inputCol, outputCol);
+      expr = (VectorExpression) getConstructor(className).newInstance(inputCol, outputCol);
     } catch (Exception ex) {
       throw new HiveException(ex);
     }
@@ -470,14 +477,14 @@ public class VectorizationContext {
   /* Return a unary string vector expression. This is used for functions like
    * UPPER() and LOWER().
    */
-  private VectorExpression getUnaryStringExpression(String vectorExprClassName, 
+  private VectorExpression getUnaryStringExpression(String vectorExprClassName,
       String resultType, // result type name
       List<ExprNodeDesc> childExprList) throws HiveException {
-    
+
     /* Create an instance of the class vectorExprClassName for the input column or expression result
      * and return it.
      */
-    
+
     ExprNodeDesc childExpr = childExprList.get(0);
     int inputCol;
     VectorExpression v1 = null;
@@ -497,8 +504,7 @@ public class VectorizationContext {
        + vectorExprClassName;
     VectorExpression expr;
     try {
-      expr = (VectorExpression) Class.forName(className).
-          getDeclaredConstructors()[0].newInstance(inputCol, outputCol);
+      expr = (VectorExpression) getConstructor(className).newInstance(inputCol, outputCol);
     } catch (Exception ex) {
       throw new HiveException(ex);
     }
@@ -517,23 +523,23 @@ public class VectorizationContext {
     VectorExpression expr = null;
     int inputCol;
     ExprNodeConstantDesc constDesc;
-    
+
     if ((leftExpr instanceof ExprNodeColumnDesc) &&
         (rightExpr instanceof ExprNodeConstantDesc) ) {
       ExprNodeColumnDesc leftColDesc = (ExprNodeColumnDesc) leftExpr;
       constDesc = (ExprNodeConstantDesc) rightExpr;
       inputCol = getInputColumnIndex(leftColDesc.getColumn());
-      expr = (VectorExpression) new FilterStringColLikeStringScalar(inputCol, 
-          new Text((byte[]) getScalarValue(constDesc)));  
+      expr = (VectorExpression) new FilterStringColLikeStringScalar(inputCol,
+          new Text((byte[]) getScalarValue(constDesc)));
     } else if ((leftExpr instanceof ExprNodeGenericFuncDesc) &&
                (rightExpr instanceof ExprNodeConstantDesc)) {
       v1 = getVectorExpression(leftExpr);
       inputCol = v1.getOutputColumn();
       constDesc = (ExprNodeConstantDesc) rightExpr;
-      expr = (VectorExpression) new FilterStringColLikeStringScalar(inputCol, 
-          new Text((byte[]) getScalarValue(constDesc)));  
+      expr = (VectorExpression) new FilterStringColLikeStringScalar(inputCol,
+          new Text((byte[]) getScalarValue(constDesc)));
     }
-    // TODO add logic to handle cases where left input is an expression. 
+    // TODO add logic to handle cases where left input is an expression.
     if (expr == null) {
       throw new HiveException("Vector LIKE filter expression could not be initialized");
     }
@@ -558,8 +564,8 @@ public class VectorizationContext {
         // org.apache.hadoop.hive.ql.exec.vector.expressions.VectorUDFYearLong
         String vectorUDF = pkg + ".Vector"+udf+"Long";
         try {
-          VectorExpression v2 = (VectorExpression)Class.forName(vectorUDF).
-              getDeclaredConstructors()[0].newInstance(inputCol,outputCol);
+          VectorExpression v2 = (VectorExpression) getConstructor(vectorUDF).
+              newInstance(inputCol,outputCol);
           return v2;
         } catch(Exception e) {
           e.printStackTrace();
@@ -594,8 +600,7 @@ public class VectorizationContext {
       int outputCol = ocm.allocateOutputColumn(getOutputColType(colType,
           scalarType, method));
       try {
-        expr = (VectorExpression) Class.forName(className).
-            getDeclaredConstructors()[0].newInstance(inputCol,
+        expr = (VectorExpression) getConstructor(className).newInstance(inputCol,
             getScalarValue(constDesc), outputCol);
       } catch (Exception ex) {
         throw new HiveException(ex);
@@ -612,8 +617,7 @@ public class VectorizationContext {
       String outputColType = getOutputColType(colType, scalarType, method);
       int outputCol = ocm.allocateOutputColumn(outputColType);
       try {
-        expr = (VectorExpression) Class.forName(className).
-            getDeclaredConstructors()[0].newInstance(getScalarValue(constDesc),
+        expr = (VectorExpression) getConstructor(className).newInstance(getScalarValue(constDesc),
             inputCol, outputCol);
       } catch (Exception ex) {
         throw new HiveException("Could not instantiate: "+className, ex);
@@ -631,8 +635,7 @@ public class VectorizationContext {
           colType2, method);
       int outputCol = ocm.allocateOutputColumn(outputColType);
       try {
-        expr = (VectorExpression) Class.forName(className).
-            getDeclaredConstructors()[0].newInstance(inputCol1, inputCol2,
+        expr = (VectorExpression) getConstructor(className).newInstance(inputCol1, inputCol2,
             outputCol);
       } catch (Exception ex) {
         throw new HiveException(ex);
@@ -650,8 +653,7 @@ public class VectorizationContext {
           colType2, method);
       int outputCol = ocm.allocateOutputColumn(outputColType);
       try {
-        expr = (VectorExpression) Class.forName(className).
-            getDeclaredConstructors()[0].newInstance(inputCol1, inputCol2,
+        expr = (VectorExpression) getConstructor(className).newInstance(inputCol1, inputCol2,
             outputCol);
       } catch (Exception ex) {
         throw new HiveException((ex));
@@ -669,8 +671,7 @@ public class VectorizationContext {
       String className = getBinaryColumnScalarExpressionClassName(colType1,
           scalarType, method);
       try {
-        expr = (VectorExpression) Class.forName(className).
-            getDeclaredConstructors()[0].newInstance(inputCol1,
+        expr = (VectorExpression) getConstructor(className).newInstance(inputCol1,
             getScalarValue(constDesc), outputCol);
       } catch (Exception ex) {
         throw new HiveException((ex));
@@ -689,8 +690,7 @@ public class VectorizationContext {
       String className = getBinaryColumnColumnExpressionClassName(colType1,
           colType2, method);
       try {
-        expr = (VectorExpression) Class.forName(className).
-            getDeclaredConstructors()[0].newInstance(inputCol1, inputCol2,
+        expr = (VectorExpression) getConstructor(className).newInstance(inputCol1, inputCol2,
             outputCol);
       } catch (Exception ex) {
         throw new HiveException(ex);
@@ -708,8 +708,7 @@ public class VectorizationContext {
       String className = getBinaryScalarColumnExpressionClassName(colType2,
           scalarType, method);
       try {
-        expr = (VectorExpression) Class.forName(className).
-            getDeclaredConstructors()[0].newInstance(getScalarValue(constDesc), 
+        expr = (VectorExpression) getConstructor(className).newInstance(getScalarValue(constDesc),
                 inputCol2, outputCol);
       } catch (Exception ex) {
         throw new HiveException(ex);
@@ -730,8 +729,7 @@ public class VectorizationContext {
       String className = getBinaryColumnColumnExpressionClassName(colType1,
           colType2, method);
       try {
-        expr = (VectorExpression) Class.forName(className).
-            getDeclaredConstructors()[0].newInstance(inputCol1, inputCol2,
+        expr = (VectorExpression) getConstructor(className).newInstance(inputCol1, inputCol2,
             outputCol);
       } catch (Exception ex) {
         throw new HiveException(ex);
@@ -864,13 +862,13 @@ public class VectorizationContext {
       String className = getFilterColumnScalarExpressionClassName(colType,
           scalarType, opName);
       try {
-        expr = (VectorExpression) Class.forName(className).
-            getDeclaredConstructors()[0].newInstance(inputCol,
+        Constructor<?> ctor = getConstructor(className);
+        expr = (VectorExpression) ctor.newInstance(inputCol,
             getScalarValue(constDesc));
       } catch (Exception ex) {
         throw new HiveException(ex);
       }
-    } else if ((leftExpr instanceof ExprNodeConstantDesc) && 
+    } else if ((leftExpr instanceof ExprNodeConstantDesc) &&
         (rightExpr instanceof ExprNodeColumnDesc)) {
       ExprNodeConstantDesc constDesc = (ExprNodeConstantDesc) leftExpr;
       ExprNodeColumnDesc rightColDesc = (ExprNodeColumnDesc) rightExpr;
@@ -880,8 +878,8 @@ public class VectorizationContext {
       String className = getFilterScalarColumnExpressionClassName(colType,
           scalarType, opName);
       try {
-        expr = (VectorExpression) Class.forName(className).
-            getDeclaredConstructors()[0].newInstance(inputCol,
+        //Constructor<?>
+        expr = (VectorExpression) getConstructor(className).newInstance(inputCol,
             getScalarValue(constDesc));
       } catch (Exception ex) {
         throw new HiveException(ex);
@@ -897,8 +895,7 @@ public class VectorizationContext {
       String className = getFilterColumnColumnExpressionClassName(colType1,
           colType2, opName);
       try {
-        expr = (VectorExpression) Class.forName(className).
-            getDeclaredConstructors()[0].newInstance(inputCol1, inputCol2);
+        expr = (VectorExpression) getConstructor(className).newInstance(inputCol1, inputCol2);
       } catch (Exception ex) {
         throw new HiveException(ex);
       }
@@ -913,8 +910,7 @@ public class VectorizationContext {
       String className = getFilterColumnColumnExpressionClassName(colType1,
           colType2, opName);
       try {
-        expr = (VectorExpression) Class.forName(className).
-            getDeclaredConstructors()[0].newInstance(inputCol1, inputCol2);
+        expr = (VectorExpression) getConstructor(className).newInstance(inputCol1, inputCol2);
       } catch (Exception ex) {
         throw new HiveException(ex);
       }
@@ -930,8 +926,7 @@ public class VectorizationContext {
       String className = getFilterColumnColumnExpressionClassName(colType1,
           colType2, opName);
       try {
-        expr = (VectorExpression) Class.forName(className).
-            getDeclaredConstructors()[0].newInstance(inputCol1, inputCol2);
+        expr = (VectorExpression) getConstructor(className).newInstance(inputCol1, inputCol2);
       } catch (Exception ex) {
         throw new HiveException(ex);
       }
@@ -946,8 +941,7 @@ public class VectorizationContext {
       String className = getFilterColumnScalarExpressionClassName(colType1,
           scalarType, opName);
       try {
-        expr = (VectorExpression) Class.forName(className).
-            getDeclaredConstructors()[0].newInstance(inputCol1,
+        expr = (VectorExpression) getConstructor(className).newInstance(inputCol1,
             getScalarValue(constDesc));
       } catch (Exception ex) {
         throw new HiveException(ex);
@@ -963,8 +957,7 @@ public class VectorizationContext {
       String className = getFilterScalarColumnExpressionClassName(colType,
           scalarType, opName);
       try {
-        expr = (VectorExpression) Class.forName(className).
-            getDeclaredConstructors()[0].newInstance(inputCol2,
+        expr = (VectorExpression) getConstructor(className).newInstance(inputCol2,
             getScalarValue(constDesc));
       } catch (Exception ex) {
         throw new HiveException(ex);
@@ -982,8 +975,7 @@ public class VectorizationContext {
       String className = getFilterColumnColumnExpressionClassName(colType1,
           colType2, opName);
       try {
-        expr = (VectorExpression) Class.forName(className).
-            getDeclaredConstructors()[0].newInstance(inputCol1, inputCol2);
+        expr = (VectorExpression) getConstructor(className).newInstance(inputCol1, inputCol2);
       } catch (Exception ex) {
         throw new HiveException(ex);
       }
@@ -998,6 +990,22 @@ public class VectorizationContext {
     return expr;
   }
 
+  private Constructor<?> getConstructor(String className) throws HiveException {
+    try {
+      Class<?> cl = Class.forName(className);
+      Constructor<?> [] ctors = cl.getDeclaredConstructors();
+      Constructor<?> defaultCtor = cl.getConstructor();
+      for (Constructor<?> ctor : ctors) {
+        if (!ctor.equals(defaultCtor)) {
+          return ctor;
+        }
+      }
+      throw new HiveException("Only default constructor found");
+    } catch (Exception ex) {
+      throw new HiveException(ex);
+    }
+  }
+
   private String getNormalizedTypeName(String colType) throws HiveException {
     validateInputType(colType);
     String normalizedType = null;
@@ -1244,31 +1252,6 @@ public class VectorizationContext {
     {"String",  BytesColumnVector.class},
   };
 
-  private VectorizedRowBatch allocateRowBatch(int rowCount) throws HiveException {
-    int columnCount = firstOutputColumnIndex + ocm.getNumOfOutputColumn();
-    VectorizedRowBatch ret = new VectorizedRowBatch(columnCount, rowCount);
-    for (int i=0; i < columnCount; ++i) {
-      String columnTypeName = ocm.getOutputColumnType(i);
-      for (Object[] columnType: columnTypes) {
-        if (columnTypeName.equalsIgnoreCase((String)columnType[0])) {
-          Class<? extends ColumnVector> columnTypeClass = (Class<? extends ColumnVector>)columnType[1];
-          try {
-            Constructor<? extends ColumnVector> ctor = columnTypeClass.getConstructor(int.class);
-            ret.cols[i] = ctor.newInstance(rowCount);
-          }
-          catch(Exception e) {
-            throw new HiveException (
-                String.format(
-                    "Internal exception occured trying to allocate a vectorized column %d of type %s",
-                    i, columnTypeName),
-                e);
-          }
-        }
-      }
-    }
-    return ret;
-  }
-
   public Map<Integer, String> getOutputColumnTypeMap() {
     Map<Integer, String> map = new HashMap<Integer, String>();
     for (int i = 0; i < ocm.outputColCount; i++) {

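The repeated "Class.forName(className).getDeclaredConstructors()[0]" calls in the hunks above are consolidated into the new getConstructor(String) helper, which skips the public no-arg constructor and returns the first remaining declared constructor. A self-contained sketch of that lookup pattern, using a JDK class as the target (the class and method names below are illustrative, not Hive APIs):

    import java.lang.reflect.Constructor;

    public class ConstructorLookupSketch {
      // Mirror of the selection logic in getConstructor(): pick the first
      // declared constructor that is not the public no-arg constructor.
      // Note that Class.getConstructor() throws NoSuchMethodException when the
      // class has no public no-arg constructor, so the lookup assumes the
      // target class declares one.
      static Constructor<?> firstNonDefaultConstructor(String className) throws Exception {
        Class<?> cl = Class.forName(className);
        Constructor<?> defaultCtor = cl.getConstructor();
        for (Constructor<?> ctor : cl.getDeclaredConstructors()) {
          if (!ctor.equals(defaultCtor)) {
            return ctor;
          }
        }
        throw new IllegalStateException("Only default constructor found");
      }

      public static void main(String[] args) throws Exception {
        // java.util.ArrayList declares ArrayList(), ArrayList(int) and
        // ArrayList(Collection), so one of the argument-taking constructors
        // is returned here.
        Constructor<?> ctor = firstNonDefaultConstructor("java.util.ArrayList");
        System.out.println(java.util.Arrays.toString(ctor.getParameterTypes()));
      }
    }
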
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java?rev=1519788&r1=1519787&r2=1519788&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java Tue Sep  3 18:33:13 2013
@@ -68,6 +68,8 @@ public class VectorizedRowBatchCtx {
   // list does not contain partition columns
   private List<Integer> colsToInclude;
 
+  private Map<Integer, String> columnTypeMap = null;
+
   /**
    * Constructor for VectorizedRowBatchCtx
    *
@@ -124,6 +126,11 @@ public class VectorizedRowBatchCtx {
             split.getPath(), IOPrepareCache.get().getPartitionDescMap());
     Class serdeclass = part.getDeserializerClass();
 
+    String partitionPath = split.getPath().getParent().toString();
+    columnTypeMap = Utilities
+        .getMapRedWork(hiveConf).getMapWork().getScratchColumnVectorTypes()
+        .get(partitionPath);
+
     if (serdeclass == null) {
       String className = part.getSerdeClassName();
       if ((className == null) || (className.isEmpty())) {
@@ -253,6 +260,7 @@ public class VectorizedRowBatchCtx {
       }
     }
     result.numCols = fieldRefs.size();
+    this.addScratchColumnsToBatch(result);
     return result;
   }
 
@@ -330,4 +338,27 @@ public class VectorizedRowBatchCtx {
       }
     }
   }
+
+  private void addScratchColumnsToBatch(VectorizedRowBatch vrb) {
+    if (columnTypeMap != null && !columnTypeMap.isEmpty()) {
+      int origNumCols = vrb.numCols;
+      int newNumCols = vrb.cols.length+columnTypeMap.keySet().size();
+      vrb.cols = Arrays.copyOf(vrb.cols, newNumCols);
+      for (int i = origNumCols; i < newNumCols; i++) {
+        vrb.cols[i] = allocateColumnVector(columnTypeMap.get(i),
+            VectorizedRowBatch.DEFAULT_SIZE);
+      }
+      vrb.numCols = vrb.cols.length;
+    }
+  }
+
+  private ColumnVector allocateColumnVector(String type, int defaultSize) {
+    if (type.equalsIgnoreCase("double")) {
+      return new DoubleColumnVector(defaultSize);
+    } else if (type.equalsIgnoreCase("string")) {
+      return new BytesColumnVector(defaultSize);
+    } else {
+      return new LongColumnVector(defaultSize);
+    }
+  }
 }

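In VectorizedRowBatchCtx above, addScratchColumnsToBatch() grows the batch's column array with Arrays.copyOf and fills the new slots from the per-partition scratch-column type map. A self-contained sketch of that grow-and-fill pattern, with placeholder vector classes standing in for Hive's ColumnVector hierarchy (all names below are illustrative):

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Map;

    public class ScratchColumnSketch {
      // Placeholder for LongColumnVector / DoubleColumnVector / BytesColumnVector.
      static class Vec {
        final String kind;
        final int size;
        Vec(String kind, int size) { this.kind = kind; this.size = size; }
      }

      static Vec allocate(String type, int size) {
        if (type.equalsIgnoreCase("double")) { return new Vec("double", size); }
        if (type.equalsIgnoreCase("string")) { return new Vec("bytes", size); }
        return new Vec("long", size); // everything else falls back to a long vector
      }

      // Appends one scratch column per map entry; keys are absolute column indexes
      // starting right after the real columns, as in the type map above.
      static Vec[] addScratchColumns(Vec[] cols, Map<Integer, String> typeMap, int batchSize) {
        int origNumCols = cols.length;
        Vec[] grown = Arrays.copyOf(cols, origNumCols + typeMap.size());
        for (int i = origNumCols; i < grown.length; i++) {
          grown[i] = allocate(typeMap.get(i), batchSize);
        }
        return grown;
      }

      public static void main(String[] args) {
        Vec[] cols = { new Vec("long", 1024), new Vec("bytes", 1024) };
        Map<Integer, String> scratch = new HashMap<Integer, String>();
        scratch.put(2, "double");
        scratch.put(3, "string");
        Vec[] grown = addScratchColumns(cols, scratch, 1024);
        System.out.println(grown.length + " columns, last kind: " + grown[3].kind);
      }
    }
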
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/FilterExprOrExpr.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/FilterExprOrExpr.java?rev=1519788&r1=1519787&r2=1519788&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/FilterExprOrExpr.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/FilterExprOrExpr.java Tue Sep  3 18:33:13 2013
@@ -24,16 +24,22 @@ import org.apache.hadoop.hive.ql.exec.ve
  * This class represents an Or expression. This applies short circuit optimization.
  */
 public class FilterExprOrExpr extends VectorExpression {
-  private final int[] initialSelected = new int[VectorizedRowBatch.DEFAULT_SIZE];
-  private int[] unselected = new int[VectorizedRowBatch.DEFAULT_SIZE];
-  private final int[] tmp = new int[VectorizedRowBatch.DEFAULT_SIZE];
+  private static final long serialVersionUID = 1L;
+  private transient final int[] initialSelected = new int[VectorizedRowBatch.DEFAULT_SIZE];
+  private transient int[] unselected = new int[VectorizedRowBatch.DEFAULT_SIZE];
+  private transient final int[] tmp = new int[VectorizedRowBatch.DEFAULT_SIZE];
 
   public FilterExprOrExpr(VectorExpression childExpr1, VectorExpression childExpr2) {
+    this();
     this.childExpressions = new VectorExpression[2];
     childExpressions[0] = childExpr1;
     childExpressions[1] = childExpr2;
   }
 
+  public FilterExprOrExpr() {
+    super();
+  }
+
   @Override
   public void evaluate(VectorizedRowBatch batch) {
     int n = batch.size;

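The FilterExprOrExpr change above adds a serialVersionUID, marks the per-batch scratch arrays transient, and introduces a public no-arg constructor, presumably so the expression can travel with the serialized plan while its working buffers stay out of the serialized form. A minimal sketch of the same pattern (class and field names are illustrative):

    import java.io.Serializable;

    public class OrExprSketch implements Serializable {
      private static final long serialVersionUID = 1L;

      // Per-batch scratch space; transient so it is not written out with the plan.
      private transient final int[] scratchSelected = new int[1024];

      private OrExprSketch[] children;

      // Public no-arg constructor for frameworks that instantiate the class
      // reflectively before wiring in children.
      public OrExprSketch() {
        super();
      }

      public OrExprSketch(OrExprSketch child1, OrExprSketch child2) {
        this();
        this.children = new OrExprSketch[] { child1, child2 };
      }
    }
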
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java?rev=1519788&r1=1519787&r2=1519788&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java Tue Sep  3 18:33:13 2013
@@ -27,6 +27,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.io.InputFormatChecker;
@@ -166,10 +167,8 @@ public class OrcInputFormat  extends Fil
   public RecordReader<NullWritable, OrcStruct>
       getRecordReader(InputSplit inputSplit, JobConf conf,
                       Reporter reporter) throws IOException {
-
-    boolean vectorPath = conf.getBoolean(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED.toString(),
-        false);
-    if (vectorPath) {
+    if (Utilities
+        .getMapRedWork(conf).getMapWork().getVectorMode()) {
       RecordReader<NullWritable, VectorizedRowBatch> vorr = voif.getRecordReader(inputSplit, conf,
           reporter);
       return (RecordReader) vorr;
@@ -187,10 +186,9 @@ public class OrcInputFormat  extends Fil
   public boolean validateInput(FileSystem fs, HiveConf conf,
                                ArrayList<FileStatus> files
                               ) throws IOException {
-    boolean vectorPath = conf.getBoolean(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED.toString(),
-        false);
 
-    if (vectorPath) {
+    if (Utilities
+        .getMapRedWork(conf).getMapWork().getVectorMode()) {
       return voif.validateInput(fs, conf, files);
     }
 

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java?rev=1519788&r1=1519787&r2=1519788&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java Tue Sep  3 18:33:13 2013
@@ -77,6 +77,13 @@ public class PhysicalOptimizer {
     if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_INFER_BUCKET_SORT)) {
       resolvers.add(new BucketingSortingInferenceOptimizer());
     }
+
+    // Vectorization should be the last optimization, because it doesn't modify the plan
+    // or any operators. It applies a very low-level transformation to the expressions so
+    // that they run in vectorized mode.
+    if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED)) {
+      resolvers.add(new Vectorizer());
+    }
   }
 
   /**

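The registration above appends the new Vectorizer resolver only when HIVE_VECTORIZATION_ENABLED is set, and keeps it at the end of the resolver chain for the reason given in the comment. A stripped-down illustration of that conditional, last-in-line registration (resolver names other than Vectorizer are placeholders):

    import java.util.ArrayList;
    import java.util.List;

    public class ResolverOrderSketch {
      public static void main(String[] args) {
        // Stand-ins for the HiveConf boolean lookups.
        boolean inferBucketSort = false;
        boolean vectorizationEnabled = true;

        List<String> resolvers = new ArrayList<String>();
        resolvers.add("SomeEarlierResolver");
        if (inferBucketSort) {
          resolvers.add("BucketingSortingInferenceOptimizer");
        }
        // Vectorizer goes last: it does not reshape the operator tree, it only
        // rewrites expressions for vectorized execution, so it should see the
        // plan produced by every earlier resolver.
        if (vectorizationEnabled) {
          resolvers.add("Vectorizer");
        }
        for (String r : resolvers) {
          System.out.println(r);
        }
      }
    }
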
Added: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java?rev=1519788&view=auto
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java (added)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java Tue Sep  3 18:33:13 2013
@@ -0,0 +1,540 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer.physical;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Stack;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.ColumnInfo;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.FilterOperator;
+import org.apache.hadoop.hive.ql.exec.GroupByOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorFactory;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.SelectOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.UDF;
+import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
+import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
+import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
+import org.apache.hadoop.hive.ql.lib.Dispatcher;
+import org.apache.hadoop.hive.ql.lib.GraphWalker;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.lib.PreOrderWalker;
+import org.apache.hadoop.hive.ql.lib.Rule;
+import org.apache.hadoop.hive.ql.lib.RuleRegExp;
+import org.apache.hadoop.hive.ql.lib.TaskGraphWalker;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.parse.RowResolver;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.AggregationDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.hive.ql.plan.api.OperatorType;
+import org.apache.hadoop.hive.ql.udf.UDFDayOfMonth;
+import org.apache.hadoop.hive.ql.udf.UDFHour;
+import org.apache.hadoop.hive.ql.udf.UDFLength;
+import org.apache.hadoop.hive.ql.udf.UDFLike;
+import org.apache.hadoop.hive.ql.udf.UDFLower;
+import org.apache.hadoop.hive.ql.udf.UDFMinute;
+import org.apache.hadoop.hive.ql.udf.UDFOPDivide;
+import org.apache.hadoop.hive.ql.udf.UDFOPMinus;
+import org.apache.hadoop.hive.ql.udf.UDFOPMod;
+import org.apache.hadoop.hive.ql.udf.UDFOPMultiply;
+import org.apache.hadoop.hive.ql.udf.UDFOPNegative;
+import org.apache.hadoop.hive.ql.udf.UDFOPPlus;
+import org.apache.hadoop.hive.ql.udf.UDFOPPositive;
+import org.apache.hadoop.hive.ql.udf.UDFSecond;
+import org.apache.hadoop.hive.ql.udf.UDFUpper;
+import org.apache.hadoop.hive.ql.udf.UDFWeekOfYear;
+import org.apache.hadoop.hive.ql.udf.UDFYear;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBridge;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPAnd;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrGreaterThan;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrLessThan;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPGreaterThan;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPLessThan;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPNot;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPNotEqual;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPNotNull;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPNull;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPOr;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToUnixTimeStamp;
+
+public class Vectorizer implements PhysicalPlanResolver {
+
+  protected static transient final Log LOG = LogFactory.getLog(Vectorizer.class);
+
+  Set<String> supportedDataTypes = new HashSet<String>();
+  List<Task<? extends Serializable>> vectorizableTasks =
+      new ArrayList<Task<? extends Serializable>>();
+  Set<Class<?>> supportedGenericUDFs = new HashSet<Class<?>>();
+
+  Set<String> supportedAggregationUdfs = new HashSet<String>();
+
+  private PhysicalContext physicalContext = null;
+
+  public Vectorizer() {
+    supportedDataTypes.add("int");
+    supportedDataTypes.add("smallint");
+    supportedDataTypes.add("tinyint");
+    supportedDataTypes.add("bigint");
+    supportedDataTypes.add("integer");
+    supportedDataTypes.add("long");
+    supportedDataTypes.add("short");
+    supportedDataTypes.add("timestamp");
+    supportedDataTypes.add("boolean");
+    supportedDataTypes.add("string");
+    supportedDataTypes.add("byte");
+    supportedDataTypes.add("float");
+    supportedDataTypes.add("double");
+
+    supportedGenericUDFs.add(UDFOPNegative.class);
+    supportedGenericUDFs.add(UDFOPPositive.class);
+    supportedGenericUDFs.add(UDFOPPlus.class);
+    supportedGenericUDFs.add(UDFOPMinus.class);
+    supportedGenericUDFs.add(UDFOPMultiply.class);
+    supportedGenericUDFs.add(UDFOPDivide.class);
+    supportedGenericUDFs.add(UDFOPMod.class);
+
+    supportedGenericUDFs.add(GenericUDFOPEqualOrLessThan.class);
+    supportedGenericUDFs.add(GenericUDFOPEqualOrGreaterThan.class);
+    supportedGenericUDFs.add(GenericUDFOPGreaterThan.class);
+    supportedGenericUDFs.add(GenericUDFOPLessThan.class);
+    supportedGenericUDFs.add(GenericUDFOPNot.class);
+    supportedGenericUDFs.add(GenericUDFOPNotEqual.class);
+    supportedGenericUDFs.add(GenericUDFOPNotNull.class);
+    supportedGenericUDFs.add(GenericUDFOPNull.class);
+    supportedGenericUDFs.add(GenericUDFOPOr.class);
+    supportedGenericUDFs.add(GenericUDFOPAnd.class);
+    supportedGenericUDFs.add(GenericUDFOPEqual.class);
+    supportedGenericUDFs.add(GenericUDFToUnixTimeStamp.class);
+
+    supportedGenericUDFs.add(UDFHour.class);
+    supportedGenericUDFs.add(UDFLength.class);
+    supportedGenericUDFs.add(UDFMinute.class);
+    supportedGenericUDFs.add(UDFSecond.class);
+    supportedGenericUDFs.add(UDFYear.class);
+    supportedGenericUDFs.add(UDFWeekOfYear.class);
+    supportedGenericUDFs.add(UDFDayOfMonth.class);
+
+    supportedGenericUDFs.add(UDFLike.class);
+    supportedGenericUDFs.add(UDFLower.class);
+    supportedGenericUDFs.add(UDFUpper.class);
+
+    supportedAggregationUdfs.add("min");
+    supportedAggregationUdfs.add("max");
+    supportedAggregationUdfs.add("count");
+    supportedAggregationUdfs.add("sum");
+    supportedAggregationUdfs.add("avg");
+    supportedAggregationUdfs.add("variance");
+    supportedAggregationUdfs.add("var_pop");
+    supportedAggregationUdfs.add("var_samp");
+    supportedAggregationUdfs.add("std");
+    supportedAggregationUdfs.add("stddev");
+    supportedAggregationUdfs.add("stddev_pop");
+    supportedAggregationUdfs.add("stddev_samp");
+  }
+
+  class VectorizationDispatcher implements Dispatcher {
+
+    public VectorizationDispatcher(PhysicalContext pctx) {
+    }
+
+    @Override
+    public Object dispatch(Node nd, Stack<Node> stack, Object... nodeOutputs)
+        throws SemanticException {
+      Task<? extends Serializable> currTask = (Task<? extends Serializable>) nd;
+      if (currTask instanceof MapRedTask) {
+        boolean ret = validateMRTask((MapRedTask) currTask);
+        if (ret) {
+          vectorizeMRTask((MapRedTask) currTask);
+        }
+      }
+      return null;
+    }
+
+    private boolean validateMRTask(MapRedTask mrTask) throws SemanticException {
+      MapWork mapWork = mrTask.getWork().getMapWork();
+
+      // Validate the input format
+      for (String path : mapWork.getPathToPartitionInfo().keySet()) {
+        PartitionDesc pd = mapWork.getPathToPartitionInfo().get(path);
+        List<Class<?>> interfaceList =
+            Arrays.asList(pd.getInputFileFormatClass().getInterfaces());
+        if (!interfaceList.contains(VectorizedInputFormatInterface.class)) {
+          LOG.debug("Input format: " + pd.getInputFileFormatClassName()
+              + ", doesn't provide vectorized input");
+          System.err.println("Input format: " + pd.getInputFileFormatClassName()
+              + ", doesn't provide vectorized input");
+          return false;
+        }
+      }
+      Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
+      ValidationNodeProcessor vnp = new ValidationNodeProcessor();
+      opRules.put(new RuleRegExp("R1", TableScanOperator.getOperatorName() + ".*"
+          + FileSinkOperator.getOperatorName()), vnp);
+      opRules.put(new RuleRegExp("R2", TableScanOperator.getOperatorName() + ".*"
+          + ReduceSinkOperator.getOperatorName()), vnp);
+      Dispatcher disp = new DefaultRuleDispatcher(vnp, opRules, null);
+      GraphWalker ogw = new DefaultGraphWalker(disp);
+      // iterate over the mapper operator tree
+      ArrayList<Node> topNodes = new ArrayList<Node>();
+      topNodes.addAll(mapWork.getAliasToWork().values());
+      HashMap<Node, Object> nodeOutput = new HashMap<Node, Object>();
+      ogw.startWalking(topNodes, nodeOutput);
+      for (Node n : nodeOutput.keySet()) {
+        if (nodeOutput.get(n) != null) {
+          if (!((Boolean)nodeOutput.get(n)).booleanValue()) {
+            return false;
+          }
+        }
+      }
+      return true;
+    }
+
+    private void vectorizeMRTask(MapRedTask mrTask) throws SemanticException {
+      System.err.println("Going down the vectorized path");
+      MapWork mapWork = mrTask.getWork().getMapWork();
+      mapWork.setVectorMode(true);
+      Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
+      VectorizationNodeProcessor vnp = new VectorizationNodeProcessor(mrTask);
+      opRules.put(new RuleRegExp("R1", TableScanOperator.getOperatorName() + ".*" +
+          ReduceSinkOperator.getOperatorName()), vnp);
+      opRules.put(new RuleRegExp("R2", TableScanOperator.getOperatorName() + ".*"
+          + FileSinkOperator.getOperatorName()), vnp);
+      Dispatcher disp = new DefaultRuleDispatcher(vnp, opRules, null);
+      GraphWalker ogw = new PreOrderWalker(disp);
+      // iterate over the mapper operator tree
+      ArrayList<Node> topNodes = new ArrayList<Node>();
+      topNodes.addAll(mapWork.getAliasToWork().values());
+      HashMap<Node, Object> nodeOutput = new HashMap<Node, Object>();
+      ogw.startWalking(topNodes, nodeOutput);
+      mapWork.setScratchColumnVectorTypes(vnp.getScratchColumnVectorTypes());
+      return;
+    }
+  }
+
+  class ValidationNodeProcessor implements NodeProcessor {
+
+    @Override
+    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+        Object... nodeOutputs) throws SemanticException {
+      for (Node n : stack) {
+        Operator<? extends OperatorDesc> op = (Operator<? extends OperatorDesc>) n;
+        if (op.getType().equals(OperatorType.REDUCESINK) &&
+            op.getParentOperators().get(0).getType().equals(OperatorType.GROUPBY)) {
+          return new Boolean(true);
+        }
+        boolean ret = validateOperator(op);
+        if (!ret) {
+          System.err.println("Operator: "+op.getName()+", could not be vectorized");
+          return new Boolean(false);
+        }
+      }
+      return new Boolean(true);
+    }
+  }
+
+  class VectorizationNodeProcessor implements NodeProcessor {
+
+    private final MapWork mWork;
+    private final Map<String, VectorizationContext> vectorizationContexts =
+        new HashMap<String, VectorizationContext>();
+
+    private final Map<Operator<? extends OperatorDesc>, VectorizationContext> vContextsByTSOp =
+        new HashMap<Operator<? extends OperatorDesc>, VectorizationContext>();
+
+    private final Set<Operator<? extends OperatorDesc>> opsDone =
+        new HashSet<Operator<? extends OperatorDesc>>();
+
+    public VectorizationNodeProcessor(MapRedTask mrTask) {
+      this.mWork = mrTask.getWork().getMapWork();
+    }
+
+    public Map<String, Map<Integer, String>> getScratchColumnVectorTypes() {
+      Map<String, Map<Integer, String>> scratchColumnVectorTypes =
+          new HashMap<String, Map<Integer, String>>();
+      for (String onefile : vectorizationContexts.keySet()) {
+        VectorizationContext vc = vectorizationContexts.get(onefile);
+        Map<Integer, String> cmap = vc.getOutputColumnTypeMap();
+        scratchColumnVectorTypes.put(onefile, cmap);
+      }
+      return scratchColumnVectorTypes;
+    }
+
+    @Override
+    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+        Object... nodeOutputs) throws SemanticException {
+
+      Node firstOp = stack.firstElement();
+      TableScanOperator tsOp = null;
+
+      tsOp = (TableScanOperator) firstOp;
+
+      VectorizationContext vContext = vContextsByTSOp.get(tsOp);
+      if (vContext == null) {
+        String fileKey = "";
+        for (String onefile : mWork.getPathToAliases().keySet()) {
+          List<String> aliases = mWork.getPathToAliases().get(onefile);
+          for (String alias : aliases) {
+            Operator<? extends OperatorDesc> op = mWork.getAliasToWork().get(alias);
+            if (op == tsOp) {
+              fileKey = onefile;
+              break;
+            }
+          }
+        }
+        vContext = getVectorizationContext(tsOp, physicalContext);
+        vectorizationContexts.put(fileKey, vContext);
+        vContextsByTSOp.put(tsOp, vContext);
+      }
+
+      Operator<? extends OperatorDesc> op = (Operator<? extends OperatorDesc>) nd;
+      if (op.getType().equals(OperatorType.REDUCESINK) &&
+          op.getParentOperators().get(0).getType().equals(OperatorType.GROUPBY)) {
+        // No need to vectorize
+        if (!opsDone.contains(op)) {
+          opsDone.add(op);
+        }
+      } else {
+        try {
+          if (!opsDone.contains(op)) {
+            Operator<? extends OperatorDesc> vectorOp =
+                vectorizeOperator(op, vContext);
+            opsDone.add(op);
+            if (vectorOp != op) {
+              opsDone.add(vectorOp);
+            }
+          }
+        } catch (HiveException e) {
+          throw new SemanticException(e);
+        }
+      }
+      return null;
+    }
+  }
+
+  @Override
+  public PhysicalContext resolve(PhysicalContext pctx) throws SemanticException {
+    this.physicalContext  = pctx;
+    boolean vectorPath = HiveConf.getBoolVar(pctx.getConf(),
+        HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED);
+    if (!vectorPath) {
+      LOG.info("Vectorization is disabled");
+      return pctx;
+    }
+    // create dispatcher and graph walker
+    Dispatcher disp = new VectorizationDispatcher(pctx);
+    TaskGraphWalker ogw = new TaskGraphWalker(disp);
+
+    // get all the tasks nodes from root task
+    ArrayList<Node> topNodes = new ArrayList<Node>();
+    topNodes.addAll(pctx.getRootTasks());
+
+    // begin to walk through the task tree.
+    ogw.startWalking(topNodes, null);
+    return pctx;
+  }
+
+  private boolean validateOperator(Operator<? extends OperatorDesc> op) {
+    boolean ret = false;
+    switch (op.getType()) {
+      case GROUPBY:
+        ret = validateGroupByOperator((GroupByOperator) op);
+        break;
+      case FILTER:
+        ret = validateFilterOperator((FilterOperator) op);
+        break;
+      case SELECT:
+        ret = validateSelectOperator((SelectOperator) op);
+        break;
+      case REDUCESINK:
+        ret = validateReduceSinkOperator((ReduceSinkOperator) op);
+        break;
+      case FILESINK:
+      case TABLESCAN:
+        ret = true;
+        break;
+      default:
+        ret = false;
+        break;
+    }
+    return ret;
+  }
+
+  private boolean validateReduceSinkOperator(ReduceSinkOperator op) {
+    List<ExprNodeDesc> keyDescs = op.getConf().getKeyCols();
+    List<ExprNodeDesc> partitionDescs = op.getConf().getPartitionCols();
+    List<ExprNodeDesc> valueDesc = op.getConf().getValueCols();
+    return validateExprNodeDesc(keyDescs) && validateExprNodeDesc(partitionDescs) &&
+        validateExprNodeDesc(valueDesc);
+  }
+
+  private boolean validateSelectOperator(SelectOperator op) {
+    List<ExprNodeDesc> descList = op.getConf().getColList();
+    for (ExprNodeDesc desc : descList) {
+      boolean ret = validateExprNodeDesc(desc);
+      if (!ret) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  private boolean validateFilterOperator(FilterOperator op) {
+    ExprNodeDesc desc = op.getConf().getPredicate();
+    return validateExprNodeDesc(desc);
+  }
+
+  private boolean validateGroupByOperator(GroupByOperator op) {
+    boolean ret = validateExprNodeDesc(op.getConf().getKeys());
+    if (!ret) {
+      return false;
+    }
+    return validateAggregationDesc(op.getConf().getAggregators());
+  }
+
+  private boolean validateExprNodeDesc(List<ExprNodeDesc> descs) {
+    for (ExprNodeDesc d : descs) {
+      boolean ret = validateExprNodeDesc(d);
+      if (!ret) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  private boolean validateAggregationDesc(List<AggregationDesc> descs) {
+    for (AggregationDesc d : descs) {
+      boolean ret = validateAggregationDesc(d);
+      if (!ret) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  private boolean validateExprNodeDesc(ExprNodeDesc desc) {
+    boolean ret = validateDataType(desc.getTypeInfo().getTypeName());
+    if (!ret) {
+      return false;
+    }
+    if (desc instanceof ExprNodeGenericFuncDesc) {
+      ExprNodeGenericFuncDesc d = (ExprNodeGenericFuncDesc) desc;
+      boolean r = validateGenericUdf(d.getGenericUDF());
+      if (!r) {
+        return false;
+      }
+    }
+    if (desc.getChildren() != null) {
+      for (ExprNodeDesc d: desc.getChildren()) {
+        validateExprNodeDesc(d);
+      }
+    }
+    return true;
+  }
+
+  private boolean validateGenericUdf(GenericUDF genericUDF) {
+    if (genericUDF instanceof GenericUDFBridge) {
+      Class<? extends UDF> udf = ((GenericUDFBridge) genericUDF).getUdfClass();
+      return supportedGenericUDFs.contains(udf);
+    } else {
+      return supportedGenericUDFs.contains(genericUDF.getClass());
+    }
+  }
+
+  private boolean validateAggregationDesc(AggregationDesc aggDesc) {
+    return supportedAggregationUdfs.contains(aggDesc.getGenericUDAFName().toLowerCase());
+  }
+
+  private boolean validateDataType(String type) {
+    return supportedDataTypes.contains(type.toLowerCase());
+  }
+
+  private VectorizationContext getVectorizationContext(Operator<? extends OperatorDesc> op,
+      PhysicalContext pctx) {
+    RowResolver rr = pctx.getParseContext().getOpParseCtx().get(op).getRowResolver();
+
+    Map<String, Integer> cmap = new HashMap<String, Integer>();
+    int columnCount = 0;
+    for (ColumnInfo c : rr.getColumnInfos()) {
+      if (!c.getIsVirtualCol()) {
+        cmap.put(c.getInternalName(), columnCount++);
+      }
+    }
+    return new VectorizationContext(cmap, columnCount);
+  }
+
+  private Operator<? extends OperatorDesc> vectorizeOperator(Operator<? extends OperatorDesc> op,
+      VectorizationContext vContext) throws HiveException {
+    Operator<? extends OperatorDesc> vectorOp = null;
+
+    switch (op.getType()) {
+      case GROUPBY:
+      case FILTER:
+      case SELECT:
+      case FILESINK:
+      case REDUCESINK:
+        vectorOp = OperatorFactory.getVectorOperator(op.getConf(), vContext);
+        break;
+      default:
+        vectorOp = op;
+        break;
+    }
+
+    if (vectorOp != op) {
+      if (op.getParentOperators() != null) {
+        vectorOp.setParentOperators(op.getParentOperators());
+        for (Operator<? extends OperatorDesc> p : op.getParentOperators()) {
+          p.replaceChild(op, vectorOp);
+        }
+      }
+      if (op.getChildOperators() != null) {
+        vectorOp.setChildOperators(op.getChildOperators());
+        for (Operator<? extends OperatorDesc> c : op.getChildOperators()) {
+          c.replaceParent(op, vectorOp);
+        }
+      }
+    }
+    return vectorOp;
+  }
+}

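In Vectorizer.vectorizeOperator() above, the vectorized operator is spliced into the plan by adopting the original operator's parent and child lists and repointing those neighbors at the replacement. The same rewiring reduced to a generic doubly-linked plan node (no Hive classes; all names below are illustrative):

    import java.util.ArrayList;
    import java.util.List;

    public class OperatorRewireSketch {
      static class Node {
        final String name;
        List<Node> parents = new ArrayList<Node>();
        List<Node> children = new ArrayList<Node>();
        Node(String name) { this.name = name; }

        void replaceChild(Node oldChild, Node newChild) {
          children.set(children.indexOf(oldChild), newChild);
        }

        void replaceParent(Node oldParent, Node newParent) {
          parents.set(parents.indexOf(oldParent), newParent);
        }
      }

      // Splice newOp into oldOp's position: oldOp's parents and children now
      // reference newOp, and newOp adopts oldOp's links.
      static void replaceInPlan(Node oldOp, Node newOp) {
        newOp.parents = oldOp.parents;
        for (Node p : newOp.parents) {
          p.replaceChild(oldOp, newOp);
        }
        newOp.children = oldOp.children;
        for (Node c : newOp.children) {
          c.replaceParent(oldOp, newOp);
        }
      }

      public static void main(String[] args) {
        Node ts = new Node("TS"), fil = new Node("FIL"), fs = new Node("FS");
        ts.children.add(fil); fil.parents.add(ts);
        fil.children.add(fs); fs.parents.add(fil);
        replaceInPlan(fil, new Node("VECTOR_FIL"));
        // prints: VECTOR_FIL / VECTOR_FIL
        System.out.println(ts.children.get(0).name + " / " + fs.parents.get(0).name);
      }
    }
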
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java?rev=1519788&r1=1519787&r2=1519788&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java Tue Sep  3 18:33:13 2013
@@ -52,7 +52,7 @@ import org.apache.hadoop.mapred.JobConf;
  * distributed on the cluster. The ExecMapper will ultimately deserialize this
  * class on the data nodes and setup it's operator pipeline accordingly.
  *
- * This class is also used in the explain command any property with the 
+ * This class is also used in the explain command any property with the
  * appropriate annotation will be displayed in the explain output.
  */
 @SuppressWarnings({"serial", "deprecation"})
@@ -112,6 +112,9 @@ public class MapWork extends BaseWork {
 
   private transient boolean useBucketizedHiveInputFormat;
 
+  private Map<String, Map<Integer, String>> scratchColumnVectorTypes = null;
+  private boolean vectorMode = false;
+
   public MapWork() {
   }
 
@@ -479,4 +482,21 @@ public class MapWork extends BaseWork {
       PlanUtils.configureJobConf(fs.getConf().getTableInfo(), job);
     }
   }
+
+  public Map<String, Map<Integer, String>> getScratchColumnVectorTypes() {
+    return scratchColumnVectorTypes;
+  }
+
+  public void setScratchColumnVectorTypes(
+      Map<String, Map<Integer, String>> scratchColumnVectorTypes) {
+    this.scratchColumnVectorTypes = scratchColumnVectorTypes;
+  }
+
+  public boolean getVectorMode() {
+    return vectorMode;
+  }
+
+  public void setVectorMode(boolean vectorMode) {
+    this.vectorMode = vectorMode;
+  }
 }

Modified: hive/branches/vectorization/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorFilterOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorFilterOperator.java?rev=1519788&r1=1519787&r2=1519788&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorFilterOperator.java (original)
+++ hive/branches/vectorization/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorFilterOperator.java Tue Sep  3 18:33:13 2013
@@ -18,6 +18,9 @@
 
 package org.apache.hadoop.hive.ql.exec.vector;
 
+import java.util.HashMap;
+import java.util.Map;
+
 import junit.framework.Assert;
 
 import org.apache.hadoop.hive.ql.exec.vector.expressions.FilterExprAndExpr;
@@ -25,6 +28,8 @@ import org.apache.hadoop.hive.ql.exec.ve
 import org.apache.hadoop.hive.ql.exec.vector.expressions.gen.FilterLongColEqualDoubleScalar;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.gen.FilterLongColGreaterLongColumn;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.FilterDesc;
 import org.junit.Test;
 
 /**
@@ -76,9 +81,19 @@ public class TestVectorFilterOperator {
     }
   }
 
+  private VectorFilterOperator getAVectorFilterOperator() throws HiveException {
+    ExprNodeColumnDesc col1Expr = new  ExprNodeColumnDesc(Long.class, "col1", "table", false);
+    Map<String, Integer> columnMap = new HashMap<String, Integer>();
+    columnMap.put("col1", 1);
+    VectorizationContext vc = new VectorizationContext(columnMap, 1);
+    FilterDesc fdesc = new FilterDesc();
+    fdesc.setPredicate(col1Expr);
+    return new VectorFilterOperator(vc, fdesc);
+  }
+
   @Test
   public void testBasicFilterOperator() throws HiveException {
-    VectorFilterOperator vfo = new VectorFilterOperator(null, null);
+    VectorFilterOperator vfo = getAVectorFilterOperator();
     VectorExpression ve1 = new FilterLongColGreaterLongColumn(0,1);
     VectorExpression ve2 = new FilterLongColEqualDoubleScalar(2, 0);
     VectorExpression ve3 = new FilterExprAndExpr(ve1,ve2);
@@ -105,7 +120,7 @@ public class TestVectorFilterOperator {
 
   @Test
   public void testBasicFilterLargeData() throws HiveException {
-    VectorFilterOperator vfo = new VectorFilterOperator(null, null);
+    VectorFilterOperator vfo = getAVectorFilterOperator();
     VectorExpression ve1 = new FilterLongColGreaterLongColumn(0,1);
     VectorExpression ve2 = new FilterLongColEqualDoubleScalar(2, 0);
     VectorExpression ve3 = new FilterExprAndExpr(ve1,ve2);