Posted to commits@hive.apache.org by ha...@apache.org on 2013/09/03 20:33:14 UTC
svn commit: r1519788 [2/2] - in /hive/branches/vectorization/ql/src:
java/org/apache/hadoop/hive/ql/exec/ java/org/apache/hadoop/hive/ql/exec/mr/
java/org/apache/hadoop/hive/ql/exec/vector/
java/org/apache/hadoop/hive/ql/exec/vector/expressions/ java/o...
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java?rev=1519788&r1=1519787&r2=1519788&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java Tue Sep 3 18:33:13 2013
@@ -33,6 +33,7 @@ import org.apache.hadoop.hive.ql.exec.Ex
import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluatorFactory;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.hive.ql.exec.vector.expressions.ConstantVectorExpression;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.FilterConstantBooleanVectorExpression;
import org.apache.hadoop.hive.ql.exec.vector.expressions.FilterExprAndExpr;
import org.apache.hadoop.hive.ql.exec.vector.expressions.FilterExprOrExpr;
import org.apache.hadoop.hive.ql.exec.vector.expressions.FilterStringColLikeStringScalar;
@@ -157,7 +158,8 @@ public class VectorizationContext {
private final Set<Integer> usedOutputColumns = new HashSet<Integer>();
int allocateOutputColumn(String columnType) {
- return initialOutputCol + allocateOutputColumnInternal(columnType);
+ int relativeCol = allocateOutputColumnInternal(columnType);
+ return initialOutputCol + relativeCol;
}
private int allocateOutputColumnInternal(String columnType) {
@@ -192,14 +194,6 @@ public class VectorizationContext {
usedOutputColumns.remove(index-initialOutputCol);
}
}
-
- String getOutputColumnType(int index) {
- return outputColumnsTypes[index-initialOutputCol];
- }
-
- int getNumOfOutputColumn() {
- return outputColCount;
- }
}
public void setOperatorType(OperatorType opType) {
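The hunk above splits allocateOutputColumn() so the relative slot index is computed before the initialOutputCol offset is applied: callers always receive absolute indices, while the used-slot bookkeeping (see the removal at the top of this hunk, which subtracts initialOutputCol) stays relative. A minimal sketch of that allocate/free pattern; the reuse-or-grow allocation policy is an assumption, since allocateOutputColumnInternal() is not shown in this hunk:

    import java.util.HashSet;
    import java.util.Set;

    // Sketch: bookkeeping is relative to initialOutputCol, but callers
    // only ever see absolute column indices.
    public class OutputColumnPool {
      private final int initialOutputCol;
      private final Set<Integer> used = new HashSet<Integer>();
      private int count = 0;

      public OutputColumnPool(int initialOutputCol) {
        this.initialOutputCol = initialOutputCol;
      }

      public int allocate() {
        for (int i = 0; i < count; i++) {
          if (used.add(i)) {              // reuse a freed relative slot
            return initialOutputCol + i;
          }
        }
        used.add(count);                  // no free slot: grow the pool
        return initialOutputCol + count++;
      }

      public void free(int absoluteIndex) {
        used.remove(absoluteIndex - initialOutputCol);
      }
    }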
@@ -311,8 +305,22 @@ public class VectorizationContext {
return new ConstantVectorExpression(outCol, ((Number) exprDesc.getValue()).doubleValue());
} else if (type.equalsIgnoreCase("string")) {
return new ConstantVectorExpression(outCol, ((String) exprDesc.getValue()).getBytes());
+ } else if (type.equalsIgnoreCase("boolean")) {
+ if (this.opType == OperatorType.FILTER) {
+ if (((Boolean) exprDesc.getValue()).booleanValue()) {
+ return new FilterConstantBooleanVectorExpression(1);
+ } else {
+ return new FilterConstantBooleanVectorExpression(0);
+ }
+ } else {
+ if (((Boolean) exprDesc.getValue()).booleanValue()) {
+ return new ConstantVectorExpression(outCol, 1);
+ } else {
+ return new ConstantVectorExpression(outCol, 0);
+ }
+ }
} else {
- throw new HiveException("Unsupported constant type");
+ throw new HiveException("Unsupported constant type: "+type.toString());
}
}
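The new boolean branch distinguishes filter context from projection context: under OperatorType.FILTER the constant becomes a FilterConstantBooleanVectorExpression whose 1/0 argument encodes the constant's truth value, otherwise it materializes as an ordinary 0/1 long constant in the output column. The same mapping, condensed; exprDesc, opType, and outCol are the enclosing method's locals and fields:

    // Condensed form of the boolean-constant mapping added above.
    boolean value = ((Boolean) exprDesc.getValue()).booleanValue();
    if (this.opType == OperatorType.FILTER) {
      return new FilterConstantBooleanVectorExpression(value ? 1 : 0);
    }
    return new ConstantVectorExpression(outCol, value ? 1 : 0);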
@@ -339,8 +347,7 @@ public class VectorizationContext {
+ outputColumnType + "ColUnaryMinus";
VectorExpression expr;
try {
- expr = (VectorExpression) Class.forName(className).
- getDeclaredConstructors()[0].newInstance(inputCol, outputCol);
+ expr = (VectorExpression) getConstructor(className).newInstance(inputCol, outputCol);
} catch (Exception ex) {
throw new HiveException(ex);
}
@@ -470,14 +477,14 @@ public class VectorizationContext {
/* Return a unary string vector expression. This is used for functions like
* UPPER() and LOWER().
*/
- private VectorExpression getUnaryStringExpression(String vectorExprClassName,
+ private VectorExpression getUnaryStringExpression(String vectorExprClassName,
String resultType, // result type name
List<ExprNodeDesc> childExprList) throws HiveException {
-
+
/* Create an instance of the class vectorExprClassName for the input column or expression result
* and return it.
*/
-
+
ExprNodeDesc childExpr = childExprList.get(0);
int inputCol;
VectorExpression v1 = null;
@@ -497,8 +504,7 @@ public class VectorizationContext {
+ vectorExprClassName;
VectorExpression expr;
try {
- expr = (VectorExpression) Class.forName(className).
- getDeclaredConstructors()[0].newInstance(inputCol, outputCol);
+ expr = (VectorExpression) getConstructor(className).newInstance(inputCol, outputCol);
} catch (Exception ex) {
throw new HiveException(ex);
}
@@ -517,23 +523,23 @@ public class VectorizationContext {
VectorExpression expr = null;
int inputCol;
ExprNodeConstantDesc constDesc;
-
+
if ((leftExpr instanceof ExprNodeColumnDesc) &&
(rightExpr instanceof ExprNodeConstantDesc) ) {
ExprNodeColumnDesc leftColDesc = (ExprNodeColumnDesc) leftExpr;
constDesc = (ExprNodeConstantDesc) rightExpr;
inputCol = getInputColumnIndex(leftColDesc.getColumn());
- expr = (VectorExpression) new FilterStringColLikeStringScalar(inputCol,
- new Text((byte[]) getScalarValue(constDesc)));
+ expr = (VectorExpression) new FilterStringColLikeStringScalar(inputCol,
+ new Text((byte[]) getScalarValue(constDesc)));
} else if ((leftExpr instanceof ExprNodeGenericFuncDesc) &&
(rightExpr instanceof ExprNodeConstantDesc)) {
v1 = getVectorExpression(leftExpr);
inputCol = v1.getOutputColumn();
constDesc = (ExprNodeConstantDesc) rightExpr;
- expr = (VectorExpression) new FilterStringColLikeStringScalar(inputCol,
- new Text((byte[]) getScalarValue(constDesc)));
+ expr = (VectorExpression) new FilterStringColLikeStringScalar(inputCol,
+ new Text((byte[]) getScalarValue(constDesc)));
}
- // TODO add logic to handle cases where left input is an expression.
+ // TODO add logic to handle cases where left input is an expression.
if (expr == null) {
throw new HiveException("Vector LIKE filter expression could not be initialized");
}
@@ -558,8 +564,8 @@ public class VectorizationContext {
// org.apache.hadoop.hive.ql.exec.vector.expressions.VectorUDFYearLong
String vectorUDF = pkg + ".Vector"+udf+"Long";
try {
- VectorExpression v2 = (VectorExpression)Class.forName(vectorUDF).
- getDeclaredConstructors()[0].newInstance(inputCol,outputCol);
+ VectorExpression v2 = (VectorExpression) getConstructor(vectorUDF).
+ newInstance(inputCol,outputCol);
return v2;
} catch(Exception e) {
e.printStackTrace();
@@ -594,8 +600,7 @@ public class VectorizationContext {
int outputCol = ocm.allocateOutputColumn(getOutputColType(colType,
scalarType, method));
try {
- expr = (VectorExpression) Class.forName(className).
- getDeclaredConstructors()[0].newInstance(inputCol,
+ expr = (VectorExpression) getConstructor(className).newInstance(inputCol,
getScalarValue(constDesc), outputCol);
} catch (Exception ex) {
throw new HiveException(ex);
@@ -612,8 +617,7 @@ public class VectorizationContext {
String outputColType = getOutputColType(colType, scalarType, method);
int outputCol = ocm.allocateOutputColumn(outputColType);
try {
- expr = (VectorExpression) Class.forName(className).
- getDeclaredConstructors()[0].newInstance(getScalarValue(constDesc),
+ expr = (VectorExpression) getConstructor(className).newInstance(getScalarValue(constDesc),
inputCol, outputCol);
} catch (Exception ex) {
throw new HiveException("Could not instantiate: "+className, ex);
@@ -631,8 +635,7 @@ public class VectorizationContext {
colType2, method);
int outputCol = ocm.allocateOutputColumn(outputColType);
try {
- expr = (VectorExpression) Class.forName(className).
- getDeclaredConstructors()[0].newInstance(inputCol1, inputCol2,
+ expr = (VectorExpression) getConstructor(className).newInstance(inputCol1, inputCol2,
outputCol);
} catch (Exception ex) {
throw new HiveException(ex);
@@ -650,8 +653,7 @@ public class VectorizationContext {
colType2, method);
int outputCol = ocm.allocateOutputColumn(outputColType);
try {
- expr = (VectorExpression) Class.forName(className).
- getDeclaredConstructors()[0].newInstance(inputCol1, inputCol2,
+ expr = (VectorExpression) getConstructor(className).newInstance(inputCol1, inputCol2,
outputCol);
} catch (Exception ex) {
throw new HiveException((ex));
@@ -669,8 +671,7 @@ public class VectorizationContext {
String className = getBinaryColumnScalarExpressionClassName(colType1,
scalarType, method);
try {
- expr = (VectorExpression) Class.forName(className).
- getDeclaredConstructors()[0].newInstance(inputCol1,
+ expr = (VectorExpression) getConstructor(className).newInstance(inputCol1,
getScalarValue(constDesc), outputCol);
} catch (Exception ex) {
throw new HiveException((ex));
@@ -689,8 +690,7 @@ public class VectorizationContext {
String className = getBinaryColumnColumnExpressionClassName(colType1,
colType2, method);
try {
- expr = (VectorExpression) Class.forName(className).
- getDeclaredConstructors()[0].newInstance(inputCol1, inputCol2,
+ expr = (VectorExpression) getConstructor(className).newInstance(inputCol1, inputCol2,
outputCol);
} catch (Exception ex) {
throw new HiveException(ex);
@@ -708,8 +708,7 @@ public class VectorizationContext {
String className = getBinaryScalarColumnExpressionClassName(colType2,
scalarType, method);
try {
- expr = (VectorExpression) Class.forName(className).
- getDeclaredConstructors()[0].newInstance(getScalarValue(constDesc),
+ expr = (VectorExpression) getConstructor(className).newInstance(getScalarValue(constDesc),
inputCol2, outputCol);
} catch (Exception ex) {
throw new HiveException(ex);
@@ -730,8 +729,7 @@ public class VectorizationContext {
String className = getBinaryColumnColumnExpressionClassName(colType1,
colType2, method);
try {
- expr = (VectorExpression) Class.forName(className).
- getDeclaredConstructors()[0].newInstance(inputCol1, inputCol2,
+ expr = (VectorExpression) getConstructor(className).newInstance(inputCol1, inputCol2,
outputCol);
} catch (Exception ex) {
throw new HiveException(ex);
@@ -864,13 +862,13 @@ public class VectorizationContext {
String className = getFilterColumnScalarExpressionClassName(colType,
scalarType, opName);
try {
- expr = (VectorExpression) Class.forName(className).
- getDeclaredConstructors()[0].newInstance(inputCol,
+ Constructor<?> ctor = getConstructor(className);
+ expr = (VectorExpression) ctor.newInstance(inputCol,
getScalarValue(constDesc));
} catch (Exception ex) {
throw new HiveException(ex);
}
- } else if ((leftExpr instanceof ExprNodeConstantDesc) &&
+ } else if ((leftExpr instanceof ExprNodeConstantDesc) &&
(rightExpr instanceof ExprNodeColumnDesc)) {
ExprNodeConstantDesc constDesc = (ExprNodeConstantDesc) leftExpr;
ExprNodeColumnDesc rightColDesc = (ExprNodeColumnDesc) rightExpr;
@@ -880,8 +878,8 @@ public class VectorizationContext {
String className = getFilterScalarColumnExpressionClassName(colType,
scalarType, opName);
try {
- expr = (VectorExpression) Class.forName(className).
- getDeclaredConstructors()[0].newInstance(inputCol,
+ //Constructor<?>
+ expr = (VectorExpression) getConstructor(className).newInstance(inputCol,
getScalarValue(constDesc));
} catch (Exception ex) {
throw new HiveException(ex);
@@ -897,8 +895,7 @@ public class VectorizationContext {
String className = getFilterColumnColumnExpressionClassName(colType1,
colType2, opName);
try {
- expr = (VectorExpression) Class.forName(className).
- getDeclaredConstructors()[0].newInstance(inputCol1, inputCol2);
+ expr = (VectorExpression) getConstructor(className).newInstance(inputCol1, inputCol2);
} catch (Exception ex) {
throw new HiveException(ex);
}
@@ -913,8 +910,7 @@ public class VectorizationContext {
String className = getFilterColumnColumnExpressionClassName(colType1,
colType2, opName);
try {
- expr = (VectorExpression) Class.forName(className).
- getDeclaredConstructors()[0].newInstance(inputCol1, inputCol2);
+ expr = (VectorExpression) getConstructor(className).newInstance(inputCol1, inputCol2);
} catch (Exception ex) {
throw new HiveException(ex);
}
@@ -930,8 +926,7 @@ public class VectorizationContext {
String className = getFilterColumnColumnExpressionClassName(colType1,
colType2, opName);
try {
- expr = (VectorExpression) Class.forName(className).
- getDeclaredConstructors()[0].newInstance(inputCol1, inputCol2);
+ expr = (VectorExpression) getConstructor(className).newInstance(inputCol1, inputCol2);
} catch (Exception ex) {
throw new HiveException(ex);
}
@@ -946,8 +941,7 @@ public class VectorizationContext {
String className = getFilterColumnScalarExpressionClassName(colType1,
scalarType, opName);
try {
- expr = (VectorExpression) Class.forName(className).
- getDeclaredConstructors()[0].newInstance(inputCol1,
+ expr = (VectorExpression) getConstructor(className).newInstance(inputCol1,
getScalarValue(constDesc));
} catch (Exception ex) {
throw new HiveException(ex);
@@ -963,8 +957,7 @@ public class VectorizationContext {
String className = getFilterScalarColumnExpressionClassName(colType,
scalarType, opName);
try {
- expr = (VectorExpression) Class.forName(className).
- getDeclaredConstructors()[0].newInstance(inputCol2,
+ expr = (VectorExpression) getConstructor(className).newInstance(inputCol2,
getScalarValue(constDesc));
} catch (Exception ex) {
throw new HiveException(ex);
@@ -982,8 +975,7 @@ public class VectorizationContext {
String className = getFilterColumnColumnExpressionClassName(colType1,
colType2, opName);
try {
- expr = (VectorExpression) Class.forName(className).
- getDeclaredConstructors()[0].newInstance(inputCol1, inputCol2);
+ expr = (VectorExpression) getConstructor(className).newInstance(inputCol1, inputCol2);
} catch (Exception ex) {
throw new HiveException(ex);
}
@@ -998,6 +990,22 @@ public class VectorizationContext {
return expr;
}
+ private Constructor<?> getConstructor(String className) throws HiveException {
+ try {
+ Class<?> cl = Class.forName(className);
+ Constructor<?> [] ctors = cl.getDeclaredConstructors();
+ Constructor<?> defaultCtor = cl.getConstructor();
+ for (Constructor<?> ctor : ctors) {
+ if (!ctor.equals(defaultCtor)) {
+ return ctor;
+ }
+ }
+ throw new HiveException("Only default constructor found");
+ } catch (Exception ex) {
+ throw new HiveException(ex);
+ }
+ }
+
private String getNormalizedTypeName(String colType) throws HiveException {
validateInputType(colType);
String normalizedType = null;
@@ -1244,31 +1252,6 @@ public class VectorizationContext {
{"String", BytesColumnVector.class},
};
- private VectorizedRowBatch allocateRowBatch(int rowCount) throws HiveException {
- int columnCount = firstOutputColumnIndex + ocm.getNumOfOutputColumn();
- VectorizedRowBatch ret = new VectorizedRowBatch(columnCount, rowCount);
- for (int i=0; i < columnCount; ++i) {
- String columnTypeName = ocm.getOutputColumnType(i);
- for (Object[] columnType: columnTypes) {
- if (columnTypeName.equalsIgnoreCase((String)columnType[0])) {
- Class<? extends ColumnVector> columnTypeClass = (Class<? extends ColumnVector>)columnType[1];
- try {
- Constructor<? extends ColumnVector> ctor = columnTypeClass.getConstructor(int.class);
- ret.cols[i] = ctor.newInstance(rowCount);
- }
- catch(Exception e) {
- throw new HiveException (
- String.format(
- "Internal exception occured trying to allocate a vectorized column %d of type %s",
- i, columnTypeName),
- e);
- }
- }
- }
- }
- return ret;
- }
-
public Map<Integer, String> getOutputColumnTypeMap() {
Map<Integer, String> map = new HashMap<Integer, String>();
for (int i = 0; i < ocm.outputColCount; i++) {
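The recurring change throughout this file replaces inline Class.forName(...).getDeclaredConstructors()[0] calls with the new getConstructor() helper, which skips the public no-arg constructor (added to expression classes in this patch, apparently for plan serialization) and returns a parameterized one instead. A self-contained sketch of that lookup; SampleExpr is a hypothetical stand-in for a VectorExpression class, and the helper assumes a public no-arg constructor exists:

    import java.lang.reflect.Constructor;

    public class ConstructorLookupDemo {

      public static class SampleExpr {
        public SampleExpr() { }                 // default ctor: skipped by the lookup
        public SampleExpr(int in, int out) { }  // parameterized ctor: the one returned
      }

      // Same idea as the patch's getConstructor(): return any declared
      // constructor that is not the public no-arg one.
      static Constructor<?> firstNonDefaultConstructor(String className) throws Exception {
        Class<?> cl = Class.forName(className);
        Constructor<?> defaultCtor = cl.getConstructor();
        for (Constructor<?> ctor : cl.getDeclaredConstructors()) {
          if (!ctor.equals(defaultCtor)) {
            return ctor;
          }
        }
        throw new IllegalStateException("Only default constructor found");
      }

      public static void main(String[] args) throws Exception {
        Constructor<?> ctor =
            firstNonDefaultConstructor("ConstructorLookupDemo$SampleExpr");
        Object expr = ctor.newInstance(0, 1);   // like newInstance(inputCol, outputCol)
        System.out.println(expr.getClass().getSimpleName());
      }
    }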
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java?rev=1519788&r1=1519787&r2=1519788&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java Tue Sep 3 18:33:13 2013
@@ -68,6 +68,8 @@ public class VectorizedRowBatchCtx {
// list does not contain partition columns
private List<Integer> colsToInclude;
+ private Map<Integer, String> columnTypeMap = null;
+
/**
* Constructor for VectorizedRowBatchCtx
*
@@ -124,6 +126,11 @@ public class VectorizedRowBatchCtx {
split.getPath(), IOPrepareCache.get().getPartitionDescMap());
Class serdeclass = part.getDeserializerClass();
+ String partitionPath = split.getPath().getParent().toString();
+ columnTypeMap = Utilities
+ .getMapRedWork(hiveConf).getMapWork().getScratchColumnVectorTypes()
+ .get(partitionPath);
+
if (serdeclass == null) {
String className = part.getSerdeClassName();
if ((className == null) || (className.isEmpty())) {
@@ -253,6 +260,7 @@ public class VectorizedRowBatchCtx {
}
}
result.numCols = fieldRefs.size();
+ this.addScratchColumnsToBatch(result);
return result;
}
@@ -330,4 +338,27 @@ public class VectorizedRowBatchCtx {
}
}
}
+
+ private void addScratchColumnsToBatch(VectorizedRowBatch vrb) {
+ if (columnTypeMap != null && !columnTypeMap.isEmpty()) {
+ int origNumCols = vrb.numCols;
+ int newNumCols = vrb.cols.length+columnTypeMap.keySet().size();
+ vrb.cols = Arrays.copyOf(vrb.cols, newNumCols);
+ for (int i = origNumCols; i < newNumCols; i++) {
+ vrb.cols[i] = allocateColumnVector(columnTypeMap.get(i),
+ VectorizedRowBatch.DEFAULT_SIZE);
+ }
+ vrb.numCols = vrb.cols.length;
+ }
+ }
+
+ private ColumnVector allocateColumnVector(String type, int defaultSize) {
+ if (type.equalsIgnoreCase("double")) {
+ return new DoubleColumnVector(defaultSize);
+ } else if (type.equalsIgnoreCase("string")) {
+ return new BytesColumnVector(defaultSize);
+ } else {
+ return new LongColumnVector(defaultSize);
+ }
+ }
}
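addScratchColumnsToBatch() widens an already-populated batch in place: vrb.cols is copied into a larger array and the tail slots are filled from columnTypeMap, whose keys are absolute column indices (note that allocateColumnVector() falls back to LongColumnVector for any type that is not double or string). A toy sketch of the growth step, with strings standing in for ColumnVector instances:

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Map;

    public class ScratchColumnDemo {
      public static void main(String[] args) {
        Object[] cols = {"long:col0", "double:col1", "string:col2"};
        // Keys are absolute indices, continuing past the existing columns.
        Map<Integer, String> columnTypeMap = new HashMap<Integer, String>();
        columnTypeMap.put(3, "double");
        columnTypeMap.put(4, "string");

        int origNumCols = cols.length;
        int newNumCols = origNumCols + columnTypeMap.size();
        cols = Arrays.copyOf(cols, newNumCols);           // existing vectors survive
        for (int i = origNumCols; i < newNumCols; i++) {
          cols[i] = "scratch:" + columnTypeMap.get(i);    // stands in for allocateColumnVector()
        }
        System.out.println(Arrays.toString(cols));
        // [long:col0, double:col1, string:col2, scratch:double, scratch:string]
      }
    }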
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/FilterExprOrExpr.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/FilterExprOrExpr.java?rev=1519788&r1=1519787&r2=1519788&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/FilterExprOrExpr.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/FilterExprOrExpr.java Tue Sep 3 18:33:13 2013
@@ -24,16 +24,22 @@ import org.apache.hadoop.hive.ql.exec.ve
* This class represents an Or expression. This applies short circuit optimization.
*/
public class FilterExprOrExpr extends VectorExpression {
- private final int[] initialSelected = new int[VectorizedRowBatch.DEFAULT_SIZE];
- private int[] unselected = new int[VectorizedRowBatch.DEFAULT_SIZE];
- private final int[] tmp = new int[VectorizedRowBatch.DEFAULT_SIZE];
+ private static final long serialVersionUID = 1L;
+ private transient final int[] initialSelected = new int[VectorizedRowBatch.DEFAULT_SIZE];
+ private transient int[] unselected = new int[VectorizedRowBatch.DEFAULT_SIZE];
+ private transient final int[] tmp = new int[VectorizedRowBatch.DEFAULT_SIZE];
public FilterExprOrExpr(VectorExpression childExpr1, VectorExpression childExpr2) {
+ this();
this.childExpressions = new VectorExpression[2];
childExpressions[0] = childExpr1;
childExpressions[1] = childExpr2;
}
+ public FilterExprOrExpr() {
+ super();
+ }
+
@Override
public void evaluate(VectorizedRowBatch batch) {
int n = batch.size;
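FilterExprOrExpr evaluates OR over selection vectors: the first child filters the batch, the second child then runs only on the rows the first child rejected, and the two survivor sets are merged back in ascending order (hence the three scratch index arrays, now transient because only childExpressions need to travel with the serialized plan). A runnable illustration of that selection-set algebra, not the actual evaluate() code:

    import java.util.TreeSet;

    public class OrShortCircuitDemo {
      public static void main(String[] args) {
        int[] selected = {0, 1, 2, 3, 4};   // rows in the batch before the OR
        int[] passedChild1 = {1, 3};        // rows child 1 keeps

        // Short circuit: only rows child 1 rejected are handed to child 2.
        TreeSet<Integer> unselected = new TreeSet<Integer>();
        for (int i : selected) { unselected.add(i); }
        for (int i : passedChild1) { unselected.remove(i); }
        System.out.println("fed to child 2: " + unselected);  // [0, 2, 4]

        int[] passedChild2 = {2};           // suppose child 2 keeps row 2

        // The OR result is the sorted union of both survivor sets.
        TreeSet<Integer> result = new TreeSet<Integer>();
        for (int i : passedChild1) { result.add(i); }
        for (int i : passedChild2) { result.add(i); }
        System.out.println("selected after OR: " + result);   // [1, 2, 3]
      }
    }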
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java?rev=1519788&r1=1519787&r2=1519788&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java Tue Sep 3 18:33:13 2013
@@ -27,6 +27,7 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.io.InputFormatChecker;
@@ -166,10 +167,8 @@ public class OrcInputFormat extends Fil
public RecordReader<NullWritable, OrcStruct>
getRecordReader(InputSplit inputSplit, JobConf conf,
Reporter reporter) throws IOException {
-
- boolean vectorPath = conf.getBoolean(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED.toString(),
- false);
- if (vectorPath) {
+ if (Utilities
+ .getMapRedWork(conf).getMapWork().getVectorMode()) {
RecordReader<NullWritable, VectorizedRowBatch> vorr = voif.getRecordReader(inputSplit, conf,
reporter);
return (RecordReader) vorr;
@@ -187,10 +186,9 @@ public class OrcInputFormat extends Fil
public boolean validateInput(FileSystem fs, HiveConf conf,
ArrayList<FileStatus> files
) throws IOException {
- boolean vectorPath = conf.getBoolean(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED.toString(),
- false);
- if (vectorPath) {
+ if (Utilities
+ .getMapRedWork(conf).getMapWork().getVectorMode()) {
return voif.validateInput(fs, conf, files);
}
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java?rev=1519788&r1=1519787&r2=1519788&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java Tue Sep 3 18:33:13 2013
@@ -77,6 +77,13 @@ public class PhysicalOptimizer {
if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_INFER_BUCKET_SORT)) {
resolvers.add(new BucketingSortingInferenceOptimizer());
}
+
+ // Vectorization should be the last optimization, because it doesn't modify the plan
+ // or any operators. It makes a very low level transformation to the expressions to
+ // run in the vectorized mode.
+ if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED)) {
+ resolvers.add(new Vectorizer());
+ }
}
/**
Added: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java?rev=1519788&view=auto
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java (added)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java Tue Sep 3 18:33:13 2013
@@ -0,0 +1,540 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer.physical;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Stack;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.ColumnInfo;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.FilterOperator;
+import org.apache.hadoop.hive.ql.exec.GroupByOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorFactory;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.SelectOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.UDF;
+import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
+import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
+import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
+import org.apache.hadoop.hive.ql.lib.Dispatcher;
+import org.apache.hadoop.hive.ql.lib.GraphWalker;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.lib.PreOrderWalker;
+import org.apache.hadoop.hive.ql.lib.Rule;
+import org.apache.hadoop.hive.ql.lib.RuleRegExp;
+import org.apache.hadoop.hive.ql.lib.TaskGraphWalker;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.parse.RowResolver;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.AggregationDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.hive.ql.plan.api.OperatorType;
+import org.apache.hadoop.hive.ql.udf.UDFDayOfMonth;
+import org.apache.hadoop.hive.ql.udf.UDFHour;
+import org.apache.hadoop.hive.ql.udf.UDFLength;
+import org.apache.hadoop.hive.ql.udf.UDFLike;
+import org.apache.hadoop.hive.ql.udf.UDFLower;
+import org.apache.hadoop.hive.ql.udf.UDFMinute;
+import org.apache.hadoop.hive.ql.udf.UDFOPDivide;
+import org.apache.hadoop.hive.ql.udf.UDFOPMinus;
+import org.apache.hadoop.hive.ql.udf.UDFOPMod;
+import org.apache.hadoop.hive.ql.udf.UDFOPMultiply;
+import org.apache.hadoop.hive.ql.udf.UDFOPNegative;
+import org.apache.hadoop.hive.ql.udf.UDFOPPlus;
+import org.apache.hadoop.hive.ql.udf.UDFOPPositive;
+import org.apache.hadoop.hive.ql.udf.UDFSecond;
+import org.apache.hadoop.hive.ql.udf.UDFUpper;
+import org.apache.hadoop.hive.ql.udf.UDFWeekOfYear;
+import org.apache.hadoop.hive.ql.udf.UDFYear;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBridge;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPAnd;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrGreaterThan;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrLessThan;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPGreaterThan;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPLessThan;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPNot;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPNotEqual;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPNotNull;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPNull;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPOr;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToUnixTimeStamp;
+
+public class Vectorizer implements PhysicalPlanResolver {
+
+ protected static transient final Log LOG = LogFactory.getLog(Vectorizer.class);
+
+ Set<String> supportedDataTypes = new HashSet<String>();
+ List<Task<? extends Serializable>> vectorizableTasks =
+ new ArrayList<Task<? extends Serializable>>();
+ Set<Class<?>> supportedGenericUDFs = new HashSet<Class<?>>();
+
+ Set<String> supportedAggregationUdfs = new HashSet<String>();
+
+ private PhysicalContext physicalContext = null;
+
+ public Vectorizer() {
+ supportedDataTypes.add("int");
+ supportedDataTypes.add("smallint");
+ supportedDataTypes.add("tinyint");
+ supportedDataTypes.add("bigint");
+ supportedDataTypes.add("integer");
+ supportedDataTypes.add("long");
+ supportedDataTypes.add("short");
+ supportedDataTypes.add("timestamp");
+ supportedDataTypes.add("boolean");
+ supportedDataTypes.add("string");
+ supportedDataTypes.add("byte");
+ supportedDataTypes.add("float");
+ supportedDataTypes.add("double");
+
+ supportedGenericUDFs.add(UDFOPNegative.class);
+ supportedGenericUDFs.add(UDFOPPositive.class);
+ supportedGenericUDFs.add(UDFOPPlus.class);
+ supportedGenericUDFs.add(UDFOPMinus.class);
+ supportedGenericUDFs.add(UDFOPMultiply.class);
+ supportedGenericUDFs.add(UDFOPDivide.class);
+ supportedGenericUDFs.add(UDFOPMod.class);
+
+ supportedGenericUDFs.add(GenericUDFOPEqualOrLessThan.class);
+ supportedGenericUDFs.add(GenericUDFOPEqualOrGreaterThan.class);
+ supportedGenericUDFs.add(GenericUDFOPGreaterThan.class);
+ supportedGenericUDFs.add(GenericUDFOPLessThan.class);
+ supportedGenericUDFs.add(GenericUDFOPNot.class);
+ supportedGenericUDFs.add(GenericUDFOPNotEqual.class);
+ supportedGenericUDFs.add(GenericUDFOPNotNull.class);
+ supportedGenericUDFs.add(GenericUDFOPNull.class);
+ supportedGenericUDFs.add(GenericUDFOPOr.class);
+ supportedGenericUDFs.add(GenericUDFOPAnd.class);
+ supportedGenericUDFs.add(GenericUDFOPEqual.class);
+ supportedGenericUDFs.add(GenericUDFToUnixTimeStamp.class);
+
+ supportedGenericUDFs.add(UDFHour.class);
+ supportedGenericUDFs.add(UDFLength.class);
+ supportedGenericUDFs.add(UDFMinute.class);
+ supportedGenericUDFs.add(UDFSecond.class);
+ supportedGenericUDFs.add(UDFYear.class);
+ supportedGenericUDFs.add(UDFWeekOfYear.class);
+ supportedGenericUDFs.add(UDFDayOfMonth.class);
+
+ supportedGenericUDFs.add(UDFLike.class);
+ supportedGenericUDFs.add(UDFLower.class);
+ supportedGenericUDFs.add(UDFUpper.class);
+
+ supportedAggregationUdfs.add("min");
+ supportedAggregationUdfs.add("max");
+ supportedAggregationUdfs.add("count");
+ supportedAggregationUdfs.add("sum");
+ supportedAggregationUdfs.add("avg");
+ supportedAggregationUdfs.add("variance");
+ supportedAggregationUdfs.add("var_pop");
+ supportedAggregationUdfs.add("var_samp");
+ supportedAggregationUdfs.add("std");
+ supportedAggregationUdfs.add("stddev");
+ supportedAggregationUdfs.add("stddev_pop");
+ supportedAggregationUdfs.add("stddev_samp");
+ }
+
+ class VectorizationDispatcher implements Dispatcher {
+
+ public VectorizationDispatcher(PhysicalContext pctx) {
+ }
+
+ @Override
+ public Object dispatch(Node nd, Stack<Node> stack, Object... nodeOutputs)
+ throws SemanticException {
+ Task<? extends Serializable> currTask = (Task<? extends Serializable>) nd;
+ if (currTask instanceof MapRedTask) {
+ boolean ret = validateMRTask((MapRedTask) currTask);
+ if (ret) {
+ vectorizeMRTask((MapRedTask) currTask);
+ }
+ }
+ return null;
+ }
+
+ private boolean validateMRTask(MapRedTask mrTask) throws SemanticException {
+ MapWork mapWork = mrTask.getWork().getMapWork();
+
+ // Validate the input format
+ for (String path : mapWork.getPathToPartitionInfo().keySet()) {
+ PartitionDesc pd = mapWork.getPathToPartitionInfo().get(path);
+ List<Class<?>> interfaceList =
+ Arrays.asList(pd.getInputFileFormatClass().getInterfaces());
+ if (!interfaceList.contains(VectorizedInputFormatInterface.class)) {
+ LOG.debug("Input format: " + pd.getInputFileFormatClassName()
+ + ", doesn't provide vectorized input");
+ System.err.println("Input format: " + pd.getInputFileFormatClassName()
+ + ", doesn't provide vectorized input");
+ return false;
+ }
+ }
+ Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
+ ValidationNodeProcessor vnp = new ValidationNodeProcessor();
+ opRules.put(new RuleRegExp("R1", TableScanOperator.getOperatorName() + ".*"
+ + FileSinkOperator.getOperatorName()), vnp);
+ opRules.put(new RuleRegExp("R2", TableScanOperator.getOperatorName() + ".*"
+ + ReduceSinkOperator.getOperatorName()), vnp);
+ Dispatcher disp = new DefaultRuleDispatcher(vnp, opRules, null);
+ GraphWalker ogw = new DefaultGraphWalker(disp);
+ // iterate over the mapper operator tree
+ ArrayList<Node> topNodes = new ArrayList<Node>();
+ topNodes.addAll(mapWork.getAliasToWork().values());
+ HashMap<Node, Object> nodeOutput = new HashMap<Node, Object>();
+ ogw.startWalking(topNodes, nodeOutput);
+ for (Node n : nodeOutput.keySet()) {
+ if (nodeOutput.get(n) != null) {
+ if (!((Boolean)nodeOutput.get(n)).booleanValue()) {
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+
+ private void vectorizeMRTask(MapRedTask mrTask) throws SemanticException {
+ System.err.println("Going down the vectorized path");
+ MapWork mapWork = mrTask.getWork().getMapWork();
+ mapWork.setVectorMode(true);
+ Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
+ VectorizationNodeProcessor vnp = new VectorizationNodeProcessor(mrTask);
+ opRules.put(new RuleRegExp("R1", TableScanOperator.getOperatorName() + ".*" +
+ ReduceSinkOperator.getOperatorName()), vnp);
+ opRules.put(new RuleRegExp("R2", TableScanOperator.getOperatorName() + ".*"
+ + FileSinkOperator.getOperatorName()), vnp);
+ Dispatcher disp = new DefaultRuleDispatcher(vnp, opRules, null);
+ GraphWalker ogw = new PreOrderWalker(disp);
+ // iterate over the mapper operator tree
+ ArrayList<Node> topNodes = new ArrayList<Node>();
+ topNodes.addAll(mapWork.getAliasToWork().values());
+ HashMap<Node, Object> nodeOutput = new HashMap<Node, Object>();
+ ogw.startWalking(topNodes, nodeOutput);
+ mapWork.setScratchColumnVectorTypes(vnp.getScratchColumnVectorTypes());
+ return;
+ }
+ }
+
+ class ValidationNodeProcessor implements NodeProcessor {
+
+ @Override
+ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+ Object... nodeOutputs) throws SemanticException {
+ for (Node n : stack) {
+ Operator<? extends OperatorDesc> op = (Operator<? extends OperatorDesc>) n;
+ if (op.getType().equals(OperatorType.REDUCESINK) &&
+ op.getParentOperators().get(0).getType().equals(OperatorType.GROUPBY)) {
+ return new Boolean(true);
+ }
+ boolean ret = validateOperator(op);
+ if (!ret) {
+ System.err.println("Operator: "+op.getName()+", could not be vectorized");
+ return new Boolean(false);
+ }
+ }
+ return new Boolean(true);
+ }
+ }
+
+ class VectorizationNodeProcessor implements NodeProcessor {
+
+ private final MapWork mWork;
+ private final Map<String, VectorizationContext> vectorizationContexts =
+ new HashMap<String, VectorizationContext>();
+
+ private final Map<Operator<? extends OperatorDesc>, VectorizationContext> vContextsByTSOp =
+ new HashMap<Operator<? extends OperatorDesc>, VectorizationContext>();
+
+ private final Set<Operator<? extends OperatorDesc>> opsDone =
+ new HashSet<Operator<? extends OperatorDesc>>();
+
+ public VectorizationNodeProcessor(MapRedTask mrTask) {
+ this.mWork = mrTask.getWork().getMapWork();
+ }
+
+ public Map<String, Map<Integer, String>> getScratchColumnVectorTypes() {
+ Map<String, Map<Integer, String>> scratchColumnVectorTypes =
+ new HashMap<String, Map<Integer, String>>();
+ for (String onefile : vectorizationContexts.keySet()) {
+ VectorizationContext vc = vectorizationContexts.get(onefile);
+ Map<Integer, String> cmap = vc.getOutputColumnTypeMap();
+ scratchColumnVectorTypes.put(onefile, cmap);
+ }
+ return scratchColumnVectorTypes;
+ }
+
+ @Override
+ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+ Object... nodeOutputs) throws SemanticException {
+
+ Node firstOp = stack.firstElement();
+ TableScanOperator tsOp = null;
+
+ tsOp = (TableScanOperator) firstOp;
+
+ VectorizationContext vContext = vContextsByTSOp.get(tsOp);
+ if (vContext == null) {
+ String fileKey = "";
+ for (String onefile : mWork.getPathToAliases().keySet()) {
+ List<String> aliases = mWork.getPathToAliases().get(onefile);
+ for (String alias : aliases) {
+ Operator<? extends OperatorDesc> op = mWork.getAliasToWork().get(alias);
+ if (op == tsOp) {
+ fileKey = onefile;
+ break;
+ }
+ }
+ }
+ vContext = getVectorizationContext(tsOp, physicalContext);
+ vectorizationContexts.put(fileKey, vContext);
+ vContextsByTSOp.put(tsOp, vContext);
+ }
+
+ Operator<? extends OperatorDesc> op = (Operator<? extends OperatorDesc>) nd;
+ if (op.getType().equals(OperatorType.REDUCESINK) &&
+ op.getParentOperators().get(0).getType().equals(OperatorType.GROUPBY)) {
+ // No need to vectorize
+ if (!opsDone.contains(op)) {
+ opsDone.add(op);
+ }
+ } else {
+ try {
+ if (!opsDone.contains(op)) {
+ Operator<? extends OperatorDesc> vectorOp =
+ vectorizeOperator(op, vContext);
+ opsDone.add(op);
+ if (vectorOp != op) {
+ opsDone.add(vectorOp);
+ }
+ }
+ } catch (HiveException e) {
+ throw new SemanticException(e);
+ }
+ }
+ return null;
+ }
+ }
+
+ @Override
+ public PhysicalContext resolve(PhysicalContext pctx) throws SemanticException {
+ this.physicalContext = pctx;
+ boolean vectorPath = HiveConf.getBoolVar(pctx.getConf(),
+ HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED);
+ if (!vectorPath) {
+ LOG.info("Vectorization is disabled");
+ return pctx;
+ }
+ // create dispatcher and graph walker
+ Dispatcher disp = new VectorizationDispatcher(pctx);
+ TaskGraphWalker ogw = new TaskGraphWalker(disp);
+
+ // get all the tasks nodes from root task
+ ArrayList<Node> topNodes = new ArrayList<Node>();
+ topNodes.addAll(pctx.getRootTasks());
+
+ // begin to walk through the task tree.
+ ogw.startWalking(topNodes, null);
+ return pctx;
+ }
+
+ private boolean validateOperator(Operator<? extends OperatorDesc> op) {
+ boolean ret = false;
+ switch (op.getType()) {
+ case GROUPBY:
+ ret = validateGroupByOperator((GroupByOperator) op);
+ break;
+ case FILTER:
+ ret = validateFilterOperator((FilterOperator) op);
+ break;
+ case SELECT:
+ ret = validateSelectOperator((SelectOperator) op);
+ break;
+ case REDUCESINK:
+ ret = validateReduceSinkOperator((ReduceSinkOperator) op);
+ break;
+ case FILESINK:
+ case TABLESCAN:
+ ret = true;
+ break;
+ default:
+ ret = false;
+ break;
+ }
+ return ret;
+ }
+
+ private boolean validateReduceSinkOperator(ReduceSinkOperator op) {
+ List<ExprNodeDesc> keyDescs = op.getConf().getKeyCols();
+ List<ExprNodeDesc> partitionDescs = op.getConf().getPartitionCols();
+ List<ExprNodeDesc> valueDesc = op.getConf().getValueCols();
+ return validateExprNodeDesc(keyDescs) && validateExprNodeDesc(partitionDescs) &&
+ validateExprNodeDesc(valueDesc);
+ }
+
+ private boolean validateSelectOperator(SelectOperator op) {
+ List<ExprNodeDesc> descList = op.getConf().getColList();
+ for (ExprNodeDesc desc : descList) {
+ boolean ret = validateExprNodeDesc(desc);
+ if (!ret) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private boolean validateFilterOperator(FilterOperator op) {
+ ExprNodeDesc desc = op.getConf().getPredicate();
+ return validateExprNodeDesc(desc);
+ }
+
+ private boolean validateGroupByOperator(GroupByOperator op) {
+ boolean ret = validateExprNodeDesc(op.getConf().getKeys());
+ if (!ret) {
+ return false;
+ }
+ return validateAggregationDesc(op.getConf().getAggregators());
+ }
+
+ private boolean validateExprNodeDesc(List<ExprNodeDesc> descs) {
+ for (ExprNodeDesc d : descs) {
+ boolean ret = validateExprNodeDesc(d);
+ if (!ret) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private boolean validateAggregationDesc(List<AggregationDesc> descs) {
+ for (AggregationDesc d : descs) {
+ boolean ret = validateAggregationDesc(d);
+ if (!ret) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private boolean validateExprNodeDesc(ExprNodeDesc desc) {
+ boolean ret = validateDataType(desc.getTypeInfo().getTypeName());
+ if (!ret) {
+ return false;
+ }
+ if (desc instanceof ExprNodeGenericFuncDesc) {
+ ExprNodeGenericFuncDesc d = (ExprNodeGenericFuncDesc) desc;
+ boolean r = validateGenericUdf(d.getGenericUDF());
+ if (!r) {
+ return false;
+ }
+ }
+ if (desc.getChildren() != null) {
+ for (ExprNodeDesc d: desc.getChildren()) {
+ if (!validateExprNodeDesc(d)) {
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+
+ private boolean validateGenericUdf(GenericUDF genericUDF) {
+ if (genericUDF instanceof GenericUDFBridge) {
+ Class<? extends UDF> udf = ((GenericUDFBridge) genericUDF).getUdfClass();
+ return supportedGenericUDFs.contains(udf);
+ } else {
+ return supportedGenericUDFs.contains(genericUDF.getClass());
+ }
+ }
+
+ private boolean validateAggregationDesc(AggregationDesc aggDesc) {
+ return supportedAggregationUdfs.contains(aggDesc.getGenericUDAFName().toLowerCase());
+ }
+
+ private boolean validateDataType(String type) {
+ return supportedDataTypes.contains(type.toLowerCase());
+ }
+
+ private VectorizationContext getVectorizationContext(Operator<? extends OperatorDesc> op,
+ PhysicalContext pctx) {
+ RowResolver rr = pctx.getParseContext().getOpParseCtx().get(op).getRowResolver();
+
+ Map<String, Integer> cmap = new HashMap<String, Integer>();
+ int columnCount = 0;
+ for (ColumnInfo c : rr.getColumnInfos()) {
+ if (!c.getIsVirtualCol()) {
+ cmap.put(c.getInternalName(), columnCount++);
+ }
+ }
+ return new VectorizationContext(cmap, columnCount);
+ }
+
+ private Operator<? extends OperatorDesc> vectorizeOperator(Operator<? extends OperatorDesc> op,
+ VectorizationContext vContext) throws HiveException {
+ Operator<? extends OperatorDesc> vectorOp = null;
+
+ switch (op.getType()) {
+ case GROUPBY:
+ case FILTER:
+ case SELECT:
+ case FILESINK:
+ case REDUCESINK:
+ vectorOp = OperatorFactory.getVectorOperator(op.getConf(), vContext);
+ break;
+ default:
+ vectorOp = op;
+ break;
+ }
+
+ if (vectorOp != op) {
+ if (op.getParentOperators() != null) {
+ vectorOp.setParentOperators(op.getParentOperators());
+ for (Operator<? extends OperatorDesc> p : op.getParentOperators()) {
+ p.replaceChild(op, vectorOp);
+ }
+ }
+ if (op.getChildOperators() != null) {
+ vectorOp.setChildOperators(op.getChildOperators());
+ for (Operator<? extends OperatorDesc> c : op.getChildOperators()) {
+ c.replaceParent(op, vectorOp);
+ }
+ }
+ }
+ return vectorOp;
+ }
+}
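vectorizeOperator() swaps an operator for its vectorized counterpart by splicing the replacement into the same DAG position: it inherits the original's parent and child lists, and each neighbor re-points its edge. A self-contained sketch of that splice; Op is a hypothetical stand-in for Operator<? extends OperatorDesc>:

    import java.util.ArrayList;
    import java.util.List;

    public class OperatorSpliceDemo {
      static class Op {
        final String name;
        List<Op> parents = new ArrayList<Op>();
        List<Op> children = new ArrayList<Op>();
        Op(String name) { this.name = name; }
        void replaceChild(Op oldChild, Op newChild) {
          children.set(children.indexOf(oldChild), newChild);
        }
        void replaceParent(Op oldParent, Op newParent) {
          parents.set(parents.indexOf(oldParent), newParent);
        }
      }

      // Mirrors the rewiring at the end of vectorizeOperator().
      static void splice(Op op, Op vectorOp) {
        vectorOp.parents = op.parents;
        for (Op p : vectorOp.parents) {
          p.replaceChild(op, vectorOp);
        }
        vectorOp.children = op.children;
        for (Op c : vectorOp.children) {
          c.replaceParent(op, vectorOp);
        }
      }

      public static void main(String[] args) {
        Op scan = new Op("TS"), filter = new Op("FIL"), sink = new Op("FS");
        scan.children.add(filter); filter.parents.add(scan);
        filter.children.add(sink); sink.parents.add(filter);

        splice(filter, new Op("VECTOR_FIL"));
        System.out.println(scan.children.get(0).name);  // VECTOR_FIL
        System.out.println(sink.parents.get(0).name);   // VECTOR_FIL
      }
    }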
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java?rev=1519788&r1=1519787&r2=1519788&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java Tue Sep 3 18:33:13 2013
@@ -52,7 +52,7 @@ import org.apache.hadoop.mapred.JobConf;
* distributed on the cluster. The ExecMapper will ultimately deserialize this
* class on the data nodes and setup it's operator pipeline accordingly.
*
- * This class is also used in the explain command any property with the
+ * This class is also used in the explain command any property with the
* appropriate annotation will be displayed in the explain output.
*/
@SuppressWarnings({"serial", "deprecation"})
@@ -112,6 +112,9 @@ public class MapWork extends BaseWork {
private transient boolean useBucketizedHiveInputFormat;
+ private Map<String, Map<Integer, String>> scratchColumnVectorTypes = null;
+ private boolean vectorMode = false;
+
public MapWork() {
}
@@ -479,4 +482,21 @@ public class MapWork extends BaseWork {
PlanUtils.configureJobConf(fs.getConf().getTableInfo(), job);
}
}
+
+ public Map<String, Map<Integer, String>> getScratchColumnVectorTypes() {
+ return scratchColumnVectorTypes;
+ }
+
+ public void setScratchColumnVectorTypes(
+ Map<String, Map<Integer, String>> scratchColumnVectorTypes) {
+ this.scratchColumnVectorTypes = scratchColumnVectorTypes;
+ }
+
+ public boolean getVectorMode() {
+ return vectorMode;
+ }
+
+ public void setVectorMode(boolean vectorMode) {
+ this.vectorMode = vectorMode;
+ }
}
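The new scratchColumnVectorTypes field is a two-level map: partition path (the same key VectorizedRowBatchCtx derives from the split's parent path) to a map from absolute scratch-column index to vector type name. A sketch of its shape; the path and indices are hypothetical:

    import java.util.HashMap;
    import java.util.Map;

    public class ScratchTypeMapDemo {
      public static void main(String[] args) {
        // partition path -> (absolute scratch column index -> vector type name)
        Map<String, Map<Integer, String>> scratchColumnVectorTypes =
            new HashMap<String, Map<Integer, String>>();
        Map<Integer, String> cols = new HashMap<Integer, String>();
        cols.put(3, "long");      // e.g. the output column of col1 + col2
        cols.put(4, "double");
        scratchColumnVectorTypes.put("/warehouse/t/ds=2013-09-03", cols);
        System.out.println(scratchColumnVectorTypes);
      }
    }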
Modified: hive/branches/vectorization/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorFilterOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorFilterOperator.java?rev=1519788&r1=1519787&r2=1519788&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorFilterOperator.java (original)
+++ hive/branches/vectorization/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorFilterOperator.java Tue Sep 3 18:33:13 2013
@@ -18,6 +18,9 @@
package org.apache.hadoop.hive.ql.exec.vector;
+import java.util.HashMap;
+import java.util.Map;
+
import junit.framework.Assert;
import org.apache.hadoop.hive.ql.exec.vector.expressions.FilterExprAndExpr;
@@ -25,6 +28,8 @@ import org.apache.hadoop.hive.ql.exec.ve
import org.apache.hadoop.hive.ql.exec.vector.expressions.gen.FilterLongColEqualDoubleScalar;
import org.apache.hadoop.hive.ql.exec.vector.expressions.gen.FilterLongColGreaterLongColumn;
import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.FilterDesc;
import org.junit.Test;
/**
@@ -76,9 +81,19 @@ public class TestVectorFilterOperator {
}
}
+ private VectorFilterOperator getAVectorFilterOperator() throws HiveException {
+ ExprNodeColumnDesc col1Expr = new ExprNodeColumnDesc(Long.class, "col1", "table", false);
+ Map<String, Integer> columnMap = new HashMap<String, Integer>();
+ columnMap.put("col1", 1);
+ VectorizationContext vc = new VectorizationContext(columnMap, 1);
+ FilterDesc fdesc = new FilterDesc();
+ fdesc.setPredicate(col1Expr);
+ return new VectorFilterOperator(vc, fdesc);
+ }
+
@Test
public void testBasicFilterOperator() throws HiveException {
- VectorFilterOperator vfo = new VectorFilterOperator(null, null);
+ VectorFilterOperator vfo = getAVectorFilterOperator();
VectorExpression ve1 = new FilterLongColGreaterLongColumn(0,1);
VectorExpression ve2 = new FilterLongColEqualDoubleScalar(2, 0);
VectorExpression ve3 = new FilterExprAndExpr(ve1,ve2);
@@ -105,7 +120,7 @@ public class TestVectorFilterOperator {
@Test
public void testBasicFilterLargeData() throws HiveException {
- VectorFilterOperator vfo = new VectorFilterOperator(null, null);
+ VectorFilterOperator vfo = getAVectorFilterOperator();
VectorExpression ve1 = new FilterLongColGreaterLongColumn(0,1);
VectorExpression ve2 = new FilterLongColEqualDoubleScalar(2, 0);
VectorExpression ve3 = new FilterExprAndExpr(ve1,ve2);