Posted to commits@hive.apache.org by go...@apache.org on 2015/09/23 08:25:03 UTC
[3/3] hive git commit: HIVE-11468: Vectorize Struct IN() clauses (Matt McCline, via Gopal V)
HIVE-11468: Vectorize Struct IN() clauses (Matt McCline, via Gopal V)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/7cfe3743
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/7cfe3743
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/7cfe3743
Branch: refs/heads/master
Commit: 7cfe3743ff583386653bdd32c79f2c44ffe734ba
Parents: 2e8324e
Author: Gopal V <go...@apache.org>
Authored: Tue Sep 22 19:39:49 2015 -0700
Committer: Gopal V <go...@apache.org>
Committed: Tue Sep 22 23:24:14 2015 -0700
----------------------------------------------------------------------
.../ql/exec/vector/VectorizationContext.java | 203 +-
.../expressions/FilterStringColumnInList.java | 13 +-
.../expressions/FilterStructColumnInList.java | 178 ++
.../exec/vector/expressions/IStructInExpr.java | 36 +
.../vector/expressions/StringColumnInList.java | 4 +
.../vector/expressions/StructColumnInList.java | 174 ++
.../hive/ql/optimizer/physical/Vectorizer.java | 71 +-
.../ql/optimizer/physical/Vectorizer.java.orig | 1744 ++++++++++++++++++
.../ql/optimizer/physical/Vectorizer.java.rej | 86 +
.../queries/clientpositive/vector_struct_in.q | 247 +++
.../clientpositive/vector_struct_in.q.out | 825 +++++++++
11 files changed, 3566 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
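In outline, the change reduces a struct IN() test to a bytes IN() test: each IN-list constant struct and, at run time, each row's struct fields are serialized into one binary-sortable key, and membership becomes a byte-array lookup against the pre-serialized constants. The standalone sketch below is not part of the commit and uses a plain HashSet with a toy serialization rather than the BinarySortableSerializeWrite encoding and Cuckoo-hash set the vectorized string IN expressions use; it only illustrates that reduction.

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.Set;

public class StructInSketch {

  // Toy canonical serialization of a (string, bigint) struct into one key:
  // a length-prefixed string followed by a fixed-width long.
  static ByteBuffer key(String s, long v) {
    byte[] sb = s.getBytes(StandardCharsets.UTF_8);
    ByteBuffer b = ByteBuffer.allocate(4 + sb.length + 8)
        .putInt(sb.length).put(sb).putLong(v);
    b.flip();  // rewind so equals()/hashCode() see the written content
    return b;
  }

  public static void main(String[] args) {
    // IN-list constants, serialized once up front.
    Set<ByteBuffer> inList = new HashSet<>();
    inList.add(key("a", 1L));
    inList.add(key("b", 2L));

    // Row side: serialize the row's struct fields the same way and look them up.
    System.out.println(inList.contains(key("a", 1L)));  // true
    System.out.println(inList.contains(key("a", 2L)));  // false
  }
}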
http://git-wip-us.apache.org/repos/asf/hive/blob/7cfe3743/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
index 2483196..46c2a78 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
@@ -104,20 +104,30 @@ import org.apache.hadoop.hive.ql.udf.UDFToLong;
import org.apache.hadoop.hive.ql.udf.UDFToShort;
import org.apache.hadoop.hive.ql.udf.UDFToString;
import org.apache.hadoop.hive.ql.udf.generic.*;
+import org.apache.hadoop.hive.serde2.ByteStream.Output;
+import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableSerializeWrite;
import org.apache.hadoop.hive.serde2.io.DateWritable;
+import org.apache.hadoop.hive.serde2.io.DoubleWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.ConstantObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
import org.apache.hadoop.hive.serde2.typeinfo.BaseCharTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.HiveDecimalUtils;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.util.StringUtils;
import org.apache.hive.common.util.DateUtils;
+
/**
* Context class for vectorization execution.
* Main role is to map column names to column indices and serves as a
@@ -1273,17 +1283,208 @@ public class VectorizationContext {
}
}
+ public enum InConstantType {
+ INT_FAMILY,
+ TIMESTAMP,
+ DATE,
+ FLOAT_FAMILY,
+ STRING_FAMILY,
+ DECIMAL
+ }
+
+ public static InConstantType getInConstantTypeFromPrimitiveCategory(PrimitiveCategory primitiveCategory) {
+
+ switch (primitiveCategory) {
+ case BOOLEAN:
+ case BYTE:
+ case SHORT:
+ case INT:
+ case LONG:
+ return InConstantType.INT_FAMILY;
+
+ case DATE:
+ return InConstantType.DATE;
+
+ case TIMESTAMP:
+ return InConstantType.TIMESTAMP;
+
+ case FLOAT:
+ case DOUBLE:
+ return InConstantType.FLOAT_FAMILY;
+
+ case STRING:
+ case CHAR:
+ case VARCHAR:
+ case BINARY:
+ return InConstantType.STRING_FAMILY;
+
+ case DECIMAL:
+ return InConstantType.DECIMAL;
+
+
+ case INTERVAL_YEAR_MONTH:
+ case INTERVAL_DAY_TIME:
+ // UNDONE: Fall through for these... they don't appear to be supported yet.
+ default:
+ throw new RuntimeException("Unexpected primitive type category " + primitiveCategory);
+ }
+ }
+
+ private VectorExpression getStructInExpression(List<ExprNodeDesc> childExpr, ExprNodeDesc colExpr,
+ TypeInfo colTypeInfo, List<ExprNodeDesc> inChildren, Mode mode, TypeInfo returnType)
+ throws HiveException {
+
+ VectorExpression expr = null;
+
+ StructTypeInfo structTypeInfo = (StructTypeInfo) colTypeInfo;
+
+ ArrayList<TypeInfo> fieldTypeInfos = structTypeInfo.getAllStructFieldTypeInfos();
+ final int fieldCount = fieldTypeInfos.size();
+ ColumnVector.Type[] fieldVectorColumnTypes = new ColumnVector.Type[fieldCount];
+ InConstantType[] fieldInConstantTypes = new InConstantType[fieldCount];
+ for (int f = 0; f < fieldCount; f++) {
+ TypeInfo fieldTypeInfo = fieldTypeInfos.get(f);
+ // Only primitive fields are supported for now.
+ if (fieldTypeInfo.getCategory() != Category.PRIMITIVE) {
+ return null;
+ }
+
+ // We are going to serialize using the 4 basic types.
+ ColumnVector.Type fieldVectorColumnType = getColumnVectorTypeFromTypeInfo(fieldTypeInfo);
+ fieldVectorColumnTypes[f] = fieldVectorColumnType;
+
+ // We currently evaluate the IN (..) constants in special ways.
+ PrimitiveCategory fieldPrimitiveCategory =
+ ((PrimitiveTypeInfo) fieldTypeInfo).getPrimitiveCategory();
+ InConstantType inConstantType = getInConstantTypeFromPrimitiveCategory(fieldPrimitiveCategory);
+ fieldInConstantTypes[f] = inConstantType;
+ }
+
+ Output buffer = new Output();
+ BinarySortableSerializeWrite binarySortableSerializeWrite =
+ new BinarySortableSerializeWrite(fieldCount);
+
+ final int inChildrenCount = inChildren.size();
+ byte[][] serializedInChildren = new byte[inChildrenCount][];
+ try {
+ for (int i = 0; i < inChildrenCount; i++) {
+ final ExprNodeDesc node = inChildren.get(i);
+ final Object[] constants;
+
+ if (node instanceof ExprNodeConstantDesc) {
+ ExprNodeConstantDesc constNode = (ExprNodeConstantDesc) node;
+ ConstantObjectInspector output = constNode.getWritableObjectInspector();
+ constants = ((List<?>) output.getWritableConstantValue()).toArray();
+ } else {
+ ExprNodeGenericFuncDesc exprNode = (ExprNodeGenericFuncDesc) node;
+ ExprNodeEvaluator<?> evaluator = ExprNodeEvaluatorFactory
+ .get(exprNode);
+ ObjectInspector output = evaluator.initialize(exprNode
+ .getWritableObjectInspector());
+ constants = (Object[]) evaluator.evaluate(null);
+ }
+
+ binarySortableSerializeWrite.set(buffer);
+ for (int f = 0; f < fieldCount; f++) {
+ Object constant = constants[f];
+ if (constant == null) {
+ binarySortableSerializeWrite.writeNull();
+ } else {
+ InConstantType inConstantType = fieldInConstantTypes[f];
+ switch (inConstantType) {
+ case STRING_FAMILY:
+ {
+ byte[] bytes;
+ if (constant instanceof Text) {
+ Text text = (Text) constant;
+ bytes = text.getBytes();
+ binarySortableSerializeWrite.writeString(bytes, 0, text.getLength());
+ } else {
+ throw new HiveException("Unexpected constant String type " +
+ constant.getClass().getSimpleName());
+ }
+ }
+ break;
+ case INT_FAMILY:
+ {
+ long value;
+ if (constant instanceof IntWritable) {
+ value = ((IntWritable) constant).get();
+ } else if (constant instanceof LongWritable) {
+ value = ((LongWritable) constant).get();
+ } else {
+ throw new HiveException("Unexpected constant Long type " +
+ constant.getClass().getSimpleName());
+ }
+ binarySortableSerializeWrite.writeLong(value);
+ }
+ break;
+
+ case FLOAT_FAMILY:
+ {
+ double value;
+ if (constant instanceof DoubleWritable) {
+ value = ((DoubleWritable) constant).get();
+ } else {
+ throw new HiveException("Unexpected constant Double type " +
+ constant.getClass().getSimpleName());
+ }
+ binarySortableSerializeWrite.writeDouble(value);
+ }
+ break;
+
+ // UNDONE...
+ case DATE:
+ case TIMESTAMP:
+ case DECIMAL:
+ default:
+ throw new RuntimeException("Unexpected IN constant type " + inConstantType.name());
+ }
+ }
+ }
+ serializedInChildren[i] = Arrays.copyOfRange(buffer.getData(), 0, buffer.getLength());
+ }
+ } catch (Exception e) {
+ throw new HiveException(e);
+ }
+
+ // Create a single child representing the scratch column where we will
+ // generate the serialized keys of the batch.
+ int scratchBytesCol = ocm.allocateOutputColumn("string");
+
+ Class<?> cl = (mode == Mode.FILTER ? FilterStructColumnInList.class : StructColumnInList.class);
+
+ expr = createVectorExpression(cl, null, Mode.PROJECTION, returnType);
+
+ ((IStringInExpr) expr).setInListValues(serializedInChildren);
+
+ ((IStructInExpr) expr).setScratchBytesColumn(scratchBytesCol);
+ ((IStructInExpr) expr).setStructColumnExprs(this, colExpr.getChildren(),
+ fieldVectorColumnTypes);
+
+ return expr;
+ }
+
/**
* Create a filter or boolean-valued expression for column IN ( <list-of-constants> )
*/
private VectorExpression getInExpression(List<ExprNodeDesc> childExpr, Mode mode, TypeInfo returnType)
throws HiveException {
ExprNodeDesc colExpr = childExpr.get(0);
+ List<ExprNodeDesc> inChildren = childExpr.subList(1, childExpr.size());
String colType = colExpr.getTypeString();
+ colType = VectorizationContext.mapTypeNameSynonyms(colType);
+ TypeInfo colTypeInfo = TypeInfoUtils.getTypeInfoFromTypeString(colType);
+ Category category = colTypeInfo.getCategory();
+ if (category == Category.STRUCT){
+ return getStructInExpression(childExpr, colExpr, colTypeInfo, inChildren, mode, returnType);
+ } else if (category != Category.PRIMITIVE) {
+ return null;
+ }
// prepare arguments for createVectorExpression
- List<ExprNodeDesc> childrenForInList = evaluateCastOnConstants(childExpr.subList(1, childExpr.size()));
+ List<ExprNodeDesc> childrenForInList = evaluateCastOnConstants(inChildren);
/* This method assumes that the IN list has no NULL entries. That is enforced elsewhere,
* in the Vectorizer class. If NULL is passed in as a list entry, behavior is not defined.
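The constant side of that scheme is the new getStructInExpression() above: each IN-list struct constant is written field by field through BinarySortableSerializeWrite and the buffer contents are copied out as one key. Below is a minimal sketch of that serialization step for a hypothetical (string, bigint) struct constant; the class and method names are illustrative, not part of the commit.

import java.nio.charset.StandardCharsets;
import java.util.Arrays;

import org.apache.hadoop.hive.serde2.ByteStream.Output;
import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableSerializeWrite;

public class SerializeInConstantSketch {

  // Hypothetical helper: turn one (string, bigint) IN-list constant into the
  // binary-sortable key that the struct IN expressions compare against.
  static byte[] serializeConstant(String s, long v) throws Exception {
    Output buffer = new Output();
    BinarySortableSerializeWrite serializeWrite = new BinarySortableSerializeWrite(2);
    serializeWrite.set(buffer);                        // start a fresh key in the buffer
    byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
    serializeWrite.writeString(bytes, 0, bytes.length);
    serializeWrite.writeLong(v);
    // Copy out only the bytes written so far, as getStructInExpression() does.
    return Arrays.copyOfRange(buffer.getData(), 0, buffer.getLength());
  }

  public static void main(String[] args) throws Exception {
    byte[] key = serializeConstant("a", 100L);
    System.out.println("serialized key length: " + key.length);
  }
}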
http://git-wip-us.apache.org/repos/asf/hive/blob/7cfe3743/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/FilterStringColumnInList.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/FilterStringColumnInList.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/FilterStringColumnInList.java
index 2434e90..e34ec75 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/FilterStringColumnInList.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/FilterStringColumnInList.java
@@ -20,16 +20,7 @@ package org.apache.hadoop.hive.ql.exec.vector.expressions;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorExpressionDescriptor.Descriptor;
-import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.udf.UDFLike;
-import org.apache.hadoop.io.Text;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
/**
* Evaluate an IN filter on a batch for a vector of strings.
@@ -165,6 +156,10 @@ public class FilterStringColumnInList extends VectorExpression implements IStrin
return "boolean";
}
+ public void setInputColumn(int inputCol) {
+ this.inputCol = inputCol;
+ }
+
@Override
public int getOutputColumn() {
return -1;
http://git-wip-us.apache.org/repos/asf/hive/blob/7cfe3743/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/FilterStructColumnInList.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/FilterStructColumnInList.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/FilterStructColumnInList.java
new file mode 100644
index 0000000..00f22bb
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/FilterStructColumnInList.java
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.expressions;
+
+import java.util.List;
+
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorExpressionDescriptor.Descriptor;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.serde2.ByteStream.Output;
+import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableSerializeWrite;
+
+/**
+ * Evaluate an IN filter on a batch for a vector of structs.
+ * This is optimized so that no objects have to be created in
+ * the inner loop, and there is a hash table implemented
+ * with Cuckoo hashing that has fast lookup to do the IN test.
+ */
+public class FilterStructColumnInList extends FilterStringColumnInList implements IStructInExpr {
+ private static final long serialVersionUID = 1L;
+ private VectorExpression[] structExpressions;
+ private ColumnVector.Type[] fieldVectorColumnTypes;
+ private int[] structColumnMap;
+ private int scratchBytesColumn;
+
+ private transient Output buffer;
+ private transient BinarySortableSerializeWrite binarySortableSerializeWrite;
+
+ /**
+ * After construction you must call setInListValues() to add the values to the IN set
+ * (on the IStringInExpr interface).
+ *
+ * And, call setScratchBytesColumn() and setStructColumnExprs() on the IStructInExpr interface.
+ */
+ public FilterStructColumnInList() {
+ super(-1);
+ }
+
+ @Override
+ public void evaluate(VectorizedRowBatch batch) {
+
+ final int logicalSize = batch.size;
+ if (logicalSize == 0) {
+ return;
+ }
+
+ if (buffer == null) {
+ buffer = new Output();
+ binarySortableSerializeWrite = new BinarySortableSerializeWrite(structColumnMap.length);
+ }
+
+ for (VectorExpression ve : structExpressions) {
+ ve.evaluate(batch);
+ }
+
+ BytesColumnVector scratchBytesColumnVector = (BytesColumnVector) batch.cols[scratchBytesColumn];
+
+ try {
+ boolean selectedInUse = batch.selectedInUse;
+ int[] selected = batch.selected;
+ for (int logical = 0; logical < logicalSize; logical++) {
+ int batchIndex = (selectedInUse ? selected[logical] : logical);
+
+ binarySortableSerializeWrite.set(buffer);
+ for (int f = 0; f < structColumnMap.length; f++) {
+ int fieldColumn = structColumnMap[f];
+ ColumnVector colVec = batch.cols[fieldColumn];
+ int adjustedIndex = (colVec.isRepeating ? 0 : batchIndex);
+ if (colVec.noNulls || !colVec.isNull[adjustedIndex]) {
+ switch (fieldVectorColumnTypes[f]) {
+ case BYTES:
+ {
+ BytesColumnVector bytesColVec = (BytesColumnVector) colVec;
+ byte[] bytes = bytesColVec.vector[adjustedIndex];
+ int start = bytesColVec.start[adjustedIndex];
+ int length = bytesColVec.length[adjustedIndex];
+ binarySortableSerializeWrite.writeString(bytes, start, length);
+ }
+ break;
+
+ case LONG:
+ binarySortableSerializeWrite.writeLong(((LongColumnVector) colVec).vector[adjustedIndex]);
+ break;
+
+ case DOUBLE:
+ binarySortableSerializeWrite.writeDouble(((DoubleColumnVector) colVec).vector[adjustedIndex]);
+ break;
+
+ case DECIMAL:
+ binarySortableSerializeWrite.writeHiveDecimal(
+ ((DecimalColumnVector) colVec).vector[adjustedIndex].getHiveDecimal());
+ break;
+
+ default:
+ throw new RuntimeException("Unexpected vector column type " +
+ fieldVectorColumnTypes[f].name());
+ }
+ } else {
+ binarySortableSerializeWrite.writeNull();
+ }
+ }
+ scratchBytesColumnVector.setVal(batchIndex, buffer.getData(), 0, buffer.getLength());
+ }
+
+ // Now, take the serialized keys we just wrote into our scratch column and look them
+ // up in the IN list.
+ super.evaluate(batch);
+
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ }
+
+
+ @Override
+ public String getOutputType() {
+ return "boolean";
+ }
+
+ @Override
+ public int getOutputColumn() {
+ return -1;
+ }
+
+ @Override
+ public Descriptor getDescriptor() {
+
+ // This VectorExpression (IN) is a special case, so don't return a descriptor.
+ return null;
+ }
+
+ @Override
+ public void setScratchBytesColumn(int scratchBytesColumn) {
+
+ // Tell our super class FilterStringColumnInList it will be evaluating our scratch
+ // BytesColumnVector.
+ super.setInputColumn(scratchBytesColumn);
+ this.scratchBytesColumn = scratchBytesColumn;
+ }
+
+ @Override
+ public void setStructColumnExprs(VectorizationContext vContext,
+ List<ExprNodeDesc> structColumnExprs, ColumnVector.Type[] fieldVectorColumnTypes)
+ throws HiveException {
+
+ structExpressions = vContext.getVectorExpressions(structColumnExprs);
+ structColumnMap = new int[structExpressions.length];
+ for (int i = 0; i < structColumnMap.length; i++) {
+ VectorExpression ve = structExpressions[i];
+ structColumnMap[i] = ve.getOutputColumn();
+ }
+ this.fieldVectorColumnTypes = fieldVectorColumnTypes;
+ }
+}
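On the row side, evaluate() above writes each batch row's struct fields into the scratch BytesColumnVector using the same BinarySortableSerializeWrite encoding, and the inherited FilterStringColumnInList.evaluate() then performs the membership test. A hedged sketch of how such an expression gets wired up after construction follows; the helper and its parameters are illustrative, and in the commit the equivalent wiring happens inside VectorizationContext.getStructInExpression().

import java.util.List;

import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
import org.apache.hadoop.hive.ql.exec.vector.expressions.FilterStructColumnInList;
import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;

public class StructInWiringSketch {

  // Illustrative wiring for the filter variant. The projection variant
  // (StructColumnInList) is wired the same way but additionally needs an
  // output column assigned by the vectorization context.
  static VectorExpression wireFilterStructIn(
      VectorizationContext vContext,
      List<ExprNodeDesc> structFieldExprs,          // children of the struct column expr
      ColumnVector.Type[] fieldVectorColumnTypes,   // per-field vector column types
      byte[][] serializedInList,                    // pre-serialized IN-list keys
      int scratchBytesColumn)                       // scratch column for the row keys
      throws HiveException {

    FilterStructColumnInList expr = new FilterStructColumnInList();
    // The IN set is the list of pre-serialized constant keys (IStringInExpr interface).
    expr.setInListValues(serializedInList);
    // Point the inherited string IN lookup at the scratch bytes column.
    expr.setScratchBytesColumn(scratchBytesColumn);
    // The field expressions produce the column values that get serialized per row.
    expr.setStructColumnExprs(vContext, structFieldExprs, fieldVectorColumnTypes);
    return expr;
  }
}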
http://git-wip-us.apache.org/repos/asf/hive/blob/7cfe3743/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/IStructInExpr.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/IStructInExpr.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/IStructInExpr.java
new file mode 100644
index 0000000..3b25255
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/IStructInExpr.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.expressions;
+
+import java.util.List;
+
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+
+/**
+ * Interface used for both filter and non-filter versions of IN to simplify
+ * VectorizationContext code.
+ */
+public interface IStructInExpr {
+ void setScratchBytesColumn(int scratchBytesColumn);
+ void setStructColumnExprs(VectorizationContext vContext, List<ExprNodeDesc> structColumnExprs,
+ ColumnVector.Type[] fieldVectorColumnTypes) throws HiveException;
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/7cfe3743/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/StringColumnInList.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/StringColumnInList.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/StringColumnInList.java
index 03833a2..b90e3c0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/StringColumnInList.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/StringColumnInList.java
@@ -140,6 +140,10 @@ public class StringColumnInList extends VectorExpression implements IStringInExp
return "boolean";
}
+ public void setInputColumn(int inputCol) {
+ this.inputCol = inputCol;
+ }
+
@Override
public int getOutputColumn() {
return this.outputColumn;
http://git-wip-us.apache.org/repos/asf/hive/blob/7cfe3743/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/StructColumnInList.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/StructColumnInList.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/StructColumnInList.java
new file mode 100644
index 0000000..724497a
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/StructColumnInList.java
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.expressions;
+
+import java.util.List;
+
+import org.apache.hadoop.hive.ql.exec.vector.VectorExpressionDescriptor.Descriptor;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.serde2.ByteStream.Output;
+import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableSerializeWrite;
+
+/**
+ * Evaluate an IN boolean expression (not a filter) on a batch for a vector of structs.
+ * This is optimized so that no objects have to be created in
+ * the inner loop, and there is a hash table implemented
+ * with Cuckoo hashing that has fast lookup to do the IN test.
+ */
+public class StructColumnInList extends StringColumnInList implements IStructInExpr {
+ private static final long serialVersionUID = 1L;
+ private VectorExpression[] structExpressions;
+ private ColumnVector.Type[] fieldVectorColumnTypes;
+ private int[] structColumnMap;
+ private int scratchBytesColumn;
+
+ private transient Output buffer;
+ private transient BinarySortableSerializeWrite binarySortableSerializeWrite;
+
+ public StructColumnInList() {
+ super();
+ }
+
+ /**
+ * After construction you must call setInListValues() to add the values to the IN set.
+ */
+ public StructColumnInList(int outputColumn) {
+ super(-1, outputColumn);
+ }
+
+ @Override
+ public void evaluate(VectorizedRowBatch batch) {
+
+ final int logicalSize = batch.size;
+ if (logicalSize == 0) {
+ return;
+ }
+
+ if (buffer == null) {
+ buffer = new Output();
+ binarySortableSerializeWrite = new BinarySortableSerializeWrite(structColumnMap.length);
+ }
+
+ for (VectorExpression ve : structExpressions) {
+ ve.evaluate(batch);
+ }
+
+ BytesColumnVector scratchBytesColumnVector = (BytesColumnVector) batch.cols[scratchBytesColumn];
+
+ try {
+ boolean selectedInUse = batch.selectedInUse;
+ int[] selected = batch.selected;
+ for (int logical = 0; logical < logicalSize; logical++) {
+ int batchIndex = (selectedInUse ? selected[logical] : logical);
+
+ binarySortableSerializeWrite.set(buffer);
+ for (int f = 0; f < structColumnMap.length; f++) {
+ int fieldColumn = structColumnMap[f];
+ ColumnVector colVec = batch.cols[fieldColumn];
+ int adjustedIndex = (colVec.isRepeating ? 0 : batchIndex);
+ if (colVec.noNulls || !colVec.isNull[adjustedIndex]) {
+ switch (fieldVectorColumnTypes[f]) {
+ case BYTES:
+ {
+ BytesColumnVector bytesColVec = (BytesColumnVector) colVec;
+ byte[] bytes = bytesColVec.vector[adjustedIndex];
+ int start = bytesColVec.start[adjustedIndex];
+ int length = bytesColVec.length[adjustedIndex];
+ binarySortableSerializeWrite.writeString(bytes, start, length);
+ }
+ break;
+
+ case LONG:
+ binarySortableSerializeWrite.writeLong(((LongColumnVector) colVec).vector[adjustedIndex]);
+ break;
+
+ case DOUBLE:
+ binarySortableSerializeWrite.writeDouble(((DoubleColumnVector) colVec).vector[adjustedIndex]);
+ break;
+
+ case DECIMAL:
+ binarySortableSerializeWrite.writeHiveDecimal(
+ ((DecimalColumnVector) colVec).vector[adjustedIndex].getHiveDecimal());
+ break;
+
+ default:
+ throw new RuntimeException("Unexpected vector column type " +
+ fieldVectorColumnTypes[f].name());
+ }
+ } else {
+ binarySortableSerializeWrite.writeNull();
+ }
+ }
+ scratchBytesColumnVector.setVal(batchIndex, buffer.getData(), 0, buffer.getLength());
+ }
+
+ // Now, take the serialized keys we just wrote into our scratch column and look them
+ // up in the IN list.
+ super.evaluate(batch);
+
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+
+ @Override
+ public String getOutputType() {
+ return "boolean";
+ }
+
+ @Override
+ public Descriptor getDescriptor() {
+
+ // This VectorExpression (IN) is a special case, so don't return a descriptor.
+ return null;
+ }
+
+
+ @Override
+ public void setScratchBytesColumn(int scratchBytesColumn) {
+
+ // Tell our super class StringColumnInList it will be evaluating our scratch
+ // BytesColumnVector.
+ super.setInputColumn(scratchBytesColumn);
+ this.scratchBytesColumn = scratchBytesColumn;
+ }
+
+ @Override
+ public void setStructColumnExprs(VectorizationContext vContext,
+ List<ExprNodeDesc> structColumnExprs, ColumnVector.Type[] fieldVectorColumnTypes)
+ throws HiveException {
+
+ structExpressions = vContext.getVectorExpressions(structColumnExprs);
+ structColumnMap = new int[structExpressions.length];
+ for (int i = 0; i < structColumnMap.length; i++) {
+ VectorExpression ve = structExpressions[i];
+ structColumnMap[i] = ve.getOutputColumn();
+ }
+ this.fieldVectorColumnTypes = fieldVectorColumnTypes;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/7cfe3743/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
index 0d4c1d8..da1d9eb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
@@ -53,10 +53,12 @@ import org.apache.hadoop.hive.ql.exec.vector.mapjoin.VectorMapJoinLeftSemiString
import org.apache.hadoop.hive.ql.exec.vector.mapjoin.VectorMapJoinOuterLongOperator;
import org.apache.hadoop.hive.ql.exec.vector.mapjoin.VectorMapJoinOuterMultiKeyOperator;
import org.apache.hadoop.hive.ql.exec.vector.mapjoin.VectorMapJoinOuterStringOperator;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorMapJoinOperator;
import org.apache.hadoop.hive.ql.exec.vector.VectorMapJoinOuterFilteredOperator;
import org.apache.hadoop.hive.ql.exec.vector.VectorSMBMapJoinOperator;
import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext.InConstantType;
import org.apache.hadoop.hive.ql.exec.vector.VectorizationContextRegion;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
@@ -139,8 +141,11 @@ import org.apache.hadoop.hive.ql.udf.UDFYear;
import org.apache.hadoop.hive.ql.udf.generic.*;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
@@ -575,7 +580,12 @@ public class Vectorizer implements PhysicalPlanResolver {
if (nonVectorizableChildOfGroupBy(op)) {
return new Boolean(true);
}
- boolean ret = validateMapWorkOperator(op, mapWork, isTez);
+ boolean ret;
+ try {
+ ret = validateMapWorkOperator(op, mapWork, isTez);
+ } catch (Exception e) {
+ throw new SemanticException(e);
+ }
if (!ret) {
LOG.info("MapWork Operator: " + op.getName() + " could not be vectorized.");
return new Boolean(false);
@@ -1260,6 +1270,7 @@ public class Vectorizer implements PhysicalPlanResolver {
LOG.info("Cannot vectorize " + desc.toString() + " of type " + typeName);
return false;
}
+ boolean isInExpression = false;
if (desc instanceof ExprNodeGenericFuncDesc) {
ExprNodeGenericFuncDesc d = (ExprNodeGenericFuncDesc) desc;
boolean r = validateGenericUdf(d);
@@ -1267,12 +1278,62 @@ public class Vectorizer implements PhysicalPlanResolver {
LOG.info("Cannot vectorize UDF " + d);
return false;
}
+ GenericUDF genericUDF = d.getGenericUDF();
+ isInExpression = (genericUDF instanceof GenericUDFIn);
}
if (desc.getChildren() != null) {
- for (ExprNodeDesc d: desc.getChildren()) {
- // Don't restrict child expressions for projection. Always use looser FILTER mode.
- boolean r = validateExprNodeDescRecursive(d, VectorExpressionDescriptor.Mode.FILTER);
- if (!r) {
+ if (isInExpression
+ && desc.getChildren().get(0).getTypeInfo().getCategory() == Category.STRUCT) {
+ // Don't restrict child expressions for projection.
+ // Always use loose FILTER mode.
+ if (!validateStructInExpression(desc, VectorExpressionDescriptor.Mode.FILTER)) {
+ return false;
+ }
+ } else {
+ for (ExprNodeDesc d : desc.getChildren()) {
+ // Don't restrict child expressions for projection.
+ // Always use loose FILTER mode.
+ if (!validateExprNodeDescRecursive(d, VectorExpressionDescriptor.Mode.FILTER)) {
+ return false;
+ }
+ }
+ }
+ }
+ return true;
+ }
+
+ private boolean validateStructInExpression(ExprNodeDesc desc,
+ VectorExpressionDescriptor.Mode mode) {
+ for (ExprNodeDesc d : desc.getChildren()) {
+ TypeInfo typeInfo = d.getTypeInfo();
+ if (typeInfo.getCategory() != Category.STRUCT) {
+ return false;
+ }
+ StructTypeInfo structTypeInfo = (StructTypeInfo) typeInfo;
+
+ ArrayList<TypeInfo> fieldTypeInfos = structTypeInfo
+ .getAllStructFieldTypeInfos();
+ ArrayList<String> fieldNames = structTypeInfo.getAllStructFieldNames();
+ final int fieldCount = fieldTypeInfos.size();
+ for (int f = 0; f < fieldCount; f++) {
+ TypeInfo fieldTypeInfo = fieldTypeInfos.get(f);
+ Category category = fieldTypeInfo.getCategory();
+ if (category != Category.PRIMITIVE) {
+ LOG.info("Cannot vectorize struct field " + fieldNames.get(f)
+ + " of type " + fieldTypeInfo.getTypeName());
+ return false;
+ }
+ PrimitiveTypeInfo fieldPrimitiveTypeInfo = (PrimitiveTypeInfo) fieldTypeInfo;
+ InConstantType inConstantType = VectorizationContext
+ .getInConstantTypeFromPrimitiveCategory(fieldPrimitiveTypeInfo
+ .getPrimitiveCategory());
+
+ // For now, limit the data types we support for Vectorized Struct IN().
+ if (inConstantType != InConstantType.INT_FAMILY
+ && inConstantType != InConstantType.FLOAT_FAMILY
+ && inConstantType != InConstantType.STRING_FAMILY) {
+ LOG.info("Cannot vectorize struct field " + fieldNames.get(f)
+ + " of type " + fieldTypeInfo.getTypeName());
return false;
}
}