Posted to commits@hive.apache.org by jd...@apache.org on 2017/01/30 07:33:34 UTC

[3/3] hive git commit: HIVE-15698: Vectorization support for min/max/bloomfilter runtime filtering (Jason Dere, reviewed by Matt McCline)

HIVE-15698: Vectorization support for min/max/bloomfilter runtime filtering (Jason Dere, reviewed by Matt McCline)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/62ebd1ab
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/62ebd1ab
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/62ebd1ab

Branch: refs/heads/master
Commit: 62ebd1abb87d951024a371ba37a451f536a594e6
Parents: 79eb224
Author: Jason Dere <jd...@hortonworks.com>
Authored: Sun Jan 29 23:33:02 2017 -0800
Committer: Jason Dere <jd...@hortonworks.com>
Committed: Sun Jan 29 23:33:02 2017 -0800

----------------------------------------------------------------------
 .../apache/hadoop/hive/ant/GenVectorCode.java   |  83 ++
 .../test/resources/testconfiguration.properties |   1 +
 .../FilterColumnBetweenDynamicValue.txt         | 101 ++
 .../FilterDecimalColumnBetween.txt              |  16 +
 .../FilterStringColumnBetween.txt               |   8 +-
 .../FilterTimestampColumnBetween.txt            |  16 +
 .../FilterTruncStringColumnBetween.txt          |   8 +-
 .../ql/exec/tez/DynamicValueRegistryTez.java    |  12 +-
 .../exec/vector/VectorExpressionDescriptor.java |   3 +-
 .../ql/exec/vector/VectorFilterOperator.java    |   2 +
 .../ql/exec/vector/VectorSelectOperator.java    |   1 +
 .../ql/exec/vector/VectorizationContext.java    |  77 +-
 .../DynamicValueVectorExpression.java           | 314 +++++++
 .../vector/expressions/VectorExpression.java    |  13 +-
 .../VectorInBloomFilterColDynamicValue.java     | 285 ++++++
 .../aggregates/VectorUDAFBloomFilter.java       | 474 ++++++++++
 .../aggregates/VectorUDAFBloomFilterMerge.java  | 365 ++++++++
 .../hive/ql/optimizer/physical/Vectorizer.java  |   2 +
 .../ql/udf/generic/GenericUDAFBloomFilter.java  |  10 +-
 .../ql/udf/generic/GenericUDFInBloomFilter.java |  12 +-
 .../ql/optimizer/physical/TestVectorizer.java   |  31 +
 .../test/queries/clientpositive/explainuser_3.q |   1 +
 .../vectorized_dynamic_semijoin_reduction.q     |  43 +
 .../results/clientpositive/llap/mergejoin.q.out |  66 +-
 .../results/clientpositive/llap/orc_llap.q.out  |  12 +-
 .../llap/vector_binary_join_groupby.q.out       |   4 +-
 .../vectorized_dynamic_partition_pruning.q.out  |   4 +-
 .../vectorized_dynamic_semijoin_reduction.q.out | 932 +++++++++++++++++++
 .../clientpositive/tez/explainuser_3.q.out      |  17 +-
 .../vector_binary_join_groupby.q.out            |   1 +
 .../apache/hive/common/util/BloomFilter.java    |  36 +
 .../hive/common/util/TestBloomFilter.java       | 125 +++
 32 files changed, 3000 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/62ebd1ab/ant/src/org/apache/hadoop/hive/ant/GenVectorCode.java
----------------------------------------------------------------------
diff --git a/ant/src/org/apache/hadoop/hive/ant/GenVectorCode.java b/ant/src/org/apache/hadoop/hive/ant/GenVectorCode.java
index e9fe8fa..133ef0a 100644
--- a/ant/src/org/apache/hadoop/hive/ant/GenVectorCode.java
+++ b/ant/src/org/apache/hadoop/hive/ant/GenVectorCode.java
@@ -789,6 +789,15 @@ public class GenVectorCode extends Task {
       {"FilterTimestampColumnBetween", ""},
       {"FilterTimestampColumnBetween", "!"},
 
+      // This is for runtime min/max pushdown - don't need to do NOT BETWEEN
+      {"FilterColumnBetweenDynamicValue", "long", ""},
+      {"FilterColumnBetweenDynamicValue", "double", ""},
+      {"FilterColumnBetweenDynamicValue", "decimal", ""},
+      {"FilterColumnBetweenDynamicValue", "string", ""},
+      {"FilterColumnBetweenDynamicValue", "char", ""},
+      {"FilterColumnBetweenDynamicValue", "varchar", ""},
+      {"FilterColumnBetweenDynamicValue", "timestamp", ""},
+
       {"ColumnCompareColumn", "Equal", "long", "double", "=="},
       {"ColumnCompareColumn", "Equal", "double", "double", "=="},
       {"ColumnCompareColumn", "NotEqual", "long", "double", "!="},
@@ -1164,6 +1173,8 @@ public class GenVectorCode extends Task {
 
       } else if (tdesc[0].equals("FilterColumnBetween")) {
         generateFilterColumnBetween(tdesc);
+      } else if (tdesc[0].equals("FilterColumnBetweenDynamicValue")) {
+        generateFilterColumnBetweenDynamicValue(tdesc);
       } else if (tdesc[0].equals("ScalarArithmeticColumn") || tdesc[0].equals("ScalarDivideColumn")) {
         generateScalarArithmeticColumn(tdesc);
       } else if (tdesc[0].equals("FilterColumnCompareColumn")) {
@@ -1379,6 +1390,72 @@ public class GenVectorCode extends Task {
         className, templateString);
   }
 
+  private void generateFilterColumnBetweenDynamicValue(String[] tdesc) throws Exception {
+    String operandType = tdesc[1];
+    String optionalNot = tdesc[2];
+
+    String className = "Filter" + getCamelCaseType(operandType) + "Column" +
+      (optionalNot.equals("!") ? "Not" : "") + "BetweenDynamicValue";
+
+    String typeName = getCamelCaseType(operandType);
+    String defaultValue;
+    String vectorType;
+    String getPrimitiveMethod;
+    String getValueMethod;
+
+    if (operandType.equals("long")) {
+      defaultValue = "0";
+      vectorType = "long";
+      getPrimitiveMethod = "getLong";
+      getValueMethod = "";
+    } else if (operandType.equals("double")) {
+      defaultValue = "0";
+      vectorType = "double";
+      getPrimitiveMethod = "getDouble";
+      getValueMethod = "";
+    } else if (operandType.equals("decimal")) {
+      defaultValue = "null";
+      vectorType = "HiveDecimal";
+      getPrimitiveMethod = "getHiveDecimal";
+      getValueMethod = "";
+    } else if (operandType.equals("string")) {
+      defaultValue = "null";
+      vectorType = "byte[]";
+      getPrimitiveMethod = "getString";
+      getValueMethod = ".getBytes()";
+    } else if (operandType.equals("char")) {
+      defaultValue = "null";
+      vectorType = "byte[]";
+      getPrimitiveMethod = "getHiveChar";
+      getValueMethod = ".getStrippedValue().getBytes()";  // Does vectorization use stripped char values?
+    } else if (operandType.equals("varchar")) {
+      defaultValue = "null";
+      vectorType = "byte[]";
+      getPrimitiveMethod = "getHiveVarchar";
+      getValueMethod = ".getValue().getBytes()";
+    } else if (operandType.equals("timestamp")) {
+      defaultValue = "null";
+      vectorType = "Timestamp";
+      getPrimitiveMethod = "getTimestamp";
+      getValueMethod = "";
+    } else {
+      throw new IllegalArgumentException("Type " + operandType + " not supported");
+    }
+
+    // Read the template into a string, expand it, and write it.
+    File templateFile = new File(joinPath(this.expressionTemplateDirectory, tdesc[0] + ".txt"));
+    String templateString = readFile(templateFile);
+    templateString = templateString.replaceAll("<ClassName>", className);
+    templateString = templateString.replaceAll("<TypeName>", typeName);
+    templateString = templateString.replaceAll("<DefaultValue>", defaultValue);
+    templateString = templateString.replaceAll("<VectorType>", vectorType);
+    templateString = templateString.replaceAll("<GetPrimitiveMethod>", getPrimitiveMethod);
+    templateString = templateString.replaceAll("<GetValueMethod>", getValueMethod);
+
+    writeFile(templateFile.lastModified(), expressionOutputDirectory, expressionClassesDirectory,
+        className, templateString);
+  }
+
   private void generateColumnCompareColumn(String[] tdesc) throws Exception {
     String operatorName = tdesc[1];
     String operandType1 = tdesc[2];
@@ -3084,6 +3161,12 @@ public class GenVectorCode extends Task {
       return "Timestamp";
     } else if (type.equals("date")) {
       return "Date";
+    } else if (type.equals("string")) {
+      return "String";
+    } else if (type.equals("char")) {
+      return "Char";
+    } else if (type.equals("varchar")) {
+      return "VarChar";
     } else {
       return type;
     }

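For illustration, a minimal, self-contained sketch (not part of the commit) of the placeholder expansion that generateFilterColumnBetweenDynamicValue() performs for the "long" operand type; the template literal below is a simplified stand-in for FilterColumnBetweenDynamicValue.txt, while the substitution values mirror the "long" branch above.

// Simplified stand-in for the expansion step above: the real code reads the
// template file from disk and writes the generated .java source to disk.
public class TemplateExpansionSketch {
  public static void main(String[] args) {
    String template =
        "public class <ClassName> extends Filter<TypeName>ColumnBetween {\n"
      + "  public <ClassName>(int colNum, DynamicValue l, DynamicValue r) {\n"
      + "    super(colNum, <DefaultValue>, <DefaultValue>);\n"
      + "  }\n"
      + "}\n";
    // Substitutions for the "long" operand type, mirroring the branch above.
    String expanded = template
        .replaceAll("<ClassName>", "FilterLongColumnBetweenDynamicValue")
        .replaceAll("<TypeName>", "Long")
        .replaceAll("<DefaultValue>", "0");
    System.out.println(expanded);
  }
}
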
http://git-wip-us.apache.org/repos/asf/hive/blob/62ebd1ab/itests/src/test/resources/testconfiguration.properties
----------------------------------------------------------------------
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index e966959..8b3f589 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -611,6 +611,7 @@ minillaplocal.query.files=acid_globallimit.q,\
   vector_udf1.q,\
   vectorization_short_regress.q,\
   vectorized_dynamic_partition_pruning.q,\
+  vectorized_dynamic_semijoin_reduction.q,\
   vectorized_ptf.q,\
   windowing.q,\
   windowing_gby.q,\

http://git-wip-us.apache.org/repos/asf/hive/blob/62ebd1ab/ql/src/gen/vectorization/ExpressionTemplates/FilterColumnBetweenDynamicValue.txt
----------------------------------------------------------------------
diff --git a/ql/src/gen/vectorization/ExpressionTemplates/FilterColumnBetweenDynamicValue.txt b/ql/src/gen/vectorization/ExpressionTemplates/FilterColumnBetweenDynamicValue.txt
new file mode 100644
index 0000000..97ab7aa
--- /dev/null
+++ b/ql/src/gen/vectorization/ExpressionTemplates/FilterColumnBetweenDynamicValue.txt
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.expressions.gen;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.gen.Filter<TypeName>ColumnBetween;
+import org.apache.hadoop.hive.ql.plan.DynamicValue;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.sql.Timestamp;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+
+public class <ClassName> extends Filter<TypeName>ColumnBetween {
+
+  private static final long serialVersionUID = 1L;
+
+  private static final Logger LOG = LoggerFactory.getLogger(<ClassName>.class);
+
+  protected DynamicValue leftDynamicValue;
+  protected DynamicValue rightDynamicValue;
+  protected transient boolean initialized = false;
+  protected transient boolean isLeftOrRightNull = false;
+
+  public <ClassName>(int colNum, DynamicValue leftValue, DynamicValue rightValue) {
+    super(colNum, <DefaultValue>, <DefaultValue>);
+    this.leftDynamicValue = leftValue;
+    this.rightDynamicValue = rightValue;
+  }
+
+  public <ClassName>() {
+  }
+
+  public DynamicValue getLeftDynamicValue() {
+    return leftDynamicValue;
+  }
+
+  public void setLeftDynamicValue(DynamicValue leftValue) {
+    this.leftDynamicValue = leftValue;
+  }
+
+  public DynamicValue getRightDynamicValue() {
+    return rightDynamicValue;
+  }
+
+  public void setRightDynamicValue(DynamicValue rightValue) {
+    this.rightDynamicValue = rightValue;
+  }
+
+  @Override
+  public void init(Configuration conf) {
+    super.init(conf);
+    leftDynamicValue.setConf(conf);
+    rightDynamicValue.setConf(conf);
+  }
+
+  @Override
+  public void evaluate(VectorizedRowBatch batch) {
+    if (!initialized) {
+      Object lVal = leftDynamicValue.getValue();
+      Object rVal = rightDynamicValue.getValue();
+      if (lVal == null || rVal == null) {
+        isLeftOrRightNull = true;
+      } else {
+        <VectorType> min = PrimitiveObjectInspectorUtils.<GetPrimitiveMethod>(
+            lVal, leftDynamicValue.getObjectInspector())<GetValueMethod>;
+        setLeftValue(min);
+
+        <VectorType> max = PrimitiveObjectInspectorUtils.<GetPrimitiveMethod>(
+            rVal, rightDynamicValue.getObjectInspector())<GetValueMethod>;
+        setRightValue(max);
+      }
+      initialized = true;
+    }
+
+    // Special case for dynamic values - min/max can be null
+    if (isLeftOrRightNull) {
+      // Entire batch is filtered out
+      batch.size = 0;
+    }
+
+    super.evaluate(batch);
+  }
+}

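The essential runtime behavior of the generated classes is the lazy, once-per-operator resolution of the min/max bounds in evaluate(). A minimal sketch of that pattern, with plain suppliers standing in for DynamicValue:

import java.util.function.Supplier;

// Minimal sketch of the lazy min/max resolution in the template's evaluate():
// the bounds are resolved once, on the first batch, and a null bound means
// no row can pass (the whole batch is filtered out).
class LazyBetweenSketch {
  private final Supplier<Long> minSupplier;  // stand-in for leftDynamicValue
  private final Supplier<Long> maxSupplier;  // stand-in for rightDynamicValue
  private boolean initialized = false;
  private boolean isLeftOrRightNull = false;
  private long min;
  private long max;

  LazyBetweenSketch(Supplier<Long> minSupplier, Supplier<Long> maxSupplier) {
    this.minSupplier = minSupplier;
    this.maxSupplier = maxSupplier;
  }

  boolean accept(long value) {
    if (!initialized) {
      Long lo = minSupplier.get();
      Long hi = maxSupplier.get();
      if (lo == null || hi == null) {
        isLeftOrRightNull = true;
      } else {
        min = lo;
        max = hi;
      }
      initialized = true;
    }
    return !isLeftOrRightNull && min <= value && value <= max;
  }
}
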
http://git-wip-us.apache.org/repos/asf/hive/blob/62ebd1ab/ql/src/gen/vectorization/ExpressionTemplates/FilterDecimalColumnBetween.txt
----------------------------------------------------------------------
diff --git a/ql/src/gen/vectorization/ExpressionTemplates/FilterDecimalColumnBetween.txt b/ql/src/gen/vectorization/ExpressionTemplates/FilterDecimalColumnBetween.txt
index d68edfa..62d2254 100644
--- a/ql/src/gen/vectorization/ExpressionTemplates/FilterDecimalColumnBetween.txt
+++ b/ql/src/gen/vectorization/ExpressionTemplates/FilterDecimalColumnBetween.txt
@@ -154,6 +154,22 @@ public class <ClassName> extends VectorExpression {
     return "boolean";
   }
 
+  public HiveDecimal getLeftValue() {
+    return leftValue;
+  }
+
+  public void setLeftValue(HiveDecimal value) {
+    this.leftValue = value;
+  }
+
+  public HiveDecimal getRightValue() {
+    return rightValue;
+  }
+
+  public void setRightValue(HiveDecimal value) {
+    this.rightValue = value;
+  }
+
   @Override
   public VectorExpressionDescriptor.Descriptor getDescriptor() {
     return (new VectorExpressionDescriptor.Builder())

http://git-wip-us.apache.org/repos/asf/hive/blob/62ebd1ab/ql/src/gen/vectorization/ExpressionTemplates/FilterStringColumnBetween.txt
----------------------------------------------------------------------
diff --git a/ql/src/gen/vectorization/ExpressionTemplates/FilterStringColumnBetween.txt b/ql/src/gen/vectorization/ExpressionTemplates/FilterStringColumnBetween.txt
index e8049da..16d4aaf 100644
--- a/ql/src/gen/vectorization/ExpressionTemplates/FilterStringColumnBetween.txt
+++ b/ql/src/gen/vectorization/ExpressionTemplates/FilterStringColumnBetween.txt
@@ -161,19 +161,19 @@ public class <ClassName> extends VectorExpression {
     this.colNum = colNum;
   }
 
-  public byte[] getLeft() {
+  public byte[] getLeftValue() {
     return left;
   }
 
-  public void setLeft(byte[] value) {
+  public void setLeftValue(byte[] value) {
     this.left = value;
   }
   
-  public byte[] getRight() {
+  public byte[] getRightValue() {
     return right;
   }
 
-  public void setRight(byte[] value) {
+  public void setRightValue(byte[] value) {
     this.right = value;
   }
   

http://git-wip-us.apache.org/repos/asf/hive/blob/62ebd1ab/ql/src/gen/vectorization/ExpressionTemplates/FilterTimestampColumnBetween.txt
----------------------------------------------------------------------
diff --git a/ql/src/gen/vectorization/ExpressionTemplates/FilterTimestampColumnBetween.txt b/ql/src/gen/vectorization/ExpressionTemplates/FilterTimestampColumnBetween.txt
index 4298d79..806148f 100644
--- a/ql/src/gen/vectorization/ExpressionTemplates/FilterTimestampColumnBetween.txt
+++ b/ql/src/gen/vectorization/ExpressionTemplates/FilterTimestampColumnBetween.txt
@@ -153,6 +153,22 @@ public class <ClassName> extends VectorExpression {
     return "boolean";
   }
 
+  public Timestamp getLeftValue() {
+    return leftValue;
+  }
+
+  public void setLeftValue(Timestamp value) {
+    this.leftValue = value;
+  }
+
+  public Timestamp getRightValue() {
+    return rightValue;
+  }
+
+  public void setRightValue(Timestamp value) {
+    this.rightValue = value;
+  }
+
   @Override
   public VectorExpressionDescriptor.Descriptor getDescriptor() {
     return (new VectorExpressionDescriptor.Builder())

http://git-wip-us.apache.org/repos/asf/hive/blob/62ebd1ab/ql/src/gen/vectorization/ExpressionTemplates/FilterTruncStringColumnBetween.txt
----------------------------------------------------------------------
diff --git a/ql/src/gen/vectorization/ExpressionTemplates/FilterTruncStringColumnBetween.txt b/ql/src/gen/vectorization/ExpressionTemplates/FilterTruncStringColumnBetween.txt
index 94a174d..d350dcb 100644
--- a/ql/src/gen/vectorization/ExpressionTemplates/FilterTruncStringColumnBetween.txt
+++ b/ql/src/gen/vectorization/ExpressionTemplates/FilterTruncStringColumnBetween.txt
@@ -163,19 +163,19 @@ public class <ClassName> extends VectorExpression {
     this.colNum = colNum;
   }
 
-  public byte[] getLeft() {
+  public byte[] getLeftValue() {
     return left;
   }
 
-  public void setLeft(byte[] value) {
+  public void setLeftValue(byte[] value) {
     this.left = value;
   }
   
-  public byte[] getRight() {
+  public byte[] getRightValue() {
     return right;
   }
 
-  public void setRight(byte[] value) {
+  public void setRightValue(byte[] value) {
     this.right = value;
   }
   

http://git-wip-us.apache.org/repos/asf/hive/blob/62ebd1ab/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicValueRegistryTez.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicValueRegistryTez.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicValueRegistryTez.java
index 7bbedf6..b7687c5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicValueRegistryTez.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicValueRegistryTez.java
@@ -122,9 +122,15 @@ public class DynamicValueRegistryTez implements DynamicValueRegistry {
           setValue(runtimeValuesInfo.getDynamicValueIDs().get(colIdx), val);
         }
       }
-      // For now, expecting a single row (min/max, aggregated bloom filter)
-      if (rowCount != 1) {
-        throw new IllegalStateException("Expected 1 row from " + inputSourceName + ", got " + rowCount);
+      // For now, expecting a single row (min/max, aggregated bloom filter), or no rows
+      if (rowCount == 0) {
+        LOG.debug("No input rows from " + inputSourceName + ", filling dynamic values with nulls");
+        for (int colIdx = 0; colIdx < colExprEvaluators.size(); ++colIdx) {
+          setValue(runtimeValuesInfo.getDynamicValueIDs().get(colIdx), null);
+        }
+      } else if (rowCount > 1) {
+        throw new IllegalStateException("Expected 0 or 1 rows from " + inputSourceName + ", got " + rowCount);
       }
     }
   }

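The change relaxes the previous one-row assumption: an empty semijoin source now registers null dynamic values instead of failing, and downstream filters treat those nulls as "match nothing". A condensed sketch of that contract, with hypothetical names standing in for the registry internals:

import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Condensed sketch of the 0-or-1-row contract enforced above; the field and
// method names here are hypothetical stand-ins for the registry internals.
class RowCountContractSketch {
  private final Map<String, Object> registry = new HashMap<>();

  void finishReading(int rowCount, List<String> dynamicValueIds, String source) {
    if (rowCount == 0) {
      // Empty source: register null min/max/bloom values, which the
      // vectorized filters interpret as "no rows can match".
      for (String id : dynamicValueIds) {
        registry.put(id, null);
      }
    } else if (rowCount > 1) {
      throw new IllegalStateException(
          "Expected 0 or 1 rows from " + source + ", got " + rowCount);
    }
  }
}
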
http://git-wip-us.apache.org/repos/asf/hive/blob/62ebd1ab/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExpressionDescriptor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExpressionDescriptor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExpressionDescriptor.java
index 217af3f..f4499d7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExpressionDescriptor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExpressionDescriptor.java
@@ -183,7 +183,8 @@ public class VectorExpressionDescriptor {
   public enum InputExpressionType {
     NONE(0),
     COLUMN(1),
-    SCALAR(2);
+    SCALAR(2),
+    DYNAMICVALUE(3);
 
     private final int value;
 

http://git-wip-us.apache.org/repos/asf/hive/blob/62ebd1ab/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFilterOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFilterOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFilterOperator.java
index 261246b..2598445 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFilterOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFilterOperator.java
@@ -72,6 +72,8 @@ public class VectorFilterOperator extends FilterOperator {
     try {
       heartbeatInterval = HiveConf.getIntVar(hconf,
           HiveConf.ConfVars.HIVESENDHEARTBEAT);
+
+      conditionEvaluator.init(hconf);
     } catch (Throwable e) {
       throw new HiveException(e);
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/62ebd1ab/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java
index f7fec8f..bb382b1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java
@@ -113,6 +113,7 @@ public class VectorSelectOperator extends Operator<SelectDesc> implements
 
     projectedColumns = new int [vExpressions.length];
     for (int i = 0; i < projectedColumns.length; i++) {
+      vExpressions[i].init(hconf);
       projectedColumns[i] = vExpressions[i].getOutputColumn();
     }
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/62ebd1ab/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
index c887757..484f615 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
@@ -54,6 +54,8 @@ import org.apache.hadoop.hive.ql.exec.vector.expressions.*;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorUDAFAvgDecimal;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorUDAFAvgTimestamp;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorUDAFBloomFilter;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorUDAFBloomFilterMerge;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorUDAFCount;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorUDAFCountMerge;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorUDAFCountStar;
@@ -97,6 +99,7 @@ import org.apache.hadoop.hive.ql.plan.AggregationDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDynamicValueDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
 import org.apache.hadoop.hive.ql.plan.GroupByDesc;
 import org.apache.hadoop.hive.ql.udf.SettableUDF;
@@ -585,6 +588,8 @@ public class VectorizationContext {
     } else if (exprDesc instanceof ExprNodeConstantDesc) {
       ve = getConstantVectorExpression(((ExprNodeConstantDesc) exprDesc).getValue(), exprDesc.getTypeInfo(),
           mode);
+    } else if (exprDesc instanceof ExprNodeDynamicValueDesc) {
+      ve = getDynamicValueVectorExpression((ExprNodeDynamicValueDesc) exprDesc, mode);
     }
     if (ve == null) {
       throw new HiveException(
@@ -1094,6 +1099,21 @@ public class VectorizationContext {
     }
   }
 
+  private VectorExpression getDynamicValueVectorExpression(ExprNodeDynamicValueDesc dynamicValueExpr,
+      VectorExpressionDescriptor.Mode mode) throws HiveException {
+    String typeName = dynamicValueExpr.getTypeInfo().getTypeName();
+    VectorExpressionDescriptor.ArgumentType vectorArgType = VectorExpressionDescriptor.ArgumentType.fromHiveTypeName(typeName);
+    if (vectorArgType == VectorExpressionDescriptor.ArgumentType.NONE) {
+      throw new HiveException("No vector argument type for type name " + typeName);
+    }
+    int outCol = -1;
+    if (mode == VectorExpressionDescriptor.Mode.PROJECTION) {
+      outCol = ocm.allocateOutputColumn(dynamicValueExpr.getTypeInfo());
+    }
+
+    return new DynamicValueVectorExpression(outCol, dynamicValueExpr.getTypeInfo(), dynamicValueExpr.getDynamicValue());
+  }
+
   /**
    * Used as a fast path for operations that don't modify their input, like unary +
    * and casting boolean to long. IdentityExpression and its children are always
@@ -1181,6 +1201,8 @@ public class VectorizationContext {
         builder.setInputExpressionType(i, InputExpressionType.COLUMN);
       } else if (child instanceof ExprNodeConstantDesc) {
         builder.setInputExpressionType(i, InputExpressionType.SCALAR);
+      } else if (child instanceof ExprNodeDynamicValueDesc) {
+        builder.setInputExpressionType(i, InputExpressionType.DYNAMICVALUE);
       } else {
         throw new HiveException("Cannot handle expression type: " + child.getClass().getSimpleName());
       }
@@ -1225,6 +1247,8 @@ public class VectorizationContext {
         } else if (child instanceof ExprNodeConstantDesc) {
           Object scalarValue = getVectorTypeScalarValue((ExprNodeConstantDesc) child);
           arguments[i] = (null == scalarValue) ? getConstantVectorExpression(null, child.getTypeInfo(), childrenMode) : scalarValue;
+        } else if (child instanceof ExprNodeDynamicValueDesc) {
+          arguments[i] = ((ExprNodeDynamicValueDesc) child).getDynamicValue();
         } else {
           throw new HiveException("Cannot handle expression type: " + child.getClass().getSimpleName());
         }
@@ -2092,8 +2116,13 @@ public class VectorizationContext {
       return null;
     }
 
+    boolean hasDynamicValues = false;
+
     // We don't currently support the BETWEEN ends being columns.  They must be scalars.
-    if (!(childExpr.get(2) instanceof ExprNodeConstantDesc) ||
+    if ((childExpr.get(2) instanceof ExprNodeDynamicValueDesc) &&
+        (childExpr.get(3) instanceof ExprNodeDynamicValueDesc)) {
+      hasDynamicValues = true;
+    } else if (!(childExpr.get(2) instanceof ExprNodeConstantDesc) ||
         !(childExpr.get(3) instanceof ExprNodeConstantDesc)) {
       return null;
     }
@@ -2138,35 +2167,51 @@ public class VectorizationContext {
     // determine class
     Class<?> cl = null;
     if (isIntFamily(colType) && !notKeywordPresent) {
-      cl = FilterLongColumnBetween.class;
+      cl = (hasDynamicValues ?
+          FilterLongColumnBetweenDynamicValue.class :
+          FilterLongColumnBetween.class);
     } else if (isIntFamily(colType) && notKeywordPresent) {
       cl = FilterLongColumnNotBetween.class;
     } else if (isFloatFamily(colType) && !notKeywordPresent) {
-      cl = FilterDoubleColumnBetween.class;
+      cl = (hasDynamicValues ?
+          FilterDoubleColumnBetweenDynamicValue.class :
+          FilterDoubleColumnBetween.class);
     } else if (isFloatFamily(colType) && notKeywordPresent) {
       cl = FilterDoubleColumnNotBetween.class;
     } else if (colType.equals("string") && !notKeywordPresent) {
-      cl = FilterStringColumnBetween.class;
+      cl = (hasDynamicValues ?
+          FilterStringColumnBetweenDynamicValue.class :
+          FilterStringColumnBetween.class);
     } else if (colType.equals("string") && notKeywordPresent) {
       cl = FilterStringColumnNotBetween.class;
     } else if (varcharTypePattern.matcher(colType).matches() && !notKeywordPresent) {
-      cl = FilterVarCharColumnBetween.class;
+      cl = (hasDynamicValues ?
+          FilterVarCharColumnBetweenDynamicValue.class :
+          FilterVarCharColumnBetween.class);
     } else if (varcharTypePattern.matcher(colType).matches() && notKeywordPresent) {
       cl = FilterVarCharColumnNotBetween.class;
     } else if (charTypePattern.matcher(colType).matches() && !notKeywordPresent) {
-      cl = FilterCharColumnBetween.class;
+      cl = (hasDynamicValues ?
+          FilterCharColumnBetweenDynamicValue.class :
+          FilterCharColumnBetween.class);
     } else if (charTypePattern.matcher(colType).matches() && notKeywordPresent) {
       cl = FilterCharColumnNotBetween.class;
     } else if (colType.equals("timestamp") && !notKeywordPresent) {
-      cl = FilterTimestampColumnBetween.class;
+      cl = (hasDynamicValues ?
+          FilterTimestampColumnBetweenDynamicValue.class :
+          FilterTimestampColumnBetween.class);
     } else if (colType.equals("timestamp") && notKeywordPresent) {
       cl = FilterTimestampColumnNotBetween.class;
     } else if (isDecimalFamily(colType) && !notKeywordPresent) {
-      cl = FilterDecimalColumnBetween.class;
+      cl = (hasDynamicValues ?
+          FilterDecimalColumnBetweenDynamicValue.class :
+          FilterDecimalColumnBetween.class);
     } else if (isDecimalFamily(colType) && notKeywordPresent) {
       cl = FilterDecimalColumnNotBetween.class;
     } else if (isDateFamily(colType) && !notKeywordPresent) {
-      cl = FilterLongColumnBetween.class;
+      cl = (hasDynamicValues ?
+          FilterLongColumnBetweenDynamicValue.class :
+          FilterLongColumnBetween.class);
     } else if (isDateFamily(colType) && notKeywordPresent) {
       cl = FilterLongColumnNotBetween.class;
     }
@@ -2224,6 +2269,12 @@ public class VectorizationContext {
       } else if (child instanceof ExprNodeConstantDesc) {
         // this is a constant (or null)
         argDescs[i].setConstant((ExprNodeConstantDesc) child);
+      } else if (child instanceof ExprNodeDynamicValueDesc) {
+        VectorExpression e = getVectorExpression(child, VectorExpressionDescriptor.Mode.PROJECTION);
+        vectorExprs.add(e);
+        variableArgPositions.add(i);
+        exprResultColumnNums.add(e.getOutputColumn());
+        argDescs[i].setVariable(e.getOutputColumn());
       } else {
         throw new HiveException("Unable to vectorize custom UDF. Encountered unsupported expr desc : "
             + child);
@@ -2651,6 +2702,14 @@ public class VectorizationContext {
     add(new AggregateDefinition("stddev_samp", ArgumentType.FLOAT_FAMILY,                    Mode.PARTIAL1,     VectorUDAFStdSampDouble.class));
     add(new AggregateDefinition("stddev_samp", ArgumentType.DECIMAL,                         Mode.PARTIAL1,     VectorUDAFStdSampDecimal.class));
     add(new AggregateDefinition("stddev_samp", ArgumentType.TIMESTAMP,                       Mode.PARTIAL1,     VectorUDAFStdSampTimestamp.class));
+
+    // UDAFBloomFilter. Original data is one type, partial/final is another,
+    // so this requires 2 aggregation classes (partial1/complete), (partial2/final)
+    add(new AggregateDefinition("bloom_filter", ArgumentType.ALL_FAMILY,                     Mode.PARTIAL1,     VectorUDAFBloomFilter.class));
+    add(new AggregateDefinition("bloom_filter", ArgumentType.ALL_FAMILY,                     Mode.COMPLETE,     VectorUDAFBloomFilter.class));
+    add(new AggregateDefinition("bloom_filter", ArgumentType.BINARY,                         Mode.PARTIAL2,     VectorUDAFBloomFilterMerge.class));
+    add(new AggregateDefinition("bloom_filter", ArgumentType.BINARY,                         Mode.FINAL,        VectorUDAFBloomFilterMerge.class));
+
   }};
 
   public VectorAggregateExpression getAggregatorExpression(AggregationDesc desc)

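As the comment above notes, bloom_filter needs two vectorized implementations because the intermediate type (BINARY) differs from the input type. A sketch of the mode-to-class mapping that the four AggregateDefinition entries encode:

import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;

// Sketch of the mapping the four bloom_filter AggregateDefinition entries
// encode: PARTIAL1/COMPLETE read original column values and build a filter,
// PARTIAL2/FINAL read serialized (BINARY) filters and merge them.
class BloomFilterModeSketch {
  static String implFor(GenericUDAFEvaluator.Mode mode) {
    switch (mode) {
    case PARTIAL1:
    case COMPLETE:
      return "VectorUDAFBloomFilter";       // raw values -> bloom filter
    case PARTIAL2:
    case FINAL:
      return "VectorUDAFBloomFilterMerge";  // binary blobs -> merged filter
    default:
      throw new IllegalArgumentException("Unexpected mode " + mode);
    }
  }
}
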
http://git-wip-us.apache.org/repos/asf/hive/blob/62ebd1ab/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/DynamicValueVectorExpression.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/DynamicValueVectorExpression.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/DynamicValueVectorExpression.java
new file mode 100644
index 0000000..1a34118
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/DynamicValueVectorExpression.java
@@ -0,0 +1,314 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.expressions;
+
+import java.sql.Timestamp;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.common.type.HiveIntervalDayTime;
+import org.apache.hadoop.hive.ql.exec.vector.*;
+import org.apache.hadoop.hive.ql.plan.DynamicValue;
+import org.apache.hadoop.hive.serde2.io.DateWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A dynamic value, resolved at runtime and represented as a vector with repeating values.
+ */
+public class DynamicValueVectorExpression extends VectorExpression {
+  private static final Logger LOG = LoggerFactory.getLogger(DynamicValueVectorExpression.class);
+
+  private static final long serialVersionUID = 1L;
+
+  DynamicValue dynamicValue;
+  TypeInfo typeInfo;
+  transient private boolean initialized = false;
+
+  private int outputColumn;
+  protected long longValue = 0;
+  private double doubleValue = 0;
+  private byte[] bytesValue = null;
+  private HiveDecimal decimalValue = null;
+  private Timestamp timestampValue = null;
+  private HiveIntervalDayTime intervalDayTimeValue = null;
+  private boolean isNullValue = false;
+
+  private ColumnVector.Type type;
+  private int bytesValueLength = 0;
+
+  public DynamicValueVectorExpression() {
+    super();
+  }
+
+  public DynamicValueVectorExpression(int outputColumn, TypeInfo typeInfo, DynamicValue dynamicValue) {
+    this();
+    this.outputColumn = outputColumn;
+    this.type = VectorizationContext.getColumnVectorTypeFromTypeInfo(typeInfo);
+    this.dynamicValue = dynamicValue;
+    this.typeInfo = typeInfo;
+  }
+
+  private void evaluateLong(VectorizedRowBatch vrg) {
+    LongColumnVector cv = (LongColumnVector) vrg.cols[outputColumn];
+    cv.isRepeating = true;
+    cv.noNulls = !isNullValue;
+    if (!isNullValue) {
+      cv.vector[0] = longValue;
+      cv.isNull[0] = false;
+    } else {
+      cv.isNull[0] = true;
+    }
+  }
+
+  private void evaluateDouble(VectorizedRowBatch vrg) {
+    DoubleColumnVector cv = (DoubleColumnVector) vrg.cols[outputColumn];
+    cv.isRepeating = true;
+    cv.noNulls = !isNullValue;
+    if (!isNullValue) {
+      cv.vector[0] = doubleValue;
+      cv.isNull[0] = false;
+    } else {
+      cv.isNull[0] = true;
+    }
+  }
+
+  private void evaluateBytes(VectorizedRowBatch vrg) {
+    BytesColumnVector cv = (BytesColumnVector) vrg.cols[outputColumn];
+    cv.isRepeating = true;
+    cv.noNulls = !isNullValue;
+    cv.initBuffer();
+    if (!isNullValue) {
+      cv.setVal(0, bytesValue, 0, bytesValueLength);
+      cv.isNull[0] = false;
+    } else {
+      cv.isNull[0] = true;
+    }
+  }
+
+  private void evaluateDecimal(VectorizedRowBatch vrg) {
+    DecimalColumnVector dcv = (DecimalColumnVector) vrg.cols[outputColumn];
+    dcv.isRepeating = true;
+    dcv.noNulls = !isNullValue;
+    if (!isNullValue) {
+      dcv.vector[0].set(decimalValue);
+      dcv.isNull[0] = false;
+    } else {
+      dcv.isNull[0] = true;
+    }
+  }
+
+  private void evaluateTimestamp(VectorizedRowBatch vrg) {
+    TimestampColumnVector dcv = (TimestampColumnVector) vrg.cols[outputColumn];
+    dcv.isRepeating = true;
+    dcv.noNulls = !isNullValue;
+    if (!isNullValue) {
+      dcv.set(0, timestampValue);
+      dcv.isNull[0] = false;
+    } else {
+      dcv.isNull[0] = true;
+    }
+  }
+
+  private void evaluateIntervalDayTime(VectorizedRowBatch vrg) {
+    IntervalDayTimeColumnVector dcv = (IntervalDayTimeColumnVector) vrg.cols[outputColumn];
+    dcv.isRepeating = true;
+    dcv.noNulls = !isNullValue;
+    if (!isNullValue) {
+      dcv.set(0, intervalDayTimeValue);
+      dcv.isNull[0] = false;
+    } else {
+      dcv.isNull[0] = true;
+    }
+  }
+
+  private void initValue() {
+    Object val = dynamicValue.getValue();
+
+    if (val == null) {
+      isNullValue = true;
+    } else {
+      PrimitiveObjectInspector poi = dynamicValue.getObjectInspector();
+      byte[] bytesVal;
+      switch (poi.getPrimitiveCategory()) {
+      case BOOLEAN:
+      case BYTE:
+      case SHORT:
+      case INT:
+      case LONG:
+        longValue = PrimitiveObjectInspectorUtils.getLong(val, poi);
+        break;
+      case FLOAT:
+      case DOUBLE:
+        doubleValue = PrimitiveObjectInspectorUtils.getDouble(val, poi);
+        break;
+      case STRING:
+      case CHAR:
+      case VARCHAR:
+        bytesVal = PrimitiveObjectInspectorUtils.getString(val, poi).getBytes();
+        setBytesValue(bytesVal);
+        break;
+      case BINARY:
+        bytesVal = PrimitiveObjectInspectorUtils.getBinary(val, poi).copyBytes();
+        setBytesValue(bytesVal);
+        break;
+      case DECIMAL:
+        decimalValue = PrimitiveObjectInspectorUtils.getHiveDecimal(val, poi);
+        break;
+      case DATE:
+        longValue = DateWritable.dateToDays(PrimitiveObjectInspectorUtils.getDate(val, poi));
+        break;
+      case TIMESTAMP:
+        timestampValue = PrimitiveObjectInspectorUtils.getTimestamp(val, poi);
+        break;
+      case INTERVAL_YEAR_MONTH:
+        longValue = PrimitiveObjectInspectorUtils.getHiveIntervalYearMonth(val, poi).getTotalMonths();
+        break;
+      case INTERVAL_DAY_TIME:
+        intervalDayTimeValue = PrimitiveObjectInspectorUtils.getHiveIntervalDayTime(val, poi);
+        break;
+      default:
+        throw new IllegalStateException("Unsupported type " + poi.getPrimitiveCategory());
+      }
+    }
+
+    initialized = true;
+  }
+
+  @Override
+  public void init(Configuration conf) {
+    super.init(conf);
+    dynamicValue.setConf(conf);
+  }
+
+  @Override
+  public void evaluate(VectorizedRowBatch vrg) {
+    if (!initialized) {
+      initValue();
+    }
+
+    switch (type) {
+    case LONG:
+      evaluateLong(vrg);
+      break;
+    case DOUBLE:
+      evaluateDouble(vrg);
+      break;
+    case BYTES:
+      evaluateBytes(vrg);
+      break;
+    case DECIMAL:
+      evaluateDecimal(vrg);
+      break;
+    case TIMESTAMP:
+      evaluateTimestamp(vrg);
+      break;
+    case INTERVAL_DAY_TIME:
+      evaluateIntervalDayTime(vrg);
+      break;
+    default:
+      throw new IllegalStateException("Unsupported type " + type);
+    }
+  }
+
+  @Override
+  public int getOutputColumn() {
+    return outputColumn;
+  }
+
+  public long getLongValue() {
+    return longValue;
+  }
+
+  public void setLongValue(long longValue) {
+    this.longValue = longValue;
+  }
+
+  public double getDoubleValue() {
+    return doubleValue;
+  }
+
+  public void setDoubleValue(double doubleValue) {
+    this.doubleValue = doubleValue;
+  }
+
+  public byte[] getBytesValue() {
+    return bytesValue;
+  }
+
+  public void setBytesValue(byte[] bytesValue) {
+    this.bytesValue = bytesValue.clone();
+    this.bytesValueLength = bytesValue.length;
+  }
+
+  public void setDecimalValue(HiveDecimal decimalValue) {
+    this.decimalValue = decimalValue;
+  }
+
+  public HiveDecimal getDecimalValue() {
+    return decimalValue;
+  }
+
+  public void setTimestampValue(Timestamp timestampValue) {
+    this.timestampValue = timestampValue;
+  }
+
+  public Timestamp getTimestampValue() {
+    return timestampValue;
+  }
+
+  public void setIntervalDayTimeValue(HiveIntervalDayTime intervalDayTimeValue) {
+    this.intervalDayTimeValue = intervalDayTimeValue;
+  }
+
+  public HiveIntervalDayTime getIntervalDayTimeValue() {
+    return intervalDayTimeValue;
+  }
+
+  public String getTypeString() {
+    return getOutputType();
+  }
+
+  public void setOutputColumn(int outputColumn) {
+    this.outputColumn = outputColumn;
+  }
+
+  @Override
+  public VectorExpressionDescriptor.Descriptor getDescriptor() {
+    return (new VectorExpressionDescriptor.Builder()).build();
+  }
+
+  public DynamicValue getDynamicValue() {
+    return dynamicValue;
+  }
+
+  public void setDynamicValue(DynamicValue dynamicValue) {
+    this.dynamicValue = dynamicValue;
+  }
+
+  public TypeInfo getTypeInfo() {
+    return typeInfo;
+  }
+
+  public void setTypeInfo(TypeInfo typeInfo) {
+    this.typeInfo = typeInfo;
+  }
+}

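In isolation, the repeating-vector trick the class relies on: set entry 0, mark the vector repeating, and every row in the batch reads the same value. A minimal sketch using the column-vector API:

import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;

// Minimal sketch of the "constant as repeating vector" idea used by
// DynamicValueVectorExpression once the dynamic value has been resolved:
// entry 0 is set once and isRepeating makes it stand for every row.
class RepeatingVectorSketch {
  static LongColumnVector constantVector(long resolvedValue) {
    LongColumnVector cv = new LongColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
    cv.isRepeating = true;
    cv.noNulls = true;
    cv.isNull[0] = false;
    cv.vector[0] = resolvedValue;
    return cv;
  }
}
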
http://git-wip-us.apache.org/repos/asf/hive/blob/62ebd1ab/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpression.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpression.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpression.java
index 8fca8a1..218f306 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpression.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpression.java
@@ -22,6 +22,8 @@ import java.io.Serializable;
 import java.util.Map;
 
 import com.google.common.collect.ImmutableMap;
+
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.ql.exec.vector.VectorExpressionDescriptor;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 
@@ -31,7 +33,7 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 public abstract class VectorExpression implements Serializable {
   public enum Type {
     STRING, CHAR, VARCHAR, TIMESTAMP, DATE, LONG, DOUBLE, DECIMAL,
-    INTERVAL_YEAR_MONTH, INTERVAL_DAY_TIME, OTHER;
+    INTERVAL_YEAR_MONTH, INTERVAL_DAY_TIME, BINARY, OTHER;
     private static Map<String, Type> types = ImmutableMap.<String, Type>builder()
         .put("string", STRING)
         .put("char", CHAR)
@@ -43,6 +45,7 @@ public abstract class VectorExpression implements Serializable {
         .put("decimal", DECIMAL)
         .put("interval_year_month", INTERVAL_YEAR_MONTH)
         .put("interval_day_time", INTERVAL_DAY_TIME)
+        .put("binary", BINARY)
         .build();
 
     public static Type getValue(String name) {
@@ -76,6 +79,14 @@ public abstract class VectorExpression implements Serializable {
    */
   public abstract void evaluate(VectorizedRowBatch batch);
 
+  public void init(Configuration conf) {
+    if (childExpressions != null) {
+      for (VectorExpression child : childExpressions) {
+        child.init(conf);
+      }
+    }
+  }
+
   /**
    * Returns the index of the output column in the array
    * of column vectors. If not applicable, -1 is returned.

http://git-wip-us.apache.org/repos/asf/hive/blob/62ebd1ab/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorInBloomFilterColDynamicValue.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorInBloomFilterColDynamicValue.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorInBloomFilterColDynamicValue.java
new file mode 100644
index 0000000..188a87e
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorInBloomFilterColDynamicValue.java
@@ -0,0 +1,285 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.expressions;
+
+import java.io.ByteArrayInputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorExpressionDescriptor.Descriptor;
+import org.apache.hadoop.hive.ql.exec.vector.VectorExpressionDescriptor;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.plan.DynamicValue;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.BinaryObjectInspector;
+import org.apache.hive.common.util.BloomFilter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class VectorInBloomFilterColDynamicValue extends VectorExpression {
+  private static final long serialVersionUID = 1L;
+
+  private static final Logger LOG = LoggerFactory.getLogger(VectorInBloomFilterColDynamicValue.class);
+
+  protected int colNum;
+  protected DynamicValue bloomFilterDynamicValue;
+  protected transient boolean initialized = false;
+  protected transient BloomFilter bloomFilter;
+  protected transient BloomFilterCheck bfCheck;
+
+  public VectorInBloomFilterColDynamicValue(int colNum, DynamicValue bloomFilterDynamicValue) {
+    this.colNum = colNum;
+    this.bloomFilterDynamicValue = bloomFilterDynamicValue;
+  }
+
+  public VectorInBloomFilterColDynamicValue() {
+  }
+
+  @Override
+  public void init(Configuration conf) {
+    super.init(conf);
+    bloomFilterDynamicValue.setConf(conf);
+
+    // Instantiate BloomFilterCheck based on input column type
+    VectorExpression.Type colType = this.getInputTypes()[0];
+    switch (colType) {
+    case LONG:
+    case DATE:
+      bfCheck = new LongBloomFilterCheck();
+      break;
+    case DOUBLE:
+      bfCheck = new DoubleBloomFilterCheck();
+      break;
+    case DECIMAL:
+      bfCheck = new DecimalBloomFilterCheck();
+      break;
+    case STRING:
+    case CHAR:
+    case VARCHAR:
+    case BINARY:
+      bfCheck = new BytesBloomFilterCheck();
+      break;
+    case TIMESTAMP:
+      bfCheck = new TimestampBloomFilterCheck();
+      break;
+    default:
+      throw new IllegalStateException("Unsupported type " + colType);
+    }
+  }
+
+  private void initValue()  {
+    try {
+      Object val = bloomFilterDynamicValue.getValue();
+      if (val != null) {
+        BinaryObjectInspector boi = (BinaryObjectInspector) bloomFilterDynamicValue.getObjectInspector();
+        byte[] bytes = boi.getPrimitiveJavaObject(val);
+        bloomFilter = BloomFilter.deserialize(new ByteArrayInputStream(bytes));
+      } else {
+        bloomFilter = null;
+      }
+      initialized = true;
+    } catch (Exception err) {
+      throw new RuntimeException(err);
+    }
+  }
+
+  @Override
+  public void evaluate(VectorizedRowBatch batch) {
+    if (childExpressions != null) {
+      super.evaluateChildren(batch);
+    }
+
+    if (!initialized) {
+      initValue();
+    }
+
+    ColumnVector inputColVector = batch.cols[colNum];
+    int[] sel = batch.selected;
+    boolean[] nullPos = inputColVector.isNull;
+    int n = batch.size;
+
+    // return immediately if batch is empty
+    if (n == 0) {
+      return;
+    }
+
+    // In case the dynamic value resolves to a null value, the entire batch is
+    // filtered out; return early so the code below never touches a null filter.
+    if (bloomFilter == null) {
+      batch.size = 0;
+      return;
+    }
+
+    if (inputColVector.noNulls) {
+      if (inputColVector.isRepeating) {
+
+        // All must be selected otherwise size would be zero. Repeating property will not change.
+        if (!(bfCheck.checkValue(inputColVector, 0))) {
+
+          //Entire batch is filtered out.
+          batch.size = 0;
+        }
+      } else if (batch.selectedInUse) {
+        int newSize = 0;
+        for(int j=0; j != n; j++) {
+          int i = sel[j];
+          if (bfCheck.checkValue(inputColVector, i)) {
+            sel[newSize++] = i;
+          }
+        }
+        batch.size = newSize;
+      } else {
+        int newSize = 0;
+        for(int i = 0; i != n; i++) {
+          if (bfCheck.checkValue(inputColVector, i)) {
+            sel[newSize++] = i;
+          }
+        }
+        if (newSize < n) {
+          batch.size = newSize;
+          batch.selectedInUse = true;
+        }
+      }
+    } else {
+      if (inputColVector.isRepeating) {
+
+        // All must be selected otherwise size would be zero. Repeating property will not change.
+        if (!nullPos[0]) {
+          if (!(bfCheck.checkValue(inputColVector, 0))) {
+
+            //Entire batch is filtered out.
+            batch.size = 0;
+          }
+        } else {
+          batch.size = 0;
+        }
+      } else if (batch.selectedInUse) {
+        int newSize = 0;
+        for(int j=0; j != n; j++) {
+          int i = sel[j];
+          if (!nullPos[i]) {
+           if (bfCheck.checkValue(inputColVector, i)) {
+             sel[newSize++] = i;
+           }
+          }
+        }
+
+        //Change the selected vector
+        batch.size = newSize;
+      } else {
+        int newSize = 0;
+        for(int i = 0; i != n; i++) {
+          if (!nullPos[i]) {
+            if (bfCheck.checkValue(inputColVector, i)) {
+              sel[newSize++] = i;
+            }
+          }
+        }
+        if (newSize < n) {
+          batch.size = newSize;
+          batch.selectedInUse = true;
+        }
+      }
+    }
+  }
+
+  @Override
+  public int getOutputColumn() {
+    return -1;
+  }
+
+  @Override
+  public String getOutputType() {
+    return "boolean";
+  }
+
+  public int getColNum() {
+    return colNum;
+  }
+
+  public void setColNum(int colNum) {
+    this.colNum = colNum;
+  }
+
+  @Override
+  public Descriptor getDescriptor() {
+    VectorExpressionDescriptor.Builder b = new VectorExpressionDescriptor.Builder();
+    b.setMode(VectorExpressionDescriptor.Mode.FILTER)
+        .setNumArguments(2)
+        .setArgumentTypes(
+            VectorExpressionDescriptor.ArgumentType.ALL_FAMILY,
+            VectorExpressionDescriptor.ArgumentType.BINARY)
+        .setInputExpressionTypes(
+            VectorExpressionDescriptor.InputExpressionType.COLUMN,
+            VectorExpressionDescriptor.InputExpressionType.DYNAMICVALUE);
+    return b.build();
+  }
+
+  // Type-specific handling
+  abstract class BloomFilterCheck {
+    abstract public boolean checkValue(ColumnVector columnVector, int idx);
+  }
+
+  class BytesBloomFilterCheck extends BloomFilterCheck {
+    @Override
+    public boolean checkValue(ColumnVector columnVector, int idx) {
+      BytesColumnVector col = (BytesColumnVector) columnVector;
+      return bloomFilter.testBytes(col.vector[idx], col.start[idx], col.length[idx]);
+    }
+  }
+
+  class LongBloomFilterCheck extends BloomFilterCheck {
+    @Override
+    public boolean checkValue(ColumnVector columnVector, int idx) {
+      LongColumnVector col = (LongColumnVector) columnVector;
+      return bloomFilter.testLong(col.vector[idx]);
+    }
+  }
+
+  class DoubleBloomFilterCheck extends BloomFilterCheck {
+    @Override
+    public boolean checkValue(ColumnVector columnVector, int idx) {
+      DoubleColumnVector col = (DoubleColumnVector) columnVector;
+      return bloomFilter.testDouble(col.vector[idx]);
+    }
+  }
+
+  class DecimalBloomFilterCheck extends BloomFilterCheck {
+    private byte[] scratchBuffer = new byte[HiveDecimal.SCRATCH_BUFFER_LEN_TO_BYTES];
+
+    @Override
+    public boolean checkValue(ColumnVector columnVector, int idx) {
+      DecimalColumnVector col = (DecimalColumnVector) columnVector;
+      int startIdx = col.vector[idx].toBytes(scratchBuffer);
+      return bloomFilter.testBytes(scratchBuffer, startIdx, scratchBuffer.length - startIdx);
+    }
+  }
+
+  class TimestampBloomFilterCheck extends BloomFilterCheck {
+    @Override
+    public boolean checkValue(ColumnVector columnVector, int idx) {
+      TimestampColumnVector col = (TimestampColumnVector) columnVector;
+      return bloomFilter.testLong(col.time[idx]);
+    }
+  }
+}

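All four branches of evaluate() above are variants of one idiom: compact the batch's selection vector down to the rows that pass a predicate. A generic sketch of that idiom:

import java.util.function.IntPredicate;

import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;

// Generic sketch of the selection-vector compaction idiom: survivors are
// written back into batch.selected and batch.size shrinks to match. (The
// real code only flips selectedInUse when rows were actually dropped, and
// handles the isRepeating and has-nulls cases separately.)
class SelectionCompactionSketch {
  static void filter(VectorizedRowBatch batch, IntPredicate passes) {
    int n = batch.size;
    int newSize = 0;
    for (int j = 0; j < n; j++) {
      int i = batch.selectedInUse ? batch.selected[j] : j;
      if (passes.test(i)) {
        batch.selected[newSize++] = i;
      }
    }
    batch.size = newSize;
    batch.selectedInUse = true;
  }
}
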
http://git-wip-us.apache.org/repos/asf/hive/blob/62ebd1ab/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFBloomFilter.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFBloomFilter.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFBloomFilter.java
new file mode 100644
index 0000000..3ecb82e
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFBloomFilter.java
@@ -0,0 +1,474 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationBufferRow;
+import org.apache.hadoop.hive.ql.exec.vector.VectorExpressionDescriptor;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression.AggregationBuffer;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.AggregationDesc;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFBloomFilter;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFBloomFilter.GenericUDAFBloomFilterEvaluator;
+import org.apache.hadoop.hive.ql.util.JavaDataModel;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hive.common.util.BloomFilter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class VectorUDAFBloomFilter extends VectorAggregateExpression {
+
+  private static final Logger LOG = LoggerFactory.getLogger(VectorUDAFBloomFilter.class);
+
+  private static final long serialVersionUID = 1L;
+
+  private VectorExpression inputExpression;
+  private long expectedEntries = -1;
+  private ValueProcessor valueProcessor;
+  transient private int bitSetSize = -1;
+  transient private BytesWritable bw = new BytesWritable();
+  transient private ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
+
+  /**
+   * Class for storing the current aggregate value.
+   */
+  private static final class Aggregation implements AggregationBuffer {
+    private static final long serialVersionUID = 1L;
+
+    BloomFilter bf;
+
+    public Aggregation(long expectedEntries) {
+      bf = new BloomFilter(expectedEntries);
+    }
+
+    @Override
+    public int getVariableSize() {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void reset() {
+      bf.reset();
+    }
+  }
+
+  public VectorUDAFBloomFilter(VectorExpression inputExpression) {
+    this();
+    this.inputExpression = inputExpression;
+
+    // Instantiate the ValueProcessor based on the input type
+    VectorExpressionDescriptor.ArgumentType inputType =
+        VectorExpressionDescriptor.ArgumentType.fromHiveTypeName(inputExpression.getOutputType());
+    switch (inputType) {
+    case INT_FAMILY:
+    case DATE:
+      valueProcessor = new ValueProcessorLong();
+      break;
+    case FLOAT_FAMILY:
+      valueProcessor = new ValueProcessorDouble();
+      break;
+    case DECIMAL:
+      valueProcessor = new ValueProcessorDecimal();
+      break;
+    case STRING:
+    case CHAR:
+    case VARCHAR:
+    case STRING_FAMILY:
+    case BINARY:
+      valueProcessor = new ValueProcessorBytes();
+      break;
+    case TIMESTAMP:
+      valueProcessor = new ValueProcessorTimestamp();
+      break;
+    default:
+      throw new IllegalStateException("Unsupported type " + inputType);
+    }
+  }
+
+  public VectorUDAFBloomFilter() {
+    super();
+  }
+
+  @Override
+  public AggregationBuffer getNewAggregationBuffer() throws HiveException {
+    if (expectedEntries < 0) {
+      throw new IllegalStateException("expectedEntries not initialized");
+    }
+    return new Aggregation(expectedEntries);
+  }
+
+  @Override
+  public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch batch)
+      throws HiveException {
+
+    inputExpression.evaluate(batch);
+
+    ColumnVector inputColumn = batch.cols[this.inputExpression.getOutputColumn()];
+
+    int batchSize = batch.size;
+
+    if (batchSize == 0) {
+      return;
+    }
+
+    Aggregation myagg = (Aggregation) agg;
+
+    if (inputColumn.isRepeating) {
+      if (inputColumn.noNulls) {
+        valueProcessor.processValue(myagg, inputColumn, 0);
+      }
+      return;
+    }
+
+    if (!batch.selectedInUse && inputColumn.noNulls) {
+      iterateNoSelectionNoNulls(myagg, inputColumn, batchSize);
+    }
+    else if (!batch.selectedInUse) {
+      iterateNoSelectionHasNulls(myagg, inputColumn, batchSize);
+    }
+    else if (inputColumn.noNulls){
+      iterateSelectionNoNulls(myagg, inputColumn, batchSize, batch.selected);
+    }
+    else {
+      iterateSelectionHasNulls(myagg, inputColumn, batchSize, batch.selected);
+    }
+  }
+
+  private void iterateNoSelectionNoNulls(
+      Aggregation myagg,
+      ColumnVector inputColumn,
+      int batchSize) {
+    for (int i=0; i< batchSize; ++i) {
+      valueProcessor.processValue(myagg, inputColumn, i);
+    }
+  }
+
+  private void iterateNoSelectionHasNulls(
+      Aggregation myagg,
+      ColumnVector inputColumn,
+      int batchSize) {
+
+    for (int i=0; i< batchSize; ++i) {
+      if (!inputColumn.isNull[i]) {
+        valueProcessor.processValue(myagg, inputColumn, i);
+      }
+    }
+  }
+
+  private void iterateSelectionNoNulls(
+      Aggregation myagg,
+      ColumnVector inputColumn,
+      int batchSize,
+      int[] selected) {
+
+    for (int j=0; j< batchSize; ++j) {
+      int i = selected[j];
+      valueProcessor.processValue(myagg, inputColumn, i);
+    }
+  }
+
+  private void iterateSelectionHasNulls(
+      Aggregation myagg,
+      ColumnVector inputColumn,
+      int batchSize,
+      int[] selected) {
+
+    for (int j=0; j< batchSize; ++j) {
+      int i = selected[j];
+      if (!inputColumn.isNull[i]) {
+        valueProcessor.processValue(myagg, inputColumn, i);
+      }
+    }
+  }
+
+  @Override
+  public void aggregateInputSelection(
+      VectorAggregationBufferRow[] aggregationBufferSets, int aggregateIndex,
+      VectorizedRowBatch batch) throws HiveException {
+
+    int batchSize = batch.size;
+
+    if (batchSize == 0) {
+      return;
+    }
+
+    inputExpression.evaluate(batch);
+
+    ColumnVector inputColumn = batch.cols[this.inputExpression.getOutputColumn()];
+
+    if (inputColumn.noNulls) {
+      if (inputColumn.isRepeating) {
+        iterateNoNullsRepeatingWithAggregationSelection(
+          aggregationBufferSets, aggregateIndex,
+          inputColumn, batchSize);
+      } else {
+        if (batch.selectedInUse) {
+          iterateNoNullsSelectionWithAggregationSelection(
+            aggregationBufferSets, aggregateIndex,
+            inputColumn, batch.selected, batchSize);
+        } else {
+          iterateNoNullsWithAggregationSelection(
+            aggregationBufferSets, aggregateIndex,
+            inputColumn, batchSize);
+        }
+      }
+    } else {
+      if (inputColumn.isRepeating) {
+        // All nulls, nothing to add to the bloom filter
+      } else {
+        if (batch.selectedInUse) {
+          iterateHasNullsSelectionWithAggregationSelection(
+            aggregationBufferSets, aggregateIndex,
+            inputColumn, batchSize, batch.selected);
+        } else {
+          iterateHasNullsWithAggregationSelection(
+            aggregationBufferSets, aggregateIndex,
+            inputColumn, batchSize);
+        }
+      }
+    }
+  }
+
+  private void iterateNoNullsRepeatingWithAggregationSelection(
+    VectorAggregationBufferRow[] aggregationBufferSets,
+    int aggregrateIndex,
+    ColumnVector inputColumn,
+    int batchSize) {
+
+    for (int i=0; i < batchSize; ++i) {
+      Aggregation myagg = getCurrentAggregationBuffer(
+        aggregationBufferSets,
+        aggregrateIndex,
+        i);
+      valueProcessor.processValue(myagg, inputColumn, 0);
+    }
+  }
+
+  private void iterateNoNullsSelectionWithAggregationSelection(
+    VectorAggregationBufferRow[] aggregationBufferSets,
+    int aggregrateIndex,
+    ColumnVector inputColumn,
+    int[] selection,
+    int batchSize) {
+
+    for (int i=0; i < batchSize; ++i) {
+      int row = selection[i];
+      Aggregation myagg = getCurrentAggregationBuffer(
+        aggregationBufferSets,
+        aggregrateIndex,
+        i);
+      valueProcessor.processValue(myagg, inputColumn, row);
+    }
+  }
+
+  private void iterateNoNullsWithAggregationSelection(
+    VectorAggregationBufferRow[] aggregationBufferSets,
+    int aggregrateIndex,
+    ColumnVector inputColumn,
+    int batchSize) {
+    for (int i=0; i < batchSize; ++i) {
+      Aggregation myagg = getCurrentAggregationBuffer(
+        aggregationBufferSets,
+        aggregrateIndex,
+        i);
+      valueProcessor.processValue(myagg, inputColumn, i);
+    }
+  }
+
+  private void iterateHasNullsSelectionWithAggregationSelection(
+    VectorAggregationBufferRow[] aggregationBufferSets,
+    int aggregrateIndex,
+    ColumnVector inputColumn,
+    int batchSize,
+    int[] selection) {
+
+    for (int i=0; i < batchSize; ++i) {
+      int row = selection[i];
+      if (!inputColumn.isNull[row]) {
+        Aggregation myagg = getCurrentAggregationBuffer(
+          aggregationBufferSets,
+          aggregrateIndex,
+          i);
+        // The value lives at the selected row index, not the loop index.
+        valueProcessor.processValue(myagg, inputColumn, row);
+      }
+    }
+  }
+
+  private void iterateHasNullsWithAggregationSelection(
+    VectorAggregationBufferRow[] aggregationBufferSets,
+    int aggregrateIndex,
+    ColumnVector inputColumn,
+    int batchSize) {
+
+    for (int i=0; i < batchSize; ++i) {
+      if (!inputColumn.isNull[i]) {
+        Aggregation myagg = getCurrentAggregationBuffer(
+          aggregationBufferSets,
+          aggregrateIndex,
+          i);
+        valueProcessor.processValue(myagg, inputColumn, i);
+      }
+    }
+  }
+
+  private Aggregation getCurrentAggregationBuffer(
+      VectorAggregationBufferRow[] aggregationBufferSets,
+      int aggregrateIndex,
+      int row) {
+    VectorAggregationBufferRow mySet = aggregationBufferSets[row];
+    Aggregation myagg = (Aggregation) mySet.getAggregationBuffer(aggregrateIndex);
+    return myagg;
+  }
+
+  @Override
+  public void reset(AggregationBuffer agg) throws HiveException {
+    agg.reset();
+  }
+
+  @Override
+  public Object evaluateOutput(AggregationBuffer agg) throws HiveException {
+    try {
+      Aggregation bfAgg = (Aggregation) agg;
+      byteStream.reset();
+      BloomFilter.serialize(byteStream, bfAgg.bf);
+      byte[] bytes = byteStream.toByteArray();
+      bw.set(bytes, 0, bytes.length);
+      return bw;
+    } catch (IOException err) {
+      throw new HiveException("Error encountered while serializing bloomfilter", err);
+    }
+  }
+
+  @Override
+  public ObjectInspector getOutputObjectInspector() {
+    return PrimitiveObjectInspectorFactory.writableBinaryObjectInspector;
+  }
+
+  @Override
+  public int getAggregationBufferFixedSize() {
+    if (bitSetSize < 0) {
+      // Not pretty, but we need a way to get the size
+      try {
+        Aggregation agg = (Aggregation) getNewAggregationBuffer();
+        bitSetSize = agg.bf.getBitSet().length;
+      } catch (Exception e) {
+        throw new RuntimeException("Unexpected error while creating AggregationBuffer", e);
+      }
+    }
+
+    // BloomFilter: object(BitSet: object(data: long[]), numBits: int, numHashFunctions: int)
+    JavaDataModel model = JavaDataModel.get();
+    int bloomFilterSize = JavaDataModel.alignUp(model.object() + model.lengthForLongArrayOfSize(bitSetSize),
+        model.memoryAlign());
+    return JavaDataModel.alignUp(
+        model.object() + bloomFilterSize + model.primitive1() + model.primitive1(),
+        model.memoryAlign());
+  }
+
+  @Override
+  public void init(AggregationDesc desc) throws HiveException {
+    GenericUDAFBloomFilterEvaluator udafBloomFilter =
+        (GenericUDAFBloomFilterEvaluator) desc.getGenericUDAFEvaluator();
+    expectedEntries = udafBloomFilter.getExpectedEntries();
+  }
+
+  public VectorExpression getInputExpression() {
+    return inputExpression;
+  }
+
+  public void setInputExpression(VectorExpression inputExpression) {
+    this.inputExpression = inputExpression;
+  }
+
+  public long getExpectedEntries() {
+    return expectedEntries;
+  }
+
+  public void setExpectedEntries(long expectedEntries) {
+    this.expectedEntries = expectedEntries;
+  }
+
+  // Type-specific handling done here
+  private static abstract class ValueProcessor {
+    abstract protected void processValue(Aggregation myagg, ColumnVector inputColumn, int index);
+  }
+
+  //
+  // Type-specific implementations
+  //
+
+  public static class ValueProcessorBytes extends ValueProcessor {
+    @Override
+    protected void processValue(Aggregation myagg, ColumnVector columnVector, int i) {
+      BytesColumnVector inputColumn = (BytesColumnVector) columnVector;
+      myagg.bf.addBytes(inputColumn.vector[i], inputColumn.start[i], inputColumn.length[i]);
+    }
+  }
+
+  public static class ValueProcessorLong extends ValueProcessor {
+    @Override
+    protected void processValue(Aggregation myagg, ColumnVector columnVector, int i) {
+      LongColumnVector inputColumn = (LongColumnVector) columnVector;
+      myagg.bf.addLong(inputColumn.vector[i]);
+    }
+  }
+
+  public static class ValueProcessorDouble extends ValueProcessor {
+    @Override
+    protected void processValue(Aggregation myagg, ColumnVector columnVector, int i) {
+      DoubleColumnVector inputColumn = (DoubleColumnVector) columnVector;
+      myagg.bf.addDouble(inputColumn.vector[i]);
+    }
+  }
+
+  public static class ValueProcessorDecimal extends ValueProcessor {
+    private byte[] scratchBuffer = new byte[HiveDecimal.SCRATCH_BUFFER_LEN_TO_BYTES];
+
+    @Override
+    protected void processValue(Aggregation myagg, ColumnVector columnVector, int i) {
+      DecimalColumnVector inputColumn = (DecimalColumnVector) columnVector;
+      int startIdx = inputColumn.vector[i].toBytes(scratchBuffer);
+      myagg.bf.addBytes(scratchBuffer, startIdx, scratchBuffer.length - startIdx);
+    }
+  }
+
+  public static class ValueProcessorTimestamp extends ValueProcessor {
+    @Override
+    protected void processValue(Aggregation myagg, ColumnVector columnVector, int i) {
+      TimestampColumnVector inputColumn = (TimestampColumnVector) columnVector;
+      myagg.bf.addLong(inputColumn.time[i]);
+    }
+  }
+}
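
Pulling the pieces of VectorUDAFBloomFilter together: init() reads
expectedEntries from the planner's GenericUDAFBloomFilterEvaluator,
getNewAggregationBuffer() allocates a filter of that size, aggregateInput()
feeds every non-null value through the type-specific ValueProcessor, and
evaluateOutput() serializes the filter into a reusable BytesWritable. A
hedged lifecycle sketch, assuming inputExpr (a VectorExpression) and batch
(a VectorizedRowBatch) are supplied by the enclosing vectorized operator:

    VectorUDAFBloomFilter udaf = new VectorUDAFBloomFilter(inputExpr);
    udaf.setExpectedEntries(10000L);   // normally populated via init(AggregationDesc)
    VectorAggregateExpression.AggregationBuffer buf = udaf.getNewAggregationBuffer();
    udaf.aggregateInput(buf, batch);   // adds each non-null value in the batch to the filter
    BytesWritable out = (BytesWritable) udaf.evaluateOutput(buf);
    // 'out' now holds the serialized filter consumed by VectorUDAFBloomFilterMerge below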

http://git-wip-us.apache.org/repos/asf/hive/blob/62ebd1ab/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFBloomFilterMerge.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFBloomFilterMerge.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFBloomFilterMerge.java
new file mode 100644
index 0000000..ad190b7
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFBloomFilterMerge.java
@@ -0,0 +1,365 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates;
+
+import java.io.ByteArrayOutputStream;
+import java.util.Arrays;
+
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationBufferRow;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression.AggregationBuffer;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.AggregationDesc;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFBloomFilter.GenericUDAFBloomFilterEvaluator;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hive.common.util.BloomFilter;
+
+public class VectorUDAFBloomFilterMerge extends VectorAggregateExpression {
+
+  private static final long serialVersionUID = 1L;
+
+  private VectorExpression inputExpression;
+  private long expectedEntries = -1;
+  transient private int aggBufferSize = -1;
+  transient private BytesWritable bw = new BytesWritable();
+
+  /**
+   * Class for storing the current aggregate value.
+   */
+  private static final class Aggregation implements AggregationBuffer {
+    private static final long serialVersionUID = 1L;
+
+    byte[] bfBytes;
+
+    public Aggregation(long expectedEntries) {
+      try {
+        BloomFilter bf = new BloomFilter(expectedEntries);
+        ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+        BloomFilter.serialize(bytesOut, bf);
+        bfBytes = bytesOut.toByteArray();
+      } catch (Exception err) {
+        throw new IllegalArgumentException("Error creating aggregation buffer", err);
+      }
+    }
+
+    @Override
+    public int getVariableSize() {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void reset() {
+      // Do not change the initial bytes which contain NumHashFunctions/NumBits!
+      Arrays.fill(bfBytes, BloomFilter.START_OF_SERIALIZED_LONGS, bfBytes.length, (byte) 0);
+    }
+  }
+
+  public VectorUDAFBloomFilterMerge(VectorExpression inputExpression) {
+    this();
+    this.inputExpression = inputExpression;
+  }
+
+  public VectorUDAFBloomFilterMerge() {
+    super();
+  }
+
+  @Override
+  public AggregationBuffer getNewAggregationBuffer() throws HiveException {
+    if (expectedEntries < 0) {
+      throw new IllegalStateException("expectedEntries not initialized");
+    }
+    return new Aggregation(expectedEntries);
+  }
+
+  @Override
+  public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch batch)
+      throws HiveException {
+
+    inputExpression.evaluate(batch);
+
+    ColumnVector inputColumn = batch.cols[this.inputExpression.getOutputColumn()];
+
+    int batchSize = batch.size;
+
+    if (batchSize == 0) {
+      return;
+    }
+
+    Aggregation myagg = (Aggregation) agg;
+
+    if (inputColumn.isRepeating) {
+      if (inputColumn.noNulls) {
+        processValue(myagg, inputColumn, 0);
+      }
+      return;
+    }
+
+    if (!batch.selectedInUse && inputColumn.noNulls) {
+      iterateNoSelectionNoNulls(myagg, inputColumn, batchSize);
+    }
+    else if (!batch.selectedInUse) {
+      iterateNoSelectionHasNulls(myagg, inputColumn, batchSize);
+    }
+    else if (inputColumn.noNulls){
+      iterateSelectionNoNulls(myagg, inputColumn, batchSize, batch.selected);
+    }
+    else {
+      iterateSelectionHasNulls(myagg, inputColumn, batchSize, batch.selected);
+    }
+  }
+
+  private void iterateNoSelectionNoNulls(
+      Aggregation myagg,
+      ColumnVector inputColumn,
+      int batchSize) {
+    for (int i=0; i< batchSize; ++i) {
+      processValue(myagg, inputColumn, i);
+    }
+  }
+
+  private void iterateNoSelectionHasNulls(
+      Aggregation myagg,
+      ColumnVector inputColumn,
+      int batchSize) {
+
+    for (int i=0; i< batchSize; ++i) {
+      if (!inputColumn.isNull[i]) {
+        processValue(myagg, inputColumn, i);
+      }
+    }
+  }
+
+  private void iterateSelectionNoNulls(
+      Aggregation myagg,
+      ColumnVector inputColumn,
+      int batchSize,
+      int[] selected) {
+
+    for (int j=0; j< batchSize; ++j) {
+      int i = selected[j];
+      processValue(myagg, inputColumn, i);
+    }
+  }
+
+  private void iterateSelectionHasNulls(
+      Aggregation myagg,
+      ColumnVector inputColumn,
+      int batchSize,
+      int[] selected) {
+
+    for (int j=0; j< batchSize; ++j) {
+      int i = selected[j];
+      if (!inputColumn.isNull[i]) {
+        processValue(myagg, inputColumn, i);
+      }
+    }
+  }
+
+  @Override
+  public void aggregateInputSelection(
+      VectorAggregationBufferRow[] aggregationBufferSets, int aggregateIndex,
+      VectorizedRowBatch batch) throws HiveException {
+
+    int batchSize = batch.size;
+
+    if (batchSize == 0) {
+      return;
+    }
+
+    inputExpression.evaluate(batch);
+
+    ColumnVector inputColumn = batch.cols[this.inputExpression.getOutputColumn()];
+
+    if (inputColumn.noNulls) {
+      if (inputColumn.isRepeating) {
+        iterateNoNullsRepeatingWithAggregationSelection(
+          aggregationBufferSets, aggregateIndex,
+          inputColumn, batchSize);
+      } else {
+        if (batch.selectedInUse) {
+          iterateNoNullsSelectionWithAggregationSelection(
+            aggregationBufferSets, aggregateIndex,
+            inputColumn, batch.selected, batchSize);
+        } else {
+          iterateNoNullsWithAggregationSelection(
+            aggregationBufferSets, aggregateIndex,
+            inputColumn, batchSize);
+        }
+      }
+    } else {
+      if (inputColumn.isRepeating) {
+        // All nulls, nothing to merge
+      } else {
+        if (batch.selectedInUse) {
+          iterateHasNullsSelectionWithAggregationSelection(
+            aggregationBufferSets, aggregateIndex,
+            inputColumn, batchSize, batch.selected);
+        } else {
+          iterateHasNullsWithAggregationSelection(
+            aggregationBufferSets, aggregateIndex,
+            inputColumn, batchSize);
+        }
+      }
+    }
+  }
+
+  private void iterateNoNullsRepeatingWithAggregationSelection(
+      VectorAggregationBufferRow[] aggregationBufferSets,
+      int aggregrateIndex,
+      ColumnVector inputColumn,
+      int batchSize) {
+
+    for (int i=0; i < batchSize; ++i) {
+      Aggregation myagg = getCurrentAggregationBuffer(
+          aggregationBufferSets,
+          aggregrateIndex,
+          i);
+      processValue(myagg, inputColumn, 0);
+    }
+  }
+
+  private void iterateNoNullsSelectionWithAggregationSelection(
+      VectorAggregationBufferRow[] aggregationBufferSets,
+      int aggregrateIndex,
+      ColumnVector inputColumn,
+      int[] selection,
+      int batchSize) {
+
+    for (int i=0; i < batchSize; ++i) {
+      int row = selection[i];
+      Aggregation myagg = getCurrentAggregationBuffer(
+          aggregationBufferSets,
+          aggregrateIndex,
+          i);
+      processValue(myagg, inputColumn, row);
+    }
+  }
+
+  private void iterateNoNullsWithAggregationSelection(
+      VectorAggregationBufferRow[] aggregationBufferSets,
+      int aggregrateIndex,
+      ColumnVector inputColumn,
+      int batchSize) {
+    for (int i=0; i < batchSize; ++i) {
+      Aggregation myagg = getCurrentAggregationBuffer(
+          aggregationBufferSets,
+          aggregrateIndex,
+          i);
+      processValue(myagg, inputColumn, i);
+    }
+  }
+
+  private void iterateHasNullsSelectionWithAggregationSelection(
+      VectorAggregationBufferRow[] aggregationBufferSets,
+      int aggregrateIndex,
+      ColumnVector inputColumn,
+      int batchSize,
+      int[] selection) {
+
+    for (int i=0; i < batchSize; ++i) {
+      int row = selection[i];
+      if (!inputColumn.isNull[row]) {
+        Aggregation myagg = getCurrentAggregationBuffer(
+            aggregationBufferSets,
+            aggregrateIndex,
+            i);
+        // The value lives at the selected row index, not the loop index.
+        processValue(myagg, inputColumn, row);
+      }
+    }
+  }
+
+  private void iterateHasNullsWithAggregationSelection(
+      VectorAggregationBufferRow[] aggregationBufferSets,
+      int aggregrateIndex,
+      ColumnVector inputColumn,
+      int batchSize) {
+
+    for (int i=0; i < batchSize; ++i) {
+      if (!inputColumn.isNull[i]) {
+        Aggregation myagg = getCurrentAggregationBuffer(
+            aggregationBufferSets,
+            aggregrateIndex,
+            i);
+        processValue(myagg, inputColumn, i);
+      }
+    }
+  }
+
+  private Aggregation getCurrentAggregationBuffer(
+      VectorAggregationBufferRow[] aggregationBufferSets,
+      int aggregrateIndex,
+      int row) {
+    VectorAggregationBufferRow mySet = aggregationBufferSets[row];
+    Aggregation myagg = (Aggregation) mySet.getAggregationBuffer(aggregrateIndex);
+    return myagg;
+  }
+
+  @Override
+  public void reset(AggregationBuffer agg) throws HiveException {
+    agg.reset();
+  }
+
+  @Override
+  public Object evaluateOutput(AggregationBuffer agg) throws HiveException {
+    Aggregation bfAgg = (Aggregation) agg;
+    bw.set(bfAgg.bfBytes, 0, bfAgg.bfBytes.length);
+    return bw;
+  }
+
+  @Override
+  public ObjectInspector getOutputObjectInspector() {
+    return PrimitiveObjectInspectorFactory.writableBinaryObjectInspector;
+  }
+
+  @Override
+  public int getAggregationBufferFixedSize() {
+    if (aggBufferSize < 0) {
+      // Not pretty, but we need a way to get the size
+      try {
+        Aggregation agg = (Aggregation) getNewAggregationBuffer();
+        aggBufferSize = agg.bfBytes.length;
+      } catch (Exception e) {
+        throw new RuntimeException("Unexpected error while creating AggregationBuffer", e);
+      }
+    }
+
+    return aggBufferSize;
+  }
+
+  @Override
+  public void init(AggregationDesc desc) throws HiveException {
+    GenericUDAFBloomFilterEvaluator udafBloomFilter =
+        (GenericUDAFBloomFilterEvaluator) desc.getGenericUDAFEvaluator();
+    expectedEntries = udafBloomFilter.getExpectedEntries();
+  }
+
+  void processValue(Aggregation myagg, ColumnVector columnVector, int i) {
+    // Each columnVector entry is a byte array representing a serialized BloomFilter.
+    // BloomFilter.mergeBloomFilterBytes() does a simple byte ORing
+    // which should be faster than deserialize/merge.
+    BytesColumnVector inputColumn = (BytesColumnVector) columnVector;
+    BloomFilter.mergeBloomFilterBytes(myagg.bfBytes, 0, myagg.bfBytes.length,
+        inputColumn.vector[i], inputColumn.start[i], inputColumn.length[i]);
+  }
+}
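
The merge side never deserializes: every filter built for a given semijoin
shares the same numBits and numHashFunctions, so the union of two filters is
just the bitwise OR of their bit arrays, and the merge can work directly on
the serialized bytes. A conceptual sketch of what mergeBloomFilterBytes()
amounts to (illustrative only and simplified to whole arrays; the real method
takes the offset/length arguments shown in processValue() above):

    static void orSerializedFilters(byte[] dst, byte[] src) {
      // Bytes before START_OF_SERIALIZED_LONGS hold numHashFunctions/numBits
      // and are identical by construction, so only the payload is OR'd.
      for (int i = BloomFilter.START_OF_SERIALIZED_LONGS; i < dst.length; i++) {
        dst[i] |= src[i];
      }
    }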

http://git-wip-us.apache.org/repos/asf/hive/blob/62ebd1ab/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
index e3d9d7f..439950b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
@@ -335,6 +335,7 @@ public class Vectorizer implements PhysicalPlanResolver {
     supportedGenericUDFs.add(GenericUDFNvl.class);
     supportedGenericUDFs.add(GenericUDFElt.class);
     supportedGenericUDFs.add(GenericUDFInitCap.class);
+    supportedGenericUDFs.add(GenericUDFInBloomFilter.class);
 
     // For type casts
     supportedGenericUDFs.add(UDFToLong.class);
@@ -368,6 +369,7 @@ public class Vectorizer implements PhysicalPlanResolver {
     supportedAggregationUdfs.add("stddev");
     supportedAggregationUdfs.add("stddev_pop");
     supportedAggregationUdfs.add("stddev_samp");
+    supportedAggregationUdfs.add("bloom_filter");
   }
 
   private class VectorTaskColumnInfo {

http://git-wip-us.apache.org/repos/asf/hive/blob/62ebd1ab/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFBloomFilter.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFBloomFilter.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFBloomFilter.java
index fb9a140..deb0f76 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFBloomFilter.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFBloomFilter.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.serde2.io.DateWritable;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.*;
@@ -72,6 +73,8 @@ public class GenericUDAFBloomFilter implements GenericUDAFResolver2 {
     // Bloom filter rest
     private ByteArrayOutputStream result = new ByteArrayOutputStream();
 
+    private transient byte[] scratchBuffer = new byte[HiveDecimal.SCRATCH_BUFFER_LEN_TO_BYTES];
+
     @Override
     public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
       super.init(m, parameters);
@@ -167,9 +170,10 @@ public class GenericUDAFBloomFilter implements GenericUDAFResolver2 {
           bf.addDouble(vDouble);
           break;
         case DECIMAL:
-          HiveDecimal vDecimal = ((HiveDecimalObjectInspector)inputOI).
-                  getPrimitiveJavaObject(parameters[0]);
-          bf.addString(vDecimal.toString());
+          HiveDecimalWritable vDecimal = ((HiveDecimalObjectInspector)inputOI).
+                  getPrimitiveWritableObject(parameters[0]);
+          int startIdx = vDecimal.toBytes(scratchBuffer);
+          bf.addBytes(scratchBuffer, startIdx, scratchBuffer.length - startIdx);
           break;
         case DATE:
           DateWritable vDate = ((DateObjectInspector)inputOI).

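The DECIMAL case above (mirrored on the probe side in GenericUDFInBloomFilter
below) stops hashing HiveDecimal.toString() and instead serializes the value
into a reusable scratch buffer: toBytes() fills the buffer right-aligned and
returns the index of the first used byte, avoiding per-row allocation while
guaranteeing that the build and probe sides hash byte-identical encodings. A
minimal sketch, assuming bf is a BloomFilter and using serde2's
HiveDecimalWritable:

    byte[] scratchBuffer = new byte[HiveDecimal.SCRATCH_BUFFER_LEN_TO_BYTES];
    HiveDecimalWritable v = new HiveDecimalWritable(HiveDecimal.create("123.45"));
    int startIdx = v.toBytes(scratchBuffer); // writes from the right, returns first used index
    bf.addBytes(scratchBuffer, startIdx, scratchBuffer.length - startIdx);
    // the probe side must test the identical encoding:
    boolean mayContain = bf.testBytes(scratchBuffer, startIdx, scratchBuffer.length - startIdx);
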
http://git-wip-us.apache.org/repos/asf/hive/blob/62ebd1ab/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFInBloomFilter.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFInBloomFilter.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFInBloomFilter.java
index 1b7de6c..3e6e069 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFInBloomFilter.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFInBloomFilter.java
@@ -22,8 +22,11 @@ import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
 import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
 import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedExpressions;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorInBloomFilterColDynamicValue;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.serde2.io.DateWritable;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.*;
@@ -41,6 +44,7 @@ import java.sql.Timestamp;
 /**
  * GenericUDF to lookup a value in BloomFilter
  */
+@VectorizedExpressions({VectorInBloomFilterColDynamicValue.class})
 public class GenericUDFInBloomFilter extends GenericUDF {
   private static final Logger LOG = LoggerFactory.getLogger(GenericUDFInBloomFilter.class);
 
@@ -48,6 +52,7 @@ public class GenericUDFInBloomFilter extends GenericUDF {
   private transient ObjectInspector bloomFilterObjectInspector;
   private transient BloomFilter bloomFilter;
   private transient boolean initializedBloomFilter;
+  private transient byte[] scratchBuffer = new byte[HiveDecimal.SCRATCH_BUFFER_LEN_TO_BYTES];
 
   @Override
   public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
@@ -133,9 +138,10 @@ public class GenericUDFInBloomFilter extends GenericUDF {
                 get(arguments[0].get());
         return bloomFilter.testDouble(vDouble);
       case DECIMAL:
-        HiveDecimal vDecimal = ((HiveDecimalObjectInspector) valObjectInspector).
-                getPrimitiveJavaObject(arguments[0].get());
-        return bloomFilter.testString(vDecimal.toString());
+        HiveDecimalWritable vDecimal = ((HiveDecimalObjectInspector) valObjectInspector).
+            getPrimitiveWritableObject(arguments[0].get());
+        int startIdx = vDecimal.toBytes(scratchBuffer);
+        return bloomFilter.testBytes(scratchBuffer, startIdx, scratchBuffer.length - startIdx);
       case DATE:
         DateWritable vDate = ((DateObjectInspector) valObjectInspector).
                 getPrimitiveWritableObject(arguments[0].get());