Posted to commits@hive.apache.org by go...@apache.org on 2015/09/23 08:25:03 UTC

[3/3] hive git commit: HIVE-11468: Vectorize Struct IN() clauses (Matt McCline, via Gopal V)

HIVE-11468: Vectorize Struct IN() clauses (Matt McCline, via Gopal V)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/7cfe3743
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/7cfe3743
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/7cfe3743

Branch: refs/heads/master
Commit: 7cfe3743ff583386653bdd32c79f2c44ffe734ba
Parents: 2e8324e
Author: Gopal V <go...@apache.org>
Authored: Tue Sep 22 19:39:49 2015 -0700
Committer: Gopal V <go...@apache.org>
Committed: Tue Sep 22 23:24:14 2015 -0700

----------------------------------------------------------------------
 .../ql/exec/vector/VectorizationContext.java    |  203 +-
 .../expressions/FilterStringColumnInList.java   |   13 +-
 .../expressions/FilterStructColumnInList.java   |  178 ++
 .../exec/vector/expressions/IStructInExpr.java  |   36 +
 .../vector/expressions/StringColumnInList.java  |    4 +
 .../vector/expressions/StructColumnInList.java  |  174 ++
 .../hive/ql/optimizer/physical/Vectorizer.java  |   71 +-
 .../ql/optimizer/physical/Vectorizer.java.orig  | 1744 ++++++++++++++++++
 .../ql/optimizer/physical/Vectorizer.java.rej   |   86 +
 .../queries/clientpositive/vector_struct_in.q   |  247 +++
 .../clientpositive/vector_struct_in.q.out       |  825 +++++++++
 11 files changed, 3566 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
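
For context, the kind of query this change vectorizes is a multi-column IN
predicate over struct-typed keys. An illustrative sketch (the table and column
names here are hypothetical; the new vector_struct_in.q test above covers the
real cases, in both filter and projection modes):

    -- filter mode: struct IN used as a WHERE predicate
    select * from orders
    where struct(region, status) in (struct('EU', 'OPEN'), struct('US', 'CLOSED'));

    -- projection mode: struct IN used as a boolean-valued expression
    select struct(region, status) in (struct('EU', 'OPEN'), struct('US', 'CLOSED'))
    from orders;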


http://git-wip-us.apache.org/repos/asf/hive/blob/7cfe3743/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
index 2483196..46c2a78 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
@@ -104,20 +104,30 @@ import org.apache.hadoop.hive.ql.udf.UDFToLong;
 import org.apache.hadoop.hive.ql.udf.UDFToShort;
 import org.apache.hadoop.hive.ql.udf.UDFToString;
 import org.apache.hadoop.hive.ql.udf.generic.*;
+import org.apache.hadoop.hive.serde2.ByteStream.Output;
+import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableSerializeWrite;
 import org.apache.hadoop.hive.serde2.io.DateWritable;
+import org.apache.hadoop.hive.serde2.io.DoubleWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.ConstantObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
 import org.apache.hadoop.hive.serde2.typeinfo.BaseCharTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.HiveDecimalUtils;
 import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hive.common.util.DateUtils;
 
+
 /**
  * Context class for vectorization execution.
  * Main role is to map column names to column indices and serves as a
@@ -1273,17 +1283,208 @@ public class VectorizationContext {
     }
   }
 
+  public enum InConstantType {
+    INT_FAMILY,
+    TIMESTAMP,
+    DATE,
+    FLOAT_FAMILY,
+    STRING_FAMILY,
+    DECIMAL
+  }
+
+  public static InConstantType getInConstantTypeFromPrimitiveCategory(PrimitiveCategory primitiveCategory) {
+
+    switch (primitiveCategory) {
+    case BOOLEAN:
+    case BYTE:
+    case SHORT:
+    case INT:
+    case LONG:
+      return InConstantType.INT_FAMILY;
+  
+    case DATE:
+      return InConstantType.DATE;
+
+    case TIMESTAMP:
+      return InConstantType.TIMESTAMP;
+
+    case FLOAT:
+    case DOUBLE:
+      return InConstantType.FLOAT_FAMILY;
+  
+    case STRING:
+    case CHAR:
+    case VARCHAR:
+    case BINARY:
+      return InConstantType.STRING_FAMILY;
+  
+    case DECIMAL:
+      return InConstantType.DECIMAL;
+  
+
+    case INTERVAL_YEAR_MONTH:
+    case INTERVAL_DAY_TIME:
+      // UNDONE: Fall through for these... they don't appear to be supported yet.
+    default:
+      throw new RuntimeException("Unexpected primitive type category " + primitiveCategory);
+    }
+  }
+
+  private VectorExpression getStructInExpression(List<ExprNodeDesc> childExpr, ExprNodeDesc colExpr,
+      TypeInfo colTypeInfo, List<ExprNodeDesc> inChildren, Mode mode, TypeInfo returnType)
+          throws HiveException {
+
+    VectorExpression expr = null;
+
+    StructTypeInfo structTypeInfo = (StructTypeInfo) colTypeInfo;
+
+    ArrayList<TypeInfo> fieldTypeInfos = structTypeInfo.getAllStructFieldTypeInfos();
+    final int fieldCount = fieldTypeInfos.size();
+    ColumnVector.Type[] fieldVectorColumnTypes = new ColumnVector.Type[fieldCount];
+    InConstantType[] fieldInConstantTypes = new InConstantType[fieldCount];
+    for (int f = 0; f < fieldCount; f++) {
+      TypeInfo fieldTypeInfo = fieldTypeInfos.get(f);
+      // Only primitive fields are supported for now.
+      if (fieldTypeInfo.getCategory() != Category.PRIMITIVE) {
+        return null;
+      }
+
+      // We are going to serialize using the 4 basic types.
+      ColumnVector.Type fieldVectorColumnType = getColumnVectorTypeFromTypeInfo(fieldTypeInfo);
+      fieldVectorColumnTypes[f] = fieldVectorColumnType;
+
+      // We currently evaluate the IN (..) constants in special ways.
+      PrimitiveCategory fieldPrimitiveCategory =
+          ((PrimitiveTypeInfo) fieldTypeInfo).getPrimitiveCategory();
+      InConstantType inConstantType = getInConstantTypeFromPrimitiveCategory(fieldPrimitiveCategory);
+      fieldInConstantTypes[f] = inConstantType;
+    }
+
+    Output buffer = new Output();
+    BinarySortableSerializeWrite binarySortableSerializeWrite =
+        new BinarySortableSerializeWrite(fieldCount);
+
+    final int inChildrenCount = inChildren.size();
+    byte[][] serializedInChildren = new byte[inChildrenCount][];
+    try {
+      for (int i = 0; i < inChildrenCount; i++) {
+        final ExprNodeDesc node = inChildren.get(i);
+        final Object[] constants;
+
+        if (node instanceof ExprNodeConstantDesc) {
+          ExprNodeConstantDesc constNode = (ExprNodeConstantDesc) node;
+          ConstantObjectInspector output = constNode.getWritableObjectInspector();
+          constants = ((List<?>) output.getWritableConstantValue()).toArray();
+        } else {
+          ExprNodeGenericFuncDesc exprNode = (ExprNodeGenericFuncDesc) node;
+          ExprNodeEvaluator<?> evaluator = ExprNodeEvaluatorFactory
+              .get(exprNode);
+          ObjectInspector output = evaluator.initialize(exprNode
+              .getWritableObjectInspector());
+          constants = (Object[]) evaluator.evaluate(null);
+        }
+
+        binarySortableSerializeWrite.set(buffer);
+        for (int f = 0; f < fieldCount; f++) {
+          Object constant = constants[f];
+          if (constant == null) {
+            binarySortableSerializeWrite.writeNull();
+          } else {
+            InConstantType inConstantType = fieldInConstantTypes[f];
+            switch (inConstantType) {
+            case STRING_FAMILY:
+              {
+                byte[] bytes;
+                if (constant instanceof Text) {
+                  Text text = (Text) constant;
+                  bytes = text.getBytes();
+                  binarySortableSerializeWrite.writeString(bytes, 0, text.getLength());
+                } else {
+                  throw new HiveException("Unexpected constant String type " +
+                      constant.getClass().getSimpleName());
+                }
+              }
+              break;
+            case INT_FAMILY:
+              {
+                long value;
+                if (constant instanceof IntWritable) {
+                  value = ((IntWritable) constant).get();
+                } else if (constant instanceof LongWritable) {
+                  value = ((LongWritable) constant).get();
+                } else {
+                  throw new HiveException("Unexpected constant Long type " +
+                      constant.getClass().getSimpleName());
+                }
+                binarySortableSerializeWrite.writeLong(value);
+              }
+              break;
+
+            case FLOAT_FAMILY:
+              {
+                double value;
+                if (constant instanceof DoubleWritable) {
+                  value = ((DoubleWritable) constant).get();
+                } else {
+                  throw new HiveException("Unexpected constant Double type " +
+                      constant.getClass().getSimpleName());
+                }
+                binarySortableSerializeWrite.writeDouble(value);
+              }
+              break;
+
+            // UNDONE...
+            case DATE:
+            case TIMESTAMP:
+            case DECIMAL:
+            default:
+              throw new RuntimeException("Unexpected IN constant type " + inConstantType.name());
+            }
+          }
+        }
+        serializedInChildren[i] = Arrays.copyOfRange(buffer.getData(), 0, buffer.getLength());
+      }
+    } catch (Exception e) {
+      throw new HiveException(e);
+    }
+
+    // Create a single child representing the scratch column where we will
+    // generate the serialized keys of the batch.
+    int scratchBytesCol = ocm.allocateOutputColumn("string");
+
+    Class<?> cl = (mode == Mode.FILTER ? FilterStructColumnInList.class : StructColumnInList.class);
+
+    expr = createVectorExpression(cl, null, Mode.PROJECTION, returnType);
+
+    ((IStringInExpr) expr).setInListValues(serializedInChildren);
+
+    ((IStructInExpr) expr).setScratchBytesColumn(scratchBytesCol);
+    ((IStructInExpr) expr).setStructColumnExprs(this, colExpr.getChildren(),
+        fieldVectorColumnTypes);
+
+    return expr;
+  }
+
   /**
    * Create a filter or boolean-valued expression for column IN ( <list-of-constants> )
    */
   private VectorExpression getInExpression(List<ExprNodeDesc> childExpr, Mode mode, TypeInfo returnType)
       throws HiveException {
     ExprNodeDesc colExpr = childExpr.get(0);
+    List<ExprNodeDesc> inChildren = childExpr.subList(1, childExpr.size());
 
     String colType = colExpr.getTypeString();
+    colType = VectorizationContext.mapTypeNameSynonyms(colType);
+    TypeInfo colTypeInfo = TypeInfoUtils.getTypeInfoFromTypeString(colType);
+    Category category = colTypeInfo.getCategory();
+    if (category == Category.STRUCT) {
+      return getStructInExpression(childExpr, colExpr, colTypeInfo, inChildren, mode, returnType);
+    } else if (category != Category.PRIMITIVE) {
+      return null;
+    }
 
     // prepare arguments for createVectorExpression
-    List<ExprNodeDesc> childrenForInList =  evaluateCastOnConstants(childExpr.subList(1, childExpr.size()));
+    List<ExprNodeDesc> childrenForInList =  evaluateCastOnConstants(inChildren);
 
     /* This method assumes that the IN list has no NULL entries. That is enforced elsewhere,
      * in the Vectorizer class. If NULL is passed in as a list entry, behavior is not defined.
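
The reduction above works because BinarySortableSerializeWrite produces one
canonical byte sequence per tuple value, so struct equality becomes byte-string
equality and the existing string IN machinery can be reused unchanged. A
minimal, self-contained sketch of that property, using only the serde calls
that appear in the patch (the class name and values are illustrative):

    import java.util.Arrays;
    import org.apache.hadoop.hive.serde2.ByteStream.Output;
    import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableSerializeWrite;

    public class StructKeySketch {
      public static void main(String[] args) throws Exception {
        // Serialize the 2-field tuple (100L, "EU") the same way the IN-list
        // constants and the per-row scratch keys are serialized above.
        Output buffer = new Output();
        BinarySortableSerializeWrite writer = new BinarySortableSerializeWrite(2);
        writer.set(buffer);
        writer.writeLong(100L);
        byte[] s = "EU".getBytes("UTF-8");
        writer.writeString(s, 0, s.length);
        byte[] key = Arrays.copyOfRange(buffer.getData(), 0, buffer.getLength());
        // Equal tuples always yield identical bytes, so the IN test reduces
        // to byte-array set membership on 'key'.
        System.out.println("serialized key: " + key.length + " bytes");
      }
    }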

http://git-wip-us.apache.org/repos/asf/hive/blob/7cfe3743/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/FilterStringColumnInList.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/FilterStringColumnInList.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/FilterStringColumnInList.java
index 2434e90..e34ec75 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/FilterStringColumnInList.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/FilterStringColumnInList.java
@@ -20,16 +20,7 @@ package org.apache.hadoop.hive.ql.exec.vector.expressions;
 
 import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.VectorExpressionDescriptor.Descriptor;
-import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.udf.UDFLike;
-import org.apache.hadoop.io.Text;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 
 /**
  * Evaluate an IN filter on a batch for a vector of strings.
@@ -165,6 +156,10 @@ public class FilterStringColumnInList extends VectorExpression implements IStrin
     return "boolean";
   }
 
+  public void setInputColumn(int inputCol) {
+    this.inputCol = inputCol;
+  }
+
   @Override
   public int getOutputColumn() {
     return -1;

http://git-wip-us.apache.org/repos/asf/hive/blob/7cfe3743/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/FilterStructColumnInList.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/FilterStructColumnInList.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/FilterStructColumnInList.java
new file mode 100644
index 0000000..00f22bb
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/FilterStructColumnInList.java
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.expressions;
+
+import java.util.List;
+
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorExpressionDescriptor.Descriptor;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.serde2.ByteStream.Output;
+import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableSerializeWrite;
+
+/**
+ * Evaluate an IN filter on a batch for a vector of structs.
+ * This is optimized so that no objects have to be created in
+ * the inner loop; the IN test itself is a fast lookup in a
+ * hash set built with Cuckoo hashing.
+ */
+public class FilterStructColumnInList extends FilterStringColumnInList implements IStructInExpr {
+  private static final long serialVersionUID = 1L;
+  private VectorExpression[] structExpressions;
+  private ColumnVector.Type[] fieldVectorColumnTypes;
+  private int[] structColumnMap;
+  private int scratchBytesColumn;
+
+  private transient Output buffer;
+  private transient BinarySortableSerializeWrite binarySortableSerializeWrite;
+
+  /**
+   * After construction you must call setInListValues() to add the values to the IN set
+   * (on the IStringInExpr interface).
+   *
+   * And, call setScratchBytesColumn() and setStructColumnExprs() on the IStructInExpr interface.
+   */
+  public FilterStructColumnInList() {
+    super(-1);
+  }
+
+  @Override
+  public void evaluate(VectorizedRowBatch batch) {
+
+    final int logicalSize = batch.size;
+    if (logicalSize == 0) {
+      return;
+    }
+
+    if (buffer == null) {
+      buffer = new Output();
+      binarySortableSerializeWrite = new BinarySortableSerializeWrite(structColumnMap.length);
+    }
+
+    for (VectorExpression ve : structExpressions) {
+      ve.evaluate(batch);
+    }
+
+    BytesColumnVector scratchBytesColumnVector = (BytesColumnVector) batch.cols[scratchBytesColumn];
+
+    try {
+      boolean selectedInUse = batch.selectedInUse;
+      int[] selected = batch.selected;
+      for (int logical = 0; logical < logicalSize; logical++) {
+        int batchIndex = (selectedInUse ? selected[logical] : logical);
+
+        binarySortableSerializeWrite.set(buffer);
+        for (int f = 0; f < structColumnMap.length; f++) {
+          int fieldColumn = structColumnMap[f];
+          ColumnVector colVec = batch.cols[fieldColumn];
+          int adjustedIndex = (colVec.isRepeating ? 0 : batchIndex);
+          if (colVec.noNulls || !colVec.isNull[adjustedIndex]) {
+            switch (fieldVectorColumnTypes[f]) {
+            case BYTES:
+              {
+                BytesColumnVector bytesColVec = (BytesColumnVector) colVec;
+                byte[] bytes = bytesColVec.vector[adjustedIndex];
+                int start = bytesColVec.start[adjustedIndex];
+                int length = bytesColVec.length[adjustedIndex];
+                binarySortableSerializeWrite.writeString(bytes, start, length);
+              }
+              break;
+
+            case LONG:
+              binarySortableSerializeWrite.writeLong(((LongColumnVector) colVec).vector[adjustedIndex]);
+              break;
+
+            case DOUBLE:
+              binarySortableSerializeWrite.writeDouble(((DoubleColumnVector) colVec).vector[adjustedIndex]);
+              break;
+
+            case DECIMAL:
+              binarySortableSerializeWrite.writeHiveDecimal(
+                  ((DecimalColumnVector) colVec).vector[adjustedIndex].getHiveDecimal());
+              break;
+
+            default:
+              throw new RuntimeException("Unexpected vector column type " +
+                  fieldVectorColumnTypes[f].name());
+            }
+          } else {
+            binarySortableSerializeWrite.writeNull();
+          }
+        }
+        scratchBytesColumnVector.setVal(batchIndex, buffer.getData(), 0, buffer.getLength());
+      }
+
+      // Now, take the serialized keys we just wrote into our scratch column and look them
+      // up in the IN list.
+      super.evaluate(batch);
+
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+
+  @Override
+  public String getOutputType() {
+    return "boolean";
+  }
+
+  @Override
+  public int getOutputColumn() {
+    return -1;
+  }
+
+  @Override
+  public Descriptor getDescriptor() {
+
+    // This VectorExpression (IN) is a special case, so don't return a descriptor.
+    return null;
+  }
+
+  @Override
+  public void setScratchBytesColumn(int scratchBytesColumn) {
+
+    // Tell our super class FilterStringColumnInList it will be evaluating our scratch
+    // BytesColumnVector.
+    super.setInputColumn(scratchBytesColumn);
+    this.scratchBytesColumn = scratchBytesColumn;
+  }
+
+  @Override
+  public void setStructColumnExprs(VectorizationContext vContext,
+      List<ExprNodeDesc> structColumnExprs, ColumnVector.Type[] fieldVectorColumnTypes)
+          throws HiveException {
+
+    structExpressions = vContext.getVectorExpressions(structColumnExprs);
+    structColumnMap = new int[structExpressions.length];
+    for (int i = 0; i < structColumnMap.length; i++) {
+      VectorExpression ve = structExpressions[i];
+      structColumnMap[i] = ve.getOutputColumn();
+    }
+    this.fieldVectorColumnTypes = fieldVectorColumnTypes;
+  }
+}
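
Putting the pieces together, the setup that getStructInExpression() performs
above boils down to roughly the following wiring (a sketch; serializedInChildren,
scratchBytesCol, vContext, colExpr, and fieldVectorColumnTypes are the values
computed earlier in VectorizationContext):

    FilterStructColumnInList filterExpr = new FilterStructColumnInList();
    // IN-list keys, pre-serialized with BinarySortableSerializeWrite (IStringInExpr):
    filterExpr.setInListValues(serializedInChildren);
    // Point the inherited string-IN lookup at the scratch BytesColumnVector:
    filterExpr.setScratchBytesColumn(scratchBytesCol);
    // Child expressions that materialize each struct field column per batch:
    filterExpr.setStructColumnExprs(vContext, colExpr.getChildren(), fieldVectorColumnTypes);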

http://git-wip-us.apache.org/repos/asf/hive/blob/7cfe3743/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/IStructInExpr.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/IStructInExpr.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/IStructInExpr.java
new file mode 100644
index 0000000..3b25255
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/IStructInExpr.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.expressions;
+
+import java.util.List;
+
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+
+/**
+ * Interface used for both filter and non-filter versions of IN to simplify
+ * VectorizationContext code.
+ */
+public interface IStructInExpr {
+  void setScratchBytesColumn(int scratchBytesColumn);
+  void setStructColumnExprs(VectorizationContext vContext, List<ExprNodeDesc> structColumnExprs,
+      ColumnVector.Type[] fieldVectorColumnTypes) throws HiveException;
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/7cfe3743/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/StringColumnInList.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/StringColumnInList.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/StringColumnInList.java
index 03833a2..b90e3c0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/StringColumnInList.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/StringColumnInList.java
@@ -140,6 +140,10 @@ public class StringColumnInList extends VectorExpression implements IStringInExp
     return "boolean";
   }
 
+  public void setInputColumn(int inputCol) {
+    this.inputCol = inputCol;
+  }
+
   @Override
   public int getOutputColumn() {
     return this.outputColumn;

http://git-wip-us.apache.org/repos/asf/hive/blob/7cfe3743/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/StructColumnInList.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/StructColumnInList.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/StructColumnInList.java
new file mode 100644
index 0000000..724497a
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/StructColumnInList.java
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.expressions;
+
+import java.util.List;
+
+import org.apache.hadoop.hive.ql.exec.vector.VectorExpressionDescriptor.Descriptor;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.serde2.ByteStream.Output;
+import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableSerializeWrite;
+
+/**
+ * Evaluate an IN boolean expression (not a filter) on a batch for a vector of structs.
+ * This is optimized so that no objects have to be created in
+ * the inner loop; the IN test itself is a fast lookup in a
+ * hash set built with Cuckoo hashing.
+ */
+public class StructColumnInList extends StringColumnInList implements IStructInExpr {
+  private static final long serialVersionUID = 1L;
+  private VectorExpression[] structExpressions;
+  private ColumnVector.Type[] fieldVectorColumnTypes;
+  private int[] structColumnMap;
+  private int scratchBytesColumn;
+
+  private transient Output buffer;
+  private transient BinarySortableSerializeWrite binarySortableSerializeWrite;
+
+  public StructColumnInList() {
+    super();
+  }
+
+  /**
+   * After construction you must call setInListValues() to add the values to the IN set,
+   * and call setScratchBytesColumn() and setStructColumnExprs() on the IStructInExpr interface.
+   */
+  public StructColumnInList(int outputColumn) {
+    super(-1, outputColumn);
+  }
+
+  @Override
+  public void evaluate(VectorizedRowBatch batch) {
+
+    final int logicalSize = batch.size;
+    if (logicalSize == 0) {
+      return;
+    }
+
+    if (buffer == null) {
+      buffer = new Output();
+      binarySortableSerializeWrite = new BinarySortableSerializeWrite(structColumnMap.length);
+    }
+
+    for (VectorExpression ve : structExpressions) {
+      ve.evaluate(batch);
+    }
+
+    BytesColumnVector scratchBytesColumnVector = (BytesColumnVector) batch.cols[scratchBytesColumn];
+
+    try {
+      boolean selectedInUse = batch.selectedInUse;
+      int[] selected = batch.selected;
+      for (int logical = 0; logical < logicalSize; logical++) {
+        int batchIndex = (selectedInUse ? selected[logical] : logical);
+
+        binarySortableSerializeWrite.set(buffer);
+        for (int f = 0; f < structColumnMap.length; f++) {
+          int fieldColumn = structColumnMap[f];
+          ColumnVector colVec = batch.cols[fieldColumn];
+          int adjustedIndex = (colVec.isRepeating ? 0 : batchIndex);
+          if (colVec.noNulls || !colVec.isNull[adjustedIndex]) {
+            switch (fieldVectorColumnTypes[f]) {
+            case BYTES:
+              {
+                BytesColumnVector bytesColVec = (BytesColumnVector) colVec;
+                byte[] bytes = bytesColVec.vector[adjustedIndex];
+                int start = bytesColVec.start[adjustedIndex];
+                int length = bytesColVec.length[adjustedIndex];
+                binarySortableSerializeWrite.writeString(bytes, start, length);
+              }
+              break;
+
+            case LONG:
+              binarySortableSerializeWrite.writeLong(((LongColumnVector) colVec).vector[adjustedIndex]);
+              break;
+
+            case DOUBLE:
+              binarySortableSerializeWrite.writeDouble(((DoubleColumnVector) colVec).vector[adjustedIndex]);
+              break;
+
+            case DECIMAL:
+              binarySortableSerializeWrite.writeHiveDecimal(
+                  ((DecimalColumnVector) colVec).vector[adjustedIndex].getHiveDecimal());
+              break;
+
+            default:
+              throw new RuntimeException("Unexpected vector column type " +
+                  fieldVectorColumnTypes[f].name());
+            }
+          } else {
+            binarySortableSerializeWrite.writeNull();
+          }
+        }
+        scratchBytesColumnVector.setVal(batchIndex, buffer.getData(), 0, buffer.getLength());
+      }
+
+      // Now, take the serialized keys we just wrote into our scratch column and look them
+      // up in the IN list.
+      super.evaluate(batch);
+
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+
+  @Override
+  public String getOutputType() {
+    return "boolean";
+  }
+
+  @Override
+  public Descriptor getDescriptor() {
+
+    // This VectorExpression (IN) is a special case, so don't return a descriptor.
+    return null;
+  }
+
+
+  @Override
+  public void setScratchBytesColumn(int scratchBytesColumn) {
+
+    // Tell our super class StringColumnInList it will be evaluating our scratch
+    // BytesColumnVector.
+    super.setInputColumn(scratchBytesColumn);
+    this.scratchBytesColumn = scratchBytesColumn;
+  }
+
+  @Override
+  public void setStructColumnExprs(VectorizationContext vContext,
+      List<ExprNodeDesc> structColumnExprs, ColumnVector.Type[] fieldVectorColumnTypes)
+          throws HiveException {
+
+    structExpressions = vContext.getVectorExpressions(structColumnExprs);
+    structColumnMap = new int[structExpressions.length];
+    for (int i = 0; i < structColumnMap.length; i++) {
+      VectorExpression ve = structExpressions[i];
+      structColumnMap[i] = ve.getOutputColumn();
+    }
+    this.fieldVectorColumnTypes = fieldVectorColumnTypes;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/7cfe3743/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
index 0d4c1d8..da1d9eb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
@@ -53,10 +53,12 @@ import org.apache.hadoop.hive.ql.exec.vector.mapjoin.VectorMapJoinLeftSemiString
 import org.apache.hadoop.hive.ql.exec.vector.mapjoin.VectorMapJoinOuterLongOperator;
 import org.apache.hadoop.hive.ql.exec.vector.mapjoin.VectorMapJoinOuterMultiKeyOperator;
 import org.apache.hadoop.hive.ql.exec.vector.mapjoin.VectorMapJoinOuterStringOperator;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.VectorMapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.vector.VectorMapJoinOuterFilteredOperator;
 import org.apache.hadoop.hive.ql.exec.vector.VectorSMBMapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext.InConstantType;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizationContextRegion;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
@@ -139,8 +141,11 @@ import org.apache.hadoop.hive.ql.udf.UDFYear;
 import org.apache.hadoop.hive.ql.udf.generic.*;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 
@@ -575,7 +580,12 @@ public class Vectorizer implements PhysicalPlanResolver {
         if (nonVectorizableChildOfGroupBy(op)) {
           return new Boolean(true);
         }
-        boolean ret = validateMapWorkOperator(op, mapWork, isTez);
+        boolean ret;
+        try {
+          ret = validateMapWorkOperator(op, mapWork, isTez);
+        } catch (Exception e) {
+          throw new SemanticException(e);
+        }
         if (!ret) {
           LOG.info("MapWork Operator: " + op.getName() + " could not be vectorized.");
           return new Boolean(false);
@@ -1260,6 +1270,7 @@ public class Vectorizer implements PhysicalPlanResolver {
       LOG.info("Cannot vectorize " + desc.toString() + " of type " + typeName);
       return false;
     }
+    boolean isInExpression = false;
     if (desc instanceof ExprNodeGenericFuncDesc) {
       ExprNodeGenericFuncDesc d = (ExprNodeGenericFuncDesc) desc;
       boolean r = validateGenericUdf(d);
@@ -1267,12 +1278,62 @@ public class Vectorizer implements PhysicalPlanResolver {
         LOG.info("Cannot vectorize UDF " + d);
         return false;
       }
+      GenericUDF genericUDF = d.getGenericUDF();
+      isInExpression = (genericUDF instanceof GenericUDFIn);
     }
     if (desc.getChildren() != null) {
-      for (ExprNodeDesc d: desc.getChildren()) {
-        // Don't restrict child expressions for projection.  Always use looser FILTER mode.
-        boolean r = validateExprNodeDescRecursive(d, VectorExpressionDescriptor.Mode.FILTER);
-        if (!r) {
+      if (isInExpression
+          && desc.getChildren().get(0).getTypeInfo().getCategory() == Category.STRUCT) {
+        // Don't restrict child expressions for projection. 
+        // Always use loose FILTER mode.
+        if (!validateStructInExpression(desc, VectorExpressionDescriptor.Mode.FILTER)) {
+          return false;
+        }
+      } else {
+        for (ExprNodeDesc d : desc.getChildren()) {
+          // Don't restrict child expressions for projection. 
+          // Always use loose FILTER mode.
+          if (!validateExprNodeDescRecursive(d, VectorExpressionDescriptor.Mode.FILTER)) {
+            return false;
+          }
+        }
+      }
+    }
+    return true;
+  }
+
+  private boolean validateStructInExpression(ExprNodeDesc desc,
+      VectorExpressionDescriptor.Mode mode) {
+    for (ExprNodeDesc d : desc.getChildren()) {
+      TypeInfo typeInfo = d.getTypeInfo();
+      if (typeInfo.getCategory() != Category.STRUCT) {
+        return false;
+      }
+      StructTypeInfo structTypeInfo = (StructTypeInfo) typeInfo;
+
+      ArrayList<TypeInfo> fieldTypeInfos = structTypeInfo
+          .getAllStructFieldTypeInfos();
+      ArrayList<String> fieldNames = structTypeInfo.getAllStructFieldNames();
+      final int fieldCount = fieldTypeInfos.size();
+      for (int f = 0; f < fieldCount; f++) {
+        TypeInfo fieldTypeInfo = fieldTypeInfos.get(f);
+        Category category = fieldTypeInfo.getCategory();
+        if (category != Category.PRIMITIVE) {
+          LOG.info("Cannot vectorize struct field " + fieldNames.get(f)
+              + " of type " + fieldTypeInfo.getTypeName());
+          return false;
+        }
+        PrimitiveTypeInfo fieldPrimitiveTypeInfo = (PrimitiveTypeInfo) fieldTypeInfo;
+        InConstantType inConstantType = VectorizationContext
+            .getInConstantTypeFromPrimitiveCategory(fieldPrimitiveTypeInfo
+                .getPrimitiveCategory());
+
+        // For now, limit the data types we support for Vectorized Struct IN().
+        if (inConstantType != InConstantType.INT_FAMILY
+            && inConstantType != InConstantType.FLOAT_FAMILY
+            && inConstantType != InConstantType.STRING_FAMILY) {
+          LOG.info("Cannot vectorize struct field " + fieldNames.get(f)
+              + " of type " + fieldTypeInfo.getTypeName());
           return false;
         }
       }