Posted to commits@hive.apache.org by jd...@apache.org on 2017/01/30 07:33:32 UTC

[1/3] hive git commit: HIVE-15698: Vectorization support for min/max/bloomfilter runtime filtering (Jason Dere, reviewed by Matt McCline)

Repository: hive
Updated Branches:
  refs/heads/master 79eb2243a -> 62ebd1abb


http://git-wip-us.apache.org/repos/asf/hive/blob/62ebd1ab/storage-api/src/test/org/apache/hive/common/util/TestBloomFilter.java
----------------------------------------------------------------------
diff --git a/storage-api/src/test/org/apache/hive/common/util/TestBloomFilter.java b/storage-api/src/test/org/apache/hive/common/util/TestBloomFilter.java
index 63c7050..e4ee93a 100644
--- a/storage-api/src/test/org/apache/hive/common/util/TestBloomFilter.java
+++ b/storage-api/src/test/org/apache/hive/common/util/TestBloomFilter.java
@@ -20,8 +20,12 @@ package org.apache.hive.common.util;
 
 import static org.junit.Assert.assertEquals;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.util.ArrayList;
 import java.util.Random;
 
+import org.junit.Assert;
 import org.junit.Test;
 
 /**
@@ -461,4 +465,125 @@ public class TestBloomFilter {
     assertEquals(true, bf.testString(v2));
     assertEquals(true, bf.testString(v3));
   }
+
+  @Test
+  public void testSerialize() throws Exception {
+    BloomFilter bf1 = new BloomFilter(10000);
+    String[] inputs = {
+      "bloo",
+      "bloom fil",
+      "bloom filter",
+      "cuckoo filter",
+    };
+
+    for (String val : inputs) {
+      bf1.addString(val);
+    }
+
+    // Serialize/deserialize
+    ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+    BloomFilter.serialize(bytesOut, bf1);
+    ByteArrayInputStream bytesIn = new ByteArrayInputStream(bytesOut.toByteArray());
+    BloomFilter bf2 = BloomFilter.deserialize(bytesIn);
+
+    for (String val : inputs) {
+      assertEquals("Testing bf1 with " + val, true, bf1.testString(val));
+      assertEquals("Testing bf2 with " + val, true, bf2.testString(val));
+    }
+  }
+
+  @Test
+  public void testMergeBloomFilterBytes() throws Exception {
+    BloomFilter bf1 = new BloomFilter(10000);
+    BloomFilter bf2 = new BloomFilter(10000);
+
+    String[] inputs1 = {
+      "bloo",
+      "bloom fil",
+      "bloom filter",
+      "cuckoo filter",
+    };
+
+    String[] inputs2 = {
+      "2_bloo",
+      "2_bloom fil",
+      "2_bloom filter",
+      "2_cuckoo filter",
+    };
+
+    for (String val : inputs1) {
+      bf1.addString(val);
+    }
+    for (String val : inputs2) {
+      bf2.addString(val);
+    }
+
+    ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+    BloomFilter.serialize(bytesOut, bf1);
+    byte[] bf1Bytes = bytesOut.toByteArray();
+    bytesOut.reset();
+    BloomFilter.serialize(bytesOut, bf2);
+    byte[] bf2Bytes = bytesOut.toByteArray();
+
+    // Merge bytes
+    BloomFilter.mergeBloomFilterBytes(
+        bf1Bytes, 0, bf1Bytes.length,
+        bf2Bytes, 0, bf2Bytes.length);
+
+    // Deserialize and test
+    ByteArrayInputStream bytesIn = new ByteArrayInputStream(bf1Bytes, 0, bf1Bytes.length);
+    BloomFilter bfMerged = BloomFilter.deserialize(bytesIn);
+    // All values from both filters should pass the membership test
+    for (String val : inputs1) {
+      assertEquals("Testing merged filter with " + val, true, bfMerged.testString(val));
+    }
+    for (String val : inputs2) {
+      assertEquals("Testing merged filter with " + val, true, bfMerged.testString(val));
+    }
+  }
+
+  @Test
+  public void testMergeBloomFilterBytesFailureCases() throws Exception {
+    BloomFilter bf1 = new BloomFilter(1000);
+    BloomFilter bf2 = new BloomFilter(200);
+    // Create bloom filter with same number of bits, but different # hash functions
+    ArrayList<Long> bits = new ArrayList<Long>();
+    for (int idx = 0; idx < bf1.getBitSet().length; ++idx) {
+      bits.add(0L);
+    }
+    BloomFilter bf3 = new BloomFilter(bits, bf1.getBitSize(), bf1.getNumHashFunctions() + 1);
+
+    // Serialize to bytes
+    ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+    BloomFilter.serialize(bytesOut, bf1);
+    byte[] bf1Bytes = bytesOut.toByteArray();
+
+    bytesOut.reset();
+    BloomFilter.serialize(bytesOut, bf2);
+    byte[] bf2Bytes = bytesOut.toByteArray();
+
+    bytesOut.reset();
+    BloomFilter.serialize(bytesOut, bf3);
+    byte[] bf3Bytes = bytesOut.toByteArray();
+
+    try {
+      // this should fail
+      BloomFilter.mergeBloomFilterBytes(
+          bf1Bytes, 0, bf1Bytes.length,
+          bf2Bytes, 0, bf2Bytes.length);
+      Assert.fail("Expected exception not encountered");
+    } catch (IllegalArgumentException err) {
+      // expected
+    }
+
+    try {
+      // this should fail
+      BloomFilter.mergeBloomFilterBytes(
+          bf1Bytes, 0, bf1Bytes.length,
+          bf3Bytes, 0, bf3Bytes.length);
+      Assert.fail("Expected exception not encountered");
+    } catch (IllegalArgumentException err) {
+      // expected
+    }
+  }
 }

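For reference, a minimal usage sketch of the serialize/merge API exercised by the new tests above. The BloomFilter class, its package, and the method signatures are taken from the diff itself; the BloomFilterMergeSketch wrapper class and the sample strings are illustrative only, and the in-place merge into the first byte array is inferred from testMergeBloomFilterBytes rather than from documented behavior.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import org.apache.hive.common.util.BloomFilter;

public class BloomFilterMergeSketch {
  public static void main(String[] args) throws Exception {
    // Two partial filters built with the same expected entry count,
    // e.g. by separate tasks feeding the same runtime filter.
    BloomFilter partialA = new BloomFilter(10000);
    BloomFilter partialB = new BloomFilter(10000);
    partialA.addString("row-from-task-a");
    partialB.addString("row-from-task-b");

    // Each side serializes its filter to bytes.
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    BloomFilter.serialize(out, partialA);
    byte[] bytesA = out.toByteArray();
    out.reset();
    BloomFilter.serialize(out, partialB);
    byte[] bytesB = out.toByteArray();

    // Merge B into A's serialized bytes in place; per the failure-case test this
    // throws IllegalArgumentException when bit sizes or hash-function counts differ.
    BloomFilter.mergeBloomFilterBytes(bytesA, 0, bytesA.length,
                                      bytesB, 0, bytesB.length);

    // Deserialize the merged bytes; both inserted values should now test positive.
    BloomFilter merged = BloomFilter.deserialize(new ByteArrayInputStream(bytesA));
    System.out.println(merged.testString("row-from-task-a"));  // expected: true
    System.out.println(merged.testString("row-from-task-b"));  // expected: true
  }
}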

[2/3] hive git commit: HIVE-15698: Vectorization support for min/max/bloomfilter runtime filtering (Jason Dere, reviewed by Matt McCline)

Posted by jd...@apache.org.
http://git-wip-us.apache.org/repos/asf/hive/blob/62ebd1ab/ql/src/test/org/apache/hadoop/hive/ql/optimizer/physical/TestVectorizer.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/optimizer/physical/TestVectorizer.java b/ql/src/test/org/apache/hadoop/hive/ql/optimizer/physical/TestVectorizer.java
index 59cb31e..889f00a 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/optimizer/physical/TestVectorizer.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/optimizer/physical/TestVectorizer.java
@@ -28,6 +28,7 @@ import junit.framework.Assert;
 import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.exec.*;
 import org.apache.hadoop.hive.ql.exec.vector.VectorExpressionDescriptor;
+import org.apache.hadoop.hive.ql.exec.vector.VectorExpressionDescriptor.Mode;
 import org.apache.hadoop.hive.ql.exec.vector.VectorGroupByOperator;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen.VectorUDAFSumLong;
@@ -218,4 +219,34 @@ public class TestVectorizer {
       Vectorizer vectorizer = new Vectorizer();
       Assert.assertTrue(vectorizer.validateMapWorkOperator(map, null, false));
   }
+
+  @Test
+  public void testExprNodeDynamicValue() {
+    ExprNodeDesc exprNode = new ExprNodeDynamicValueDesc(new DynamicValue("id1", TypeInfoFactory.stringTypeInfo));
+    Vectorizer v = new Vectorizer();
+    Assert.assertTrue(v.validateExprNodeDesc(exprNode, Mode.FILTER));
+    Assert.assertTrue(v.validateExprNodeDesc(exprNode, Mode.PROJECTION));
+  }
+
+  @Test
+  public void testExprNodeBetweenWithDynamicValue() {
+    ExprNodeDesc notBetween = new ExprNodeConstantDesc(TypeInfoFactory.booleanTypeInfo, Boolean.FALSE);
+    ExprNodeColumnDesc colExpr = new ExprNodeColumnDesc(String.class, "col1", "table", false);
+    ExprNodeDesc minExpr = new ExprNodeDynamicValueDesc(new DynamicValue("id1", TypeInfoFactory.stringTypeInfo));
+    ExprNodeDesc maxExpr = new ExprNodeDynamicValueDesc(new DynamicValue("id2", TypeInfoFactory.stringTypeInfo));
+
+    ExprNodeGenericFuncDesc betweenExpr = new ExprNodeGenericFuncDesc();
+    GenericUDF betweenUdf = new GenericUDFBetween();
+    betweenExpr.setTypeInfo(TypeInfoFactory.booleanTypeInfo);
+    betweenExpr.setGenericUDF(betweenUdf);
+    List<ExprNodeDesc> children1 = new ArrayList<ExprNodeDesc>(2);
+    children1.add(notBetween);
+    children1.add(colExpr);
+    children1.add(minExpr);
+    children1.add(maxExpr);
+    betweenExpr.setChildren(children1);
+
+    Vectorizer v = new Vectorizer();
+    Assert.assertTrue(v.validateExprNodeDesc(betweenExpr, Mode.FILTER));
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/62ebd1ab/ql/src/test/queries/clientpositive/explainuser_3.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/explainuser_3.q b/ql/src/test/queries/clientpositive/explainuser_3.q
index 282629e..9c6c9dc 100644
--- a/ql/src/test/queries/clientpositive/explainuser_3.q
+++ b/ql/src/test/queries/clientpositive/explainuser_3.q
@@ -13,6 +13,7 @@ set hive.vectorized.execution.enabled=true;
 
 CREATE TABLE acid_vectorized(a INT, b STRING) CLUSTERED BY(a) INTO 2 BUCKETS STORED AS ORC TBLPROPERTIES ('transactional'='true');
 insert into table acid_vectorized select cint, cstring1 from alltypesorc where cint is not null order by cint limit 10;
+analyze table acid_vectorized compute statistics for columns;
 explain select a, b from acid_vectorized order by a, b;
 
 explain select key, value

http://git-wip-us.apache.org/repos/asf/hive/blob/62ebd1ab/ql/src/test/queries/clientpositive/vectorized_dynamic_semijoin_reduction.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/vectorized_dynamic_semijoin_reduction.q b/ql/src/test/queries/clientpositive/vectorized_dynamic_semijoin_reduction.q
new file mode 100644
index 0000000..e1eefff
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/vectorized_dynamic_semijoin_reduction.q
@@ -0,0 +1,43 @@
+set hive.compute.query.using.stats=false;
+set hive.mapred.mode=nonstrict;
+set hive.explain.user=false;
+set hive.optimize.ppd=true;
+set hive.ppd.remove.duplicatefilters=true;
+set hive.tez.dynamic.partition.pruning=true;
+set hive.tez.dynamic.semijoin.reduction=true;
+set hive.optimize.metadataonly=false;
+set hive.optimize.index.filter=true;
+
+set hive.vectorized.adaptor.usage.mode=none;
+set hive.vectorized.execution.enabled=true;
+
+-- Create Tables
+create table dsrv_big stored as orc as select key as key_str, cast(key as int) as key_int, value from src;
+create table dsrv_small stored as orc as select distinct key as key_str, cast(key as int) as key_int, value from src where key < 100;
+
+-- single key (int)
+EXPLAIN select count(*) from dsrv_big a join dsrv_small b on (a.key_int = b.key_int);
+select count(*) from dsrv_big a join dsrv_small b on (a.key_int = b.key_int);
+
+-- single key (string)
+EXPLAIN select count(*) from dsrv_big a join dsrv_small b on (a.key_str = b.key_str);
+select count(*) from dsrv_big a join dsrv_small b on (a.key_str = b.key_str);
+
+-- keys are different type
+EXPLAIN select count(*) from dsrv_big a join dsrv_small b on (a.key_str = b.key_str);
+select count(*) from dsrv_big a join dsrv_small b on (a.key_int = b.key_str);
+
+-- multiple tables
+EXPLAIN select count(*) from dsrv_big a, dsrv_small b, dsrv_small c where a.key_int = b.key_int and a.key_int = c.key_int;
+select count(*) from dsrv_big a, dsrv_small b, dsrv_small c where a.key_int = b.key_int and a.key_int = c.key_int;
+
+-- multiple keys
+EXPLAIN select count(*) from dsrv_big a join dsrv_small b on (a.key_str = b.key_str and a.key_int = b.key_int);
+select count(*) from dsrv_big a join dsrv_small b on (a.key_str = b.key_str and a.key_int = b.key_int);
+
+-- small table result is empty
+EXPLAIN select count(*) from dsrv_big a join dsrv_small b on (a.key_int = b.key_int) where b.value in ('nonexistent1', 'nonexistent2');
+select count(*) from dsrv_big a join dsrv_small b on (a.key_int = b.key_int) where b.value in ('nonexistent1', 'nonexistent2');
+
+drop table dsrv_big;
+drop table dsrv_small;

http://git-wip-us.apache.org/repos/asf/hive/blob/62ebd1ab/ql/src/test/results/clientpositive/llap/mergejoin.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/mergejoin.q.out b/ql/src/test/results/clientpositive/llap/mergejoin.q.out
index 4ec2a71..6114548 100644
--- a/ql/src/test/results/clientpositive/llap/mergejoin.q.out
+++ b/ql/src/test/results/clientpositive/llap/mergejoin.q.out
@@ -92,7 +92,7 @@ STAGE PLANS:
                       output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                       serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
         Reducer 4 
-            Execution mode: llap
+            Execution mode: vectorized, llap
             Reduce Operator Tree:
               Group By Operator
                 aggregations: min(VALUE._col0), max(VALUE._col1), bloom_filter(VALUE._col2, expectedEntries=25)
@@ -321,7 +321,7 @@ STAGE PLANS:
                             sort order: 
                             Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: NONE
                             value expressions: _col0 (type: int), _col1 (type: int), _col2 (type: binary)
-            Execution mode: llap
+            Execution mode: vectorized, llap
             LLAP IO: all inputs
         Map 5 
             Map Operator Tree:
@@ -341,7 +341,7 @@ STAGE PLANS:
                         sort order: +
                         Map-reduce partition columns: _col0 (type: int)
                         Statistics: Num rows: 500 Data size: 51000 Basic stats: COMPLETE Column stats: NONE
-            Execution mode: llap
+            Execution mode: vectorized, llap
             LLAP IO: all inputs
         Reducer 2 
             Execution mode: llap
@@ -378,7 +378,7 @@ STAGE PLANS:
                       output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                       serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
         Reducer 4 
-            Execution mode: llap
+            Execution mode: vectorized, llap
             Reduce Operator Tree:
               Group By Operator
                 aggregations: min(VALUE._col0), max(VALUE._col1), bloom_filter(VALUE._col2, expectedEntries=242)
@@ -1434,7 +1434,7 @@ STAGE PLANS:
                           sort order: 
                           Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: NONE
                           value expressions: _col0 (type: int), _col1 (type: int), _col2 (type: binary)
-            Execution mode: llap
+            Execution mode: vectorized, llap
             LLAP IO: all inputs
         Map 5 
             Map Operator Tree:
@@ -1453,7 +1453,7 @@ STAGE PLANS:
                         sort order: +
                         Map-reduce partition columns: _col0 (type: int)
                         Statistics: Num rows: 500 Data size: 51000 Basic stats: COMPLETE Column stats: NONE
-            Execution mode: llap
+            Execution mode: vectorized, llap
             LLAP IO: all inputs
         Reducer 2 
             Execution mode: llap
@@ -1490,7 +1490,7 @@ STAGE PLANS:
                       output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                       serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
         Reducer 4 
-            Execution mode: llap
+            Execution mode: vectorized, llap
             Reduce Operator Tree:
               Group By Operator
                 aggregations: min(VALUE._col0), max(VALUE._col1), bloom_filter(VALUE._col2, expectedEntries=242)
@@ -1565,7 +1565,7 @@ STAGE PLANS:
                         sort order: +
                         Map-reduce partition columns: _col0 (type: int)
                         Statistics: Num rows: 242 Data size: 24684 Basic stats: COMPLETE Column stats: NONE
-            Execution mode: llap
+            Execution mode: vectorized, llap
             LLAP IO: all inputs
         Map 4 
             Map Operator Tree:
@@ -1594,7 +1594,7 @@ STAGE PLANS:
                           sort order: 
                           Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: NONE
                           value expressions: _col0 (type: int), _col1 (type: int), _col2 (type: binary)
-            Execution mode: llap
+            Execution mode: vectorized, llap
             LLAP IO: all inputs
         Reducer 2 
             Execution mode: llap
@@ -1631,7 +1631,7 @@ STAGE PLANS:
                       output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                       serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
         Reducer 5 
-            Execution mode: llap
+            Execution mode: vectorized, llap
             Reduce Operator Tree:
               Group By Operator
                 aggregations: min(VALUE._col0), max(VALUE._col1), bloom_filter(VALUE._col2, expectedEntries=500)
@@ -1831,7 +1831,7 @@ STAGE PLANS:
                             sort order: 
                             Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: NONE
                             value expressions: _col0 (type: int), _col1 (type: int), _col2 (type: binary)
-            Execution mode: llap
+            Execution mode: vectorized, llap
             LLAP IO: all inputs
         Map 6 
             Map Operator Tree:
@@ -1851,7 +1851,7 @@ STAGE PLANS:
                         sort order: +
                         Map-reduce partition columns: _col0 (type: int)
                         Statistics: Num rows: 500 Data size: 51000 Basic stats: COMPLETE Column stats: NONE
-            Execution mode: llap
+            Execution mode: vectorized, llap
             LLAP IO: all inputs
         Map 7 
             Map Operator Tree:
@@ -1937,7 +1937,7 @@ STAGE PLANS:
                       output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                       serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
         Reducer 5 
-            Execution mode: llap
+            Execution mode: vectorized, llap
             Reduce Operator Tree:
               Group By Operator
                 aggregations: min(VALUE._col0), max(VALUE._col1), bloom_filter(VALUE._col2, expectedEntries=242)
@@ -1949,7 +1949,7 @@ STAGE PLANS:
                   Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: NONE
                   value expressions: _col0 (type: int), _col1 (type: int), _col2 (type: binary)
         Reducer 8 
-            Execution mode: llap
+            Execution mode: vectorized, llap
             Reduce Operator Tree:
               Group By Operator
                 aggregations: min(VALUE._col0), max(VALUE._col1), bloom_filter(VALUE._col2, expectedEntries=25)
@@ -2034,7 +2034,7 @@ STAGE PLANS:
                             sort order: 
                             Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: NONE
                             value expressions: _col0 (type: string), _col1 (type: string), _col2 (type: binary)
-            Execution mode: llap
+            Execution mode: vectorized, llap
             LLAP IO: all inputs
         Map 5 
             Map Operator Tree:
@@ -2054,7 +2054,7 @@ STAGE PLANS:
                         sort order: +
                         Map-reduce partition columns: _col1 (type: string)
                         Statistics: Num rows: 500 Data size: 51000 Basic stats: COMPLETE Column stats: NONE
-            Execution mode: llap
+            Execution mode: vectorized, llap
             LLAP IO: all inputs
         Reducer 2 
             Execution mode: llap
@@ -2091,7 +2091,7 @@ STAGE PLANS:
                       output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                       serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
         Reducer 4 
-            Execution mode: llap
+            Execution mode: vectorized, llap
             Reduce Operator Tree:
               Group By Operator
                 aggregations: min(VALUE._col0), max(VALUE._col1), bloom_filter(VALUE._col2, expectedEntries=242)
@@ -2224,7 +2224,7 @@ STAGE PLANS:
                             sort order: 
                             Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: NONE
                             value expressions: _col0 (type: int), _col1 (type: int), _col2 (type: binary)
-            Execution mode: llap
+            Execution mode: vectorized, llap
             LLAP IO: all inputs
         Map 9 
             Map Operator Tree:
@@ -2244,7 +2244,7 @@ STAGE PLANS:
                         sort order: +
                         Map-reduce partition columns: _col0 (type: int)
                         Statistics: Num rows: 500 Data size: 51000 Basic stats: COMPLETE Column stats: NONE
-            Execution mode: llap
+            Execution mode: vectorized, llap
             LLAP IO: all inputs
         Reducer 2 
             Execution mode: llap
@@ -2310,7 +2310,7 @@ STAGE PLANS:
                       output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                       serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
         Reducer 6 
-            Execution mode: llap
+            Execution mode: vectorized, llap
             Reduce Operator Tree:
               Group By Operator
                 aggregations: min(VALUE._col0), max(VALUE._col1), bloom_filter(VALUE._col2, expectedEntries=508)
@@ -2380,7 +2380,7 @@ STAGE PLANS:
                             sort order: 
                             Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: NONE
                             value expressions: _col0 (type: string), _col1 (type: string), _col2 (type: binary)
-            Execution mode: llap
+            Execution mode: vectorized, llap
             LLAP IO: all inputs
         Map 5 
             Map Operator Tree:
@@ -2400,7 +2400,7 @@ STAGE PLANS:
                         sort order: +
                         Map-reduce partition columns: _col1 (type: string)
                         Statistics: Num rows: 500 Data size: 51000 Basic stats: COMPLETE Column stats: NONE
-            Execution mode: llap
+            Execution mode: vectorized, llap
             LLAP IO: all inputs
         Reducer 2 
             Execution mode: llap
@@ -2437,7 +2437,7 @@ STAGE PLANS:
                       output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                       serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
         Reducer 4 
-            Execution mode: llap
+            Execution mode: vectorized, llap
             Reduce Operator Tree:
               Group By Operator
                 aggregations: min(VALUE._col0), max(VALUE._col1), bloom_filter(VALUE._col2, expectedEntries=242)
@@ -2524,7 +2524,7 @@ STAGE PLANS:
                             sort order: 
                             Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: NONE
                             value expressions: _col0 (type: int), _col1 (type: int), _col2 (type: binary)
-            Execution mode: llap
+            Execution mode: vectorized, llap
             LLAP IO: all inputs
         Map 6 
             Map Operator Tree:
@@ -2544,7 +2544,7 @@ STAGE PLANS:
                         sort order: +
                         Map-reduce partition columns: _col0 (type: int)
                         Statistics: Num rows: 500 Data size: 51000 Basic stats: COMPLETE Column stats: NONE
-            Execution mode: llap
+            Execution mode: vectorized, llap
             LLAP IO: all inputs
         Map 7 
             Map Operator Tree:
@@ -2630,7 +2630,7 @@ STAGE PLANS:
                       output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                       serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
         Reducer 5 
-            Execution mode: llap
+            Execution mode: vectorized, llap
             Reduce Operator Tree:
               Group By Operator
                 aggregations: min(VALUE._col0), max(VALUE._col1), bloom_filter(VALUE._col2, expectedEntries=242)
@@ -2642,7 +2642,7 @@ STAGE PLANS:
                   Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: NONE
                   value expressions: _col0 (type: int), _col1 (type: int), _col2 (type: binary)
         Reducer 8 
-            Execution mode: llap
+            Execution mode: vectorized, llap
             Reduce Operator Tree:
               Group By Operator
                 aggregations: min(VALUE._col0), max(VALUE._col1), bloom_filter(VALUE._col2, expectedEntries=25)
@@ -2777,7 +2777,7 @@ STAGE PLANS:
                             sort order: 
                             Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: NONE
                             value expressions: _col0 (type: int), _col1 (type: int), _col2 (type: binary)
-            Execution mode: llap
+            Execution mode: vectorized, llap
             LLAP IO: all inputs
         Map 9 
             Map Operator Tree:
@@ -2797,7 +2797,7 @@ STAGE PLANS:
                         sort order: +
                         Map-reduce partition columns: _col0 (type: int)
                         Statistics: Num rows: 500 Data size: 51000 Basic stats: COMPLETE Column stats: NONE
-            Execution mode: llap
+            Execution mode: vectorized, llap
             LLAP IO: all inputs
         Reducer 2 
             Execution mode: llap
@@ -2863,7 +2863,7 @@ STAGE PLANS:
                       output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                       serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
         Reducer 6 
-            Execution mode: llap
+            Execution mode: vectorized, llap
             Reduce Operator Tree:
               Group By Operator
                 aggregations: min(VALUE._col0), max(VALUE._col1), bloom_filter(VALUE._col2, expectedEntries=508)
@@ -2954,10 +2954,10 @@ STAGE PLANS:
                         key expressions: _col0 (type: int), _col1 (type: string)
                         sort order: ++
                         Statistics: Num rows: 500 Data size: 51000 Basic stats: COMPLETE Column stats: NONE
-            Execution mode: llap
+            Execution mode: vectorized, llap
             LLAP IO: all inputs
         Reducer 2 
-            Execution mode: llap
+            Execution mode: vectorized, llap
             Reduce Operator Tree:
               Select Operator
                 expressions: KEY.reducesinkkey0 (type: int)
@@ -3016,7 +3016,7 @@ STAGE PLANS:
                       output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                       serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
         Reducer 5 
-            Execution mode: llap
+            Execution mode: vectorized, llap
             Reduce Operator Tree:
               Group By Operator
                 aggregations: min(VALUE._col0), max(VALUE._col1), bloom_filter(VALUE._col2, expectedEntries=242)

http://git-wip-us.apache.org/repos/asf/hive/blob/62ebd1ab/ql/src/test/results/clientpositive/llap/orc_llap.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/orc_llap.q.out b/ql/src/test/results/clientpositive/llap/orc_llap.q.out
index 90055a5..4fb3d12 100644
--- a/ql/src/test/results/clientpositive/llap/orc_llap.q.out
+++ b/ql/src/test/results/clientpositive/llap/orc_llap.q.out
@@ -597,7 +597,7 @@ STAGE PLANS:
                             sort order: 
                             Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: NONE
                             value expressions: _col0 (type: smallint), _col1 (type: smallint), _col2 (type: binary)
-            Execution mode: llap
+            Execution mode: vectorized, llap
             LLAP IO: all inputs
         Map 5 
             Map Operator Tree:
@@ -618,7 +618,7 @@ STAGE PLANS:
                         Map-reduce partition columns: _col0 (type: smallint)
                         Statistics: Num rows: 122880 Data size: 29079940 Basic stats: COMPLETE Column stats: NONE
                         value expressions: _col2 (type: string)
-            Execution mode: llap
+            Execution mode: vectorized, llap
             LLAP IO: all inputs
         Reducer 2 
             Execution mode: llap
@@ -660,7 +660,7 @@ STAGE PLANS:
                       output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                       serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
         Reducer 4 
-            Execution mode: llap
+            Execution mode: vectorized, llap
             Reduce Operator Tree:
               Group By Operator
                 aggregations: min(VALUE._col0), max(VALUE._col1), bloom_filter(VALUE._col2, expectedEntries=122880)
@@ -1089,7 +1089,7 @@ STAGE PLANS:
                             sort order: 
                             Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: NONE
                             value expressions: _col0 (type: smallint), _col1 (type: smallint), _col2 (type: binary)
-            Execution mode: llap
+            Execution mode: vectorized, llap
             LLAP IO: all inputs
         Map 5 
             Map Operator Tree:
@@ -1110,7 +1110,7 @@ STAGE PLANS:
                         Map-reduce partition columns: _col0 (type: smallint)
                         Statistics: Num rows: 245760 Data size: 58159880 Basic stats: COMPLETE Column stats: NONE
                         value expressions: _col2 (type: string)
-            Execution mode: llap
+            Execution mode: vectorized, llap
             LLAP IO: all inputs
         Reducer 2 
             Execution mode: llap
@@ -1152,7 +1152,7 @@ STAGE PLANS:
                       output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                       serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
         Reducer 4 
-            Execution mode: llap
+            Execution mode: vectorized, llap
             Reduce Operator Tree:
               Group By Operator
                 aggregations: min(VALUE._col0), max(VALUE._col1), bloom_filter(VALUE._col2, expectedEntries=245760)

http://git-wip-us.apache.org/repos/asf/hive/blob/62ebd1ab/ql/src/test/results/clientpositive/llap/vector_binary_join_groupby.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/vector_binary_join_groupby.q.out b/ql/src/test/results/clientpositive/llap/vector_binary_join_groupby.q.out
index 9fbce7d..7de04a7 100644
--- a/ql/src/test/results/clientpositive/llap/vector_binary_join_groupby.q.out
+++ b/ql/src/test/results/clientpositive/llap/vector_binary_join_groupby.q.out
@@ -151,7 +151,7 @@ STAGE PLANS:
                               sort order: 
                               Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
                               value expressions: _col0 (type: bigint)
-            Execution mode: llap
+            Execution mode: vectorized, llap
             LLAP IO: all inputs
         Map 3 
             Map Operator Tree:
@@ -171,7 +171,7 @@ STAGE PLANS:
                         Map-reduce partition columns: _col10 (type: binary)
                         Statistics: Num rows: 100 Data size: 29638 Basic stats: COMPLETE Column stats: NONE
                         value expressions: _col0 (type: tinyint), _col1 (type: smallint), _col2 (type: int), _col3 (type: bigint), _col4 (type: float), _col5 (type: double), _col6 (type: boolean), _col7 (type: string), _col8 (type: timestamp), _col9 (type: decimal(4,2))
-            Execution mode: llap
+            Execution mode: vectorized, llap
             LLAP IO: all inputs
         Reducer 2 
             Execution mode: vectorized, llap

http://git-wip-us.apache.org/repos/asf/hive/blob/62ebd1ab/ql/src/test/results/clientpositive/llap/vectorized_dynamic_partition_pruning.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/vectorized_dynamic_partition_pruning.q.out b/ql/src/test/results/clientpositive/llap/vectorized_dynamic_partition_pruning.q.out
index 3d087b3..729a84e 100644
--- a/ql/src/test/results/clientpositive/llap/vectorized_dynamic_partition_pruning.q.out
+++ b/ql/src/test/results/clientpositive/llap/vectorized_dynamic_partition_pruning.q.out
@@ -2958,7 +2958,7 @@ STAGE PLANS:
                         sort order: +
                         Map-reduce partition columns: _col0 (type: string)
                         Statistics: Num rows: 1 Data size: 172 Basic stats: COMPLETE Column stats: NONE
-            Execution mode: llap
+            Execution mode: vectorized, llap
             LLAP IO: all inputs
         Reducer 2 
             Execution mode: llap
@@ -3024,7 +3024,7 @@ STAGE PLANS:
                       output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                       serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
         Reducer 5 
-            Execution mode: llap
+            Execution mode: vectorized, llap
             Reduce Operator Tree:
               Group By Operator
                 aggregations: min(VALUE._col0), max(VALUE._col1), bloom_filter(VALUE._col2, expectedEntries=1)

http://git-wip-us.apache.org/repos/asf/hive/blob/62ebd1ab/ql/src/test/results/clientpositive/llap/vectorized_dynamic_semijoin_reduction.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/vectorized_dynamic_semijoin_reduction.q.out b/ql/src/test/results/clientpositive/llap/vectorized_dynamic_semijoin_reduction.q.out
new file mode 100644
index 0000000..29f2391
--- /dev/null
+++ b/ql/src/test/results/clientpositive/llap/vectorized_dynamic_semijoin_reduction.q.out
@@ -0,0 +1,932 @@
+PREHOOK: query: create table dsrv_big stored as orc as select key as key_str, cast(key as int) as key_int, value from src
+PREHOOK: type: CREATETABLE_AS_SELECT
+PREHOOK: Input: default@src
+PREHOOK: Output: database:default
+PREHOOK: Output: default@dsrv_big
+POSTHOOK: query: create table dsrv_big stored as orc as select key as key_str, cast(key as int) as key_int, value from src
+POSTHOOK: type: CREATETABLE_AS_SELECT
+POSTHOOK: Input: default@src
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@dsrv_big
+POSTHOOK: Lineage: dsrv_big.key_int EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: dsrv_big.key_str SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: dsrv_big.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+PREHOOK: query: create table dsrv_small stored as orc as select distinct key as key_str, cast(key as int) as key_int, value from src where key < 100
+PREHOOK: type: CREATETABLE_AS_SELECT
+PREHOOK: Input: default@src
+PREHOOK: Output: database:default
+PREHOOK: Output: default@dsrv_small
+POSTHOOK: query: create table dsrv_small stored as orc as select distinct key as key_str, cast(key as int) as key_int, value from src where key < 100
+POSTHOOK: type: CREATETABLE_AS_SELECT
+POSTHOOK: Input: default@src
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@dsrv_small
+POSTHOOK: Lineage: dsrv_small.key_int EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: dsrv_small.key_str SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: dsrv_small.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+PREHOOK: query: EXPLAIN select count(*) from dsrv_big a join dsrv_small b on (a.key_int = b.key_int)
+PREHOOK: type: QUERY
+POSTHOOK: query: EXPLAIN select count(*) from dsrv_big a join dsrv_small b on (a.key_int = b.key_int)
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Tez
+#### A masked pattern was here ####
+      Edges:
+        Map 1 <- Reducer 5 (BROADCAST_EDGE)
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 4 (SIMPLE_EDGE)
+        Reducer 3 <- Reducer 2 (CUSTOM_SIMPLE_EDGE)
+        Reducer 5 <- Map 4 (CUSTOM_SIMPLE_EDGE)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: a
+                  filterExpr: (key_int is not null and key_int BETWEEN DynamicValue(RS_7_b_key_int_min) AND DynamicValue(RS_7_b_key_int_max) and in_bloom_filter(key_int, DynamicValue(RS_7_b_key_int_bloom_filter))) (type: boolean)
+                  Statistics: Num rows: 500 Data size: 90000 Basic stats: COMPLETE Column stats: NONE
+                  Filter Operator
+                    predicate: (key_int is not null and key_int BETWEEN DynamicValue(RS_7_b_key_int_min) AND DynamicValue(RS_7_b_key_int_max) and in_bloom_filter(key_int, DynamicValue(RS_7_b_key_int_bloom_filter))) (type: boolean)
+                    Statistics: Num rows: 500 Data size: 90000 Basic stats: COMPLETE Column stats: NONE
+                    Select Operator
+                      expressions: key_int (type: int)
+                      outputColumnNames: _col0
+                      Statistics: Num rows: 500 Data size: 90000 Basic stats: COMPLETE Column stats: NONE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: int)
+                        sort order: +
+                        Map-reduce partition columns: _col0 (type: int)
+                        Statistics: Num rows: 500 Data size: 90000 Basic stats: COMPLETE Column stats: NONE
+            Execution mode: vectorized, llap
+            LLAP IO: all inputs
+        Map 4 
+            Map Operator Tree:
+                TableScan
+                  alias: b
+                  filterExpr: key_int is not null (type: boolean)
+                  Statistics: Num rows: 57 Data size: 10146 Basic stats: COMPLETE Column stats: NONE
+                  Filter Operator
+                    predicate: key_int is not null (type: boolean)
+                    Statistics: Num rows: 57 Data size: 10146 Basic stats: COMPLETE Column stats: NONE
+                    Select Operator
+                      expressions: key_int (type: int)
+                      outputColumnNames: _col0
+                      Statistics: Num rows: 57 Data size: 10146 Basic stats: COMPLETE Column stats: NONE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: int)
+                        sort order: +
+                        Map-reduce partition columns: _col0 (type: int)
+                        Statistics: Num rows: 57 Data size: 10146 Basic stats: COMPLETE Column stats: NONE
+                      Select Operator
+                        expressions: _col0 (type: int)
+                        outputColumnNames: _col0
+                        Statistics: Num rows: 57 Data size: 10146 Basic stats: COMPLETE Column stats: NONE
+                        Group By Operator
+                          aggregations: min(_col0), max(_col0), bloom_filter(_col0, expectedEntries=57)
+                          mode: hash
+                          outputColumnNames: _col0, _col1, _col2
+                          Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: NONE
+                          Reduce Output Operator
+                            sort order: 
+                            Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: NONE
+                            value expressions: _col0 (type: int), _col1 (type: int), _col2 (type: binary)
+            Execution mode: vectorized, llap
+            LLAP IO: all inputs
+        Reducer 2 
+            Execution mode: llap
+            Reduce Operator Tree:
+              Merge Join Operator
+                condition map:
+                     Inner Join 0 to 1
+                keys:
+                  0 _col0 (type: int)
+                  1 _col0 (type: int)
+                Statistics: Num rows: 550 Data size: 99000 Basic stats: COMPLETE Column stats: NONE
+                Group By Operator
+                  aggregations: count()
+                  mode: hash
+                  outputColumnNames: _col0
+                  Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                  Reduce Output Operator
+                    sort order: 
+                    Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                    value expressions: _col0 (type: bigint)
+        Reducer 3 
+            Execution mode: vectorized, llap
+            Reduce Operator Tree:
+              Group By Operator
+                aggregations: count(VALUE._col0)
+                mode: mergepartial
+                outputColumnNames: _col0
+                Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                File Output Operator
+                  compressed: false
+                  Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                  table:
+                      input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                      output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+        Reducer 5 
+            Execution mode: vectorized, llap
+            Reduce Operator Tree:
+              Group By Operator
+                aggregations: min(VALUE._col0), max(VALUE._col1), bloom_filter(VALUE._col2, expectedEntries=57)
+                mode: final
+                outputColumnNames: _col0, _col1, _col2
+                Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: NONE
+                Reduce Output Operator
+                  sort order: 
+                  Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: NONE
+                  value expressions: _col0 (type: int), _col1 (type: int), _col2 (type: binary)
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: select count(*) from dsrv_big a join dsrv_small b on (a.key_int = b.key_int)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@dsrv_big
+PREHOOK: Input: default@dsrv_small
+#### A masked pattern was here ####
+POSTHOOK: query: select count(*) from dsrv_big a join dsrv_small b on (a.key_int = b.key_int)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@dsrv_big
+POSTHOOK: Input: default@dsrv_small
+#### A masked pattern was here ####
+84
+PREHOOK: query: EXPLAIN select count(*) from dsrv_big a join dsrv_small b on (a.key_str = b.key_str)
+PREHOOK: type: QUERY
+POSTHOOK: query: EXPLAIN select count(*) from dsrv_big a join dsrv_small b on (a.key_str = b.key_str)
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Tez
+#### A masked pattern was here ####
+      Edges:
+        Map 1 <- Reducer 5 (BROADCAST_EDGE)
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 4 (SIMPLE_EDGE)
+        Reducer 3 <- Reducer 2 (CUSTOM_SIMPLE_EDGE)
+        Reducer 5 <- Map 4 (CUSTOM_SIMPLE_EDGE)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: a
+                  filterExpr: (key_str is not null and key_str BETWEEN DynamicValue(RS_7_b_key_str_min) AND DynamicValue(RS_7_b_key_str_max) and in_bloom_filter(key_str, DynamicValue(RS_7_b_key_str_bloom_filter))) (type: boolean)
+                  Statistics: Num rows: 500 Data size: 90000 Basic stats: COMPLETE Column stats: NONE
+                  Filter Operator
+                    predicate: (key_str is not null and key_str BETWEEN DynamicValue(RS_7_b_key_str_min) AND DynamicValue(RS_7_b_key_str_max) and in_bloom_filter(key_str, DynamicValue(RS_7_b_key_str_bloom_filter))) (type: boolean)
+                    Statistics: Num rows: 500 Data size: 90000 Basic stats: COMPLETE Column stats: NONE
+                    Select Operator
+                      expressions: key_str (type: string)
+                      outputColumnNames: _col0
+                      Statistics: Num rows: 500 Data size: 90000 Basic stats: COMPLETE Column stats: NONE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: string)
+                        sort order: +
+                        Map-reduce partition columns: _col0 (type: string)
+                        Statistics: Num rows: 500 Data size: 90000 Basic stats: COMPLETE Column stats: NONE
+            Execution mode: vectorized, llap
+            LLAP IO: all inputs
+        Map 4 
+            Map Operator Tree:
+                TableScan
+                  alias: b
+                  filterExpr: key_str is not null (type: boolean)
+                  Statistics: Num rows: 57 Data size: 10146 Basic stats: COMPLETE Column stats: NONE
+                  Filter Operator
+                    predicate: key_str is not null (type: boolean)
+                    Statistics: Num rows: 57 Data size: 10146 Basic stats: COMPLETE Column stats: NONE
+                    Select Operator
+                      expressions: key_str (type: string)
+                      outputColumnNames: _col0
+                      Statistics: Num rows: 57 Data size: 10146 Basic stats: COMPLETE Column stats: NONE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: string)
+                        sort order: +
+                        Map-reduce partition columns: _col0 (type: string)
+                        Statistics: Num rows: 57 Data size: 10146 Basic stats: COMPLETE Column stats: NONE
+                      Select Operator
+                        expressions: _col0 (type: string)
+                        outputColumnNames: _col0
+                        Statistics: Num rows: 57 Data size: 10146 Basic stats: COMPLETE Column stats: NONE
+                        Group By Operator
+                          aggregations: min(_col0), max(_col0), bloom_filter(_col0, expectedEntries=57)
+                          mode: hash
+                          outputColumnNames: _col0, _col1, _col2
+                          Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: NONE
+                          Reduce Output Operator
+                            sort order: 
+                            Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: NONE
+                            value expressions: _col0 (type: string), _col1 (type: string), _col2 (type: binary)
+            Execution mode: vectorized, llap
+            LLAP IO: all inputs
+        Reducer 2 
+            Execution mode: llap
+            Reduce Operator Tree:
+              Merge Join Operator
+                condition map:
+                     Inner Join 0 to 1
+                keys:
+                  0 _col0 (type: string)
+                  1 _col0 (type: string)
+                Statistics: Num rows: 550 Data size: 99000 Basic stats: COMPLETE Column stats: NONE
+                Group By Operator
+                  aggregations: count()
+                  mode: hash
+                  outputColumnNames: _col0
+                  Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                  Reduce Output Operator
+                    sort order: 
+                    Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                    value expressions: _col0 (type: bigint)
+        Reducer 3 
+            Execution mode: vectorized, llap
+            Reduce Operator Tree:
+              Group By Operator
+                aggregations: count(VALUE._col0)
+                mode: mergepartial
+                outputColumnNames: _col0
+                Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                File Output Operator
+                  compressed: false
+                  Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                  table:
+                      input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                      output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+        Reducer 5 
+            Execution mode: vectorized, llap
+            Reduce Operator Tree:
+              Group By Operator
+                aggregations: min(VALUE._col0), max(VALUE._col1), bloom_filter(VALUE._col2, expectedEntries=57)
+                mode: final
+                outputColumnNames: _col0, _col1, _col2
+                Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: NONE
+                Reduce Output Operator
+                  sort order: 
+                  Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: NONE
+                  value expressions: _col0 (type: string), _col1 (type: string), _col2 (type: binary)
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: select count(*) from dsrv_big a join dsrv_small b on (a.key_str = b.key_str)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@dsrv_big
+PREHOOK: Input: default@dsrv_small
+#### A masked pattern was here ####
+POSTHOOK: query: select count(*) from dsrv_big a join dsrv_small b on (a.key_str = b.key_str)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@dsrv_big
+POSTHOOK: Input: default@dsrv_small
+#### A masked pattern was here ####
+84
+PREHOOK: query: EXPLAIN select count(*) from dsrv_big a join dsrv_small b on (a.key_str = b.key_str)
+PREHOOK: type: QUERY
+POSTHOOK: query: EXPLAIN select count(*) from dsrv_big a join dsrv_small b on (a.key_str = b.key_str)
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Tez
+#### A masked pattern was here ####
+      Edges:
+        Map 1 <- Reducer 5 (BROADCAST_EDGE)
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 4 (SIMPLE_EDGE)
+        Reducer 3 <- Reducer 2 (CUSTOM_SIMPLE_EDGE)
+        Reducer 5 <- Map 4 (CUSTOM_SIMPLE_EDGE)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: a
+                  filterExpr: (key_str is not null and key_str BETWEEN DynamicValue(RS_7_b_key_str_min) AND DynamicValue(RS_7_b_key_str_max) and in_bloom_filter(key_str, DynamicValue(RS_7_b_key_str_bloom_filter))) (type: boolean)
+                  Statistics: Num rows: 500 Data size: 90000 Basic stats: COMPLETE Column stats: NONE
+                  Filter Operator
+                    predicate: (key_str is not null and key_str BETWEEN DynamicValue(RS_7_b_key_str_min) AND DynamicValue(RS_7_b_key_str_max) and in_bloom_filter(key_str, DynamicValue(RS_7_b_key_str_bloom_filter))) (type: boolean)
+                    Statistics: Num rows: 500 Data size: 90000 Basic stats: COMPLETE Column stats: NONE
+                    Select Operator
+                      expressions: key_str (type: string)
+                      outputColumnNames: _col0
+                      Statistics: Num rows: 500 Data size: 90000 Basic stats: COMPLETE Column stats: NONE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: string)
+                        sort order: +
+                        Map-reduce partition columns: _col0 (type: string)
+                        Statistics: Num rows: 500 Data size: 90000 Basic stats: COMPLETE Column stats: NONE
+            Execution mode: vectorized, llap
+            LLAP IO: all inputs
+        Map 4 
+            Map Operator Tree:
+                TableScan
+                  alias: b
+                  filterExpr: key_str is not null (type: boolean)
+                  Statistics: Num rows: 57 Data size: 10146 Basic stats: COMPLETE Column stats: NONE
+                  Filter Operator
+                    predicate: key_str is not null (type: boolean)
+                    Statistics: Num rows: 57 Data size: 10146 Basic stats: COMPLETE Column stats: NONE
+                    Select Operator
+                      expressions: key_str (type: string)
+                      outputColumnNames: _col0
+                      Statistics: Num rows: 57 Data size: 10146 Basic stats: COMPLETE Column stats: NONE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: string)
+                        sort order: +
+                        Map-reduce partition columns: _col0 (type: string)
+                        Statistics: Num rows: 57 Data size: 10146 Basic stats: COMPLETE Column stats: NONE
+                      Select Operator
+                        expressions: _col0 (type: string)
+                        outputColumnNames: _col0
+                        Statistics: Num rows: 57 Data size: 10146 Basic stats: COMPLETE Column stats: NONE
+                        Group By Operator
+                          aggregations: min(_col0), max(_col0), bloom_filter(_col0, expectedEntries=57)
+                          mode: hash
+                          outputColumnNames: _col0, _col1, _col2
+                          Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: NONE
+                          Reduce Output Operator
+                            sort order: 
+                            Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: NONE
+                            value expressions: _col0 (type: string), _col1 (type: string), _col2 (type: binary)
+            Execution mode: vectorized, llap
+            LLAP IO: all inputs
+        Reducer 2 
+            Execution mode: llap
+            Reduce Operator Tree:
+              Merge Join Operator
+                condition map:
+                     Inner Join 0 to 1
+                keys:
+                  0 _col0 (type: string)
+                  1 _col0 (type: string)
+                Statistics: Num rows: 550 Data size: 99000 Basic stats: COMPLETE Column stats: NONE
+                Group By Operator
+                  aggregations: count()
+                  mode: hash
+                  outputColumnNames: _col0
+                  Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                  Reduce Output Operator
+                    sort order: 
+                    Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                    value expressions: _col0 (type: bigint)
+        Reducer 3 
+            Execution mode: vectorized, llap
+            Reduce Operator Tree:
+              Group By Operator
+                aggregations: count(VALUE._col0)
+                mode: mergepartial
+                outputColumnNames: _col0
+                Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                File Output Operator
+                  compressed: false
+                  Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                  table:
+                      input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                      output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+        Reducer 5 
+            Execution mode: vectorized, llap
+            Reduce Operator Tree:
+              Group By Operator
+                aggregations: min(VALUE._col0), max(VALUE._col1), bloom_filter(VALUE._col2, expectedEntries=57)
+                mode: final
+                outputColumnNames: _col0, _col1, _col2
+                Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: NONE
+                Reduce Output Operator
+                  sort order: 
+                  Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: NONE
+                  value expressions: _col0 (type: string), _col1 (type: string), _col2 (type: binary)
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: select count(*) from dsrv_big a join dsrv_small b on (a.key_int = b.key_str)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@dsrv_big
+PREHOOK: Input: default@dsrv_small
+#### A masked pattern was here ####
+POSTHOOK: query: select count(*) from dsrv_big a join dsrv_small b on (a.key_int = b.key_str)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@dsrv_big
+POSTHOOK: Input: default@dsrv_small
+#### A masked pattern was here ####
+84
+PREHOOK: query: EXPLAIN select count(*) from dsrv_big a, dsrv_small b, dsrv_small c where a.key_int = b.key_int and a.key_int = c.key_int
+PREHOOK: type: QUERY
+POSTHOOK: query: EXPLAIN select count(*) from dsrv_big a, dsrv_small b, dsrv_small c where a.key_int = b.key_int and a.key_int = c.key_int
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Tez
+#### A masked pattern was here ####
+      Edges:
+        Map 1 <- Reducer 5 (BROADCAST_EDGE), Reducer 7 (BROADCAST_EDGE)
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 4 (SIMPLE_EDGE), Map 6 (SIMPLE_EDGE)
+        Reducer 3 <- Reducer 2 (CUSTOM_SIMPLE_EDGE)
+        Reducer 5 <- Map 4 (CUSTOM_SIMPLE_EDGE)
+        Reducer 7 <- Map 6 (CUSTOM_SIMPLE_EDGE)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: a
+                  filterExpr: (key_int is not null and key_int BETWEEN DynamicValue(RS_10_b_key_int_min) AND DynamicValue(RS_10_b_key_int_max) and key_int BETWEEN DynamicValue(RS_11_c_key_int_min) AND DynamicValue(RS_11_c_key_int_max) and in_bloom_filter(key_int, DynamicValue(RS_10_b_key_int_bloom_filter)) and in_bloom_filter(key_int, DynamicValue(RS_11_c_key_int_bloom_filter))) (type: boolean)
+                  Statistics: Num rows: 500 Data size: 90000 Basic stats: COMPLETE Column stats: NONE
+                  Filter Operator
+                    predicate: (key_int is not null and key_int BETWEEN DynamicValue(RS_10_b_key_int_min) AND DynamicValue(RS_10_b_key_int_max) and key_int BETWEEN DynamicValue(RS_11_c_key_int_min) AND DynamicValue(RS_11_c_key_int_max) and in_bloom_filter(key_int, DynamicValue(RS_10_b_key_int_bloom_filter)) and in_bloom_filter(key_int, DynamicValue(RS_11_c_key_int_bloom_filter))) (type: boolean)
+                    Statistics: Num rows: 500 Data size: 90000 Basic stats: COMPLETE Column stats: NONE
+                    Select Operator
+                      expressions: key_int (type: int)
+                      outputColumnNames: _col0
+                      Statistics: Num rows: 500 Data size: 90000 Basic stats: COMPLETE Column stats: NONE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: int)
+                        sort order: +
+                        Map-reduce partition columns: _col0 (type: int)
+                        Statistics: Num rows: 500 Data size: 90000 Basic stats: COMPLETE Column stats: NONE
+            Execution mode: vectorized, llap
+            LLAP IO: all inputs
+        Map 4 
+            Map Operator Tree:
+                TableScan
+                  alias: b
+                  filterExpr: key_int is not null (type: boolean)
+                  Statistics: Num rows: 57 Data size: 10146 Basic stats: COMPLETE Column stats: NONE
+                  Filter Operator
+                    predicate: key_int is not null (type: boolean)
+                    Statistics: Num rows: 57 Data size: 10146 Basic stats: COMPLETE Column stats: NONE
+                    Select Operator
+                      expressions: key_int (type: int)
+                      outputColumnNames: _col0
+                      Statistics: Num rows: 57 Data size: 10146 Basic stats: COMPLETE Column stats: NONE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: int)
+                        sort order: +
+                        Map-reduce partition columns: _col0 (type: int)
+                        Statistics: Num rows: 57 Data size: 10146 Basic stats: COMPLETE Column stats: NONE
+                      Select Operator
+                        expressions: _col0 (type: int)
+                        outputColumnNames: _col0
+                        Statistics: Num rows: 57 Data size: 10146 Basic stats: COMPLETE Column stats: NONE
+                        Group By Operator
+                          aggregations: min(_col0), max(_col0), bloom_filter(_col0, expectedEntries=57)
+                          mode: hash
+                          outputColumnNames: _col0, _col1, _col2
+                          Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: NONE
+                          Reduce Output Operator
+                            sort order: 
+                            Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: NONE
+                            value expressions: _col0 (type: int), _col1 (type: int), _col2 (type: binary)
+            Execution mode: vectorized, llap
+            LLAP IO: all inputs
+        Map 6 
+            Map Operator Tree:
+                TableScan
+                  alias: c
+                  filterExpr: key_int is not null (type: boolean)
+                  Statistics: Num rows: 57 Data size: 10146 Basic stats: COMPLETE Column stats: NONE
+                  Filter Operator
+                    predicate: key_int is not null (type: boolean)
+                    Statistics: Num rows: 57 Data size: 10146 Basic stats: COMPLETE Column stats: NONE
+                    Select Operator
+                      expressions: key_int (type: int)
+                      outputColumnNames: _col0
+                      Statistics: Num rows: 57 Data size: 10146 Basic stats: COMPLETE Column stats: NONE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: int)
+                        sort order: +
+                        Map-reduce partition columns: _col0 (type: int)
+                        Statistics: Num rows: 57 Data size: 10146 Basic stats: COMPLETE Column stats: NONE
+                      Select Operator
+                        expressions: _col0 (type: int)
+                        outputColumnNames: _col0
+                        Statistics: Num rows: 57 Data size: 10146 Basic stats: COMPLETE Column stats: NONE
+                        Group By Operator
+                          aggregations: min(_col0), max(_col0), bloom_filter(_col0, expectedEntries=57)
+                          mode: hash
+                          outputColumnNames: _col0, _col1, _col2
+                          Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: NONE
+                          Reduce Output Operator
+                            sort order: 
+                            Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: NONE
+                            value expressions: _col0 (type: int), _col1 (type: int), _col2 (type: binary)
+            Execution mode: vectorized, llap
+            LLAP IO: all inputs
+        Reducer 2 
+            Execution mode: llap
+            Reduce Operator Tree:
+              Merge Join Operator
+                condition map:
+                     Inner Join 0 to 1
+                     Inner Join 0 to 2
+                keys:
+                  0 _col0 (type: int)
+                  1 _col0 (type: int)
+                  2 _col0 (type: int)
+                Statistics: Num rows: 1100 Data size: 198000 Basic stats: COMPLETE Column stats: NONE
+                Group By Operator
+                  aggregations: count()
+                  mode: hash
+                  outputColumnNames: _col0
+                  Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                  Reduce Output Operator
+                    sort order: 
+                    Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                    value expressions: _col0 (type: bigint)
+        Reducer 3 
+            Execution mode: vectorized, llap
+            Reduce Operator Tree:
+              Group By Operator
+                aggregations: count(VALUE._col0)
+                mode: mergepartial
+                outputColumnNames: _col0
+                Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                File Output Operator
+                  compressed: false
+                  Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                  table:
+                      input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                      output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+        Reducer 5 
+            Execution mode: vectorized, llap
+            Reduce Operator Tree:
+              Group By Operator
+                aggregations: min(VALUE._col0), max(VALUE._col1), bloom_filter(VALUE._col2, expectedEntries=57)
+                mode: final
+                outputColumnNames: _col0, _col1, _col2
+                Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: NONE
+                Reduce Output Operator
+                  sort order: 
+                  Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: NONE
+                  value expressions: _col0 (type: int), _col1 (type: int), _col2 (type: binary)
+        Reducer 7 
+            Execution mode: vectorized, llap
+            Reduce Operator Tree:
+              Group By Operator
+                aggregations: min(VALUE._col0), max(VALUE._col1), bloom_filter(VALUE._col2, expectedEntries=57)
+                mode: final
+                outputColumnNames: _col0, _col1, _col2
+                Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: NONE
+                Reduce Output Operator
+                  sort order: 
+                  Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: NONE
+                  value expressions: _col0 (type: int), _col1 (type: int), _col2 (type: binary)
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: select count(*) from dsrv_big a, dsrv_small b, dsrv_small c where a.key_int = b.key_int and a.key_int = c.key_int
+PREHOOK: type: QUERY
+PREHOOK: Input: default@dsrv_big
+PREHOOK: Input: default@dsrv_small
+#### A masked pattern was here ####
+POSTHOOK: query: select count(*) from dsrv_big a, dsrv_small b, dsrv_small c where a.key_int = b.key_int and a.key_int = c.key_int
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@dsrv_big
+POSTHOOK: Input: default@dsrv_small
+#### A masked pattern was here ####
+84
+PREHOOK: query: EXPLAIN select count(*) from dsrv_big a join dsrv_small b on (a.key_str = b.key_str and a.key_int = b.key_int)
+PREHOOK: type: QUERY
+POSTHOOK: query: EXPLAIN select count(*) from dsrv_big a join dsrv_small b on (a.key_str = b.key_str and a.key_int = b.key_int)
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Tez
+#### A masked pattern was here ####
+      Edges:
+        Map 1 <- Reducer 5 (BROADCAST_EDGE), Reducer 6 (BROADCAST_EDGE)
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 4 (SIMPLE_EDGE)
+        Reducer 3 <- Reducer 2 (CUSTOM_SIMPLE_EDGE)
+        Reducer 5 <- Map 4 (CUSTOM_SIMPLE_EDGE)
+        Reducer 6 <- Map 4 (CUSTOM_SIMPLE_EDGE)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: a
+                  filterExpr: (key_str is not null and key_int is not null and key_str BETWEEN DynamicValue(RS_7_b_key_str_min) AND DynamicValue(RS_7_b_key_str_max) and key_int BETWEEN DynamicValue(RS_7_b_key_int_min) AND DynamicValue(RS_7_b_key_int_max) and in_bloom_filter(key_str, DynamicValue(RS_7_b_key_str_bloom_filter)) and in_bloom_filter(key_int, DynamicValue(RS_7_b_key_int_bloom_filter))) (type: boolean)
+                  Statistics: Num rows: 500 Data size: 90000 Basic stats: COMPLETE Column stats: NONE
+                  Filter Operator
+                    predicate: (key_str is not null and key_int is not null and key_str BETWEEN DynamicValue(RS_7_b_key_str_min) AND DynamicValue(RS_7_b_key_str_max) and key_int BETWEEN DynamicValue(RS_7_b_key_int_min) AND DynamicValue(RS_7_b_key_int_max) and in_bloom_filter(key_str, DynamicValue(RS_7_b_key_str_bloom_filter)) and in_bloom_filter(key_int, DynamicValue(RS_7_b_key_int_bloom_filter))) (type: boolean)
+                    Statistics: Num rows: 500 Data size: 90000 Basic stats: COMPLETE Column stats: NONE
+                    Select Operator
+                      expressions: key_str (type: string), key_int (type: int)
+                      outputColumnNames: _col0, _col1
+                      Statistics: Num rows: 500 Data size: 90000 Basic stats: COMPLETE Column stats: NONE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: string), _col1 (type: int)
+                        sort order: ++
+                        Map-reduce partition columns: _col0 (type: string), _col1 (type: int)
+                        Statistics: Num rows: 500 Data size: 90000 Basic stats: COMPLETE Column stats: NONE
+            Execution mode: vectorized, llap
+            LLAP IO: all inputs
+        Map 4 
+            Map Operator Tree:
+                TableScan
+                  alias: b
+                  filterExpr: (key_str is not null and key_int is not null) (type: boolean)
+                  Statistics: Num rows: 57 Data size: 10146 Basic stats: COMPLETE Column stats: NONE
+                  Filter Operator
+                    predicate: (key_str is not null and key_int is not null) (type: boolean)
+                    Statistics: Num rows: 57 Data size: 10146 Basic stats: COMPLETE Column stats: NONE
+                    Select Operator
+                      expressions: key_str (type: string), key_int (type: int)
+                      outputColumnNames: _col0, _col1
+                      Statistics: Num rows: 57 Data size: 10146 Basic stats: COMPLETE Column stats: NONE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: string), _col1 (type: int)
+                        sort order: ++
+                        Map-reduce partition columns: _col0 (type: string), _col1 (type: int)
+                        Statistics: Num rows: 57 Data size: 10146 Basic stats: COMPLETE Column stats: NONE
+                      Select Operator
+                        expressions: _col0 (type: string)
+                        outputColumnNames: _col0
+                        Statistics: Num rows: 57 Data size: 10146 Basic stats: COMPLETE Column stats: NONE
+                        Group By Operator
+                          aggregations: min(_col0), max(_col0), bloom_filter(_col0, expectedEntries=57)
+                          mode: hash
+                          outputColumnNames: _col0, _col1, _col2
+                          Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: NONE
+                          Reduce Output Operator
+                            sort order: 
+                            Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: NONE
+                            value expressions: _col0 (type: string), _col1 (type: string), _col2 (type: binary)
+                      Select Operator
+                        expressions: _col1 (type: int)
+                        outputColumnNames: _col0
+                        Statistics: Num rows: 57 Data size: 10146 Basic stats: COMPLETE Column stats: NONE
+                        Group By Operator
+                          aggregations: min(_col0), max(_col0), bloom_filter(_col0, expectedEntries=57)
+                          mode: hash
+                          outputColumnNames: _col0, _col1, _col2
+                          Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: NONE
+                          Reduce Output Operator
+                            sort order: 
+                            Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: NONE
+                            value expressions: _col0 (type: int), _col1 (type: int), _col2 (type: binary)
+            Execution mode: vectorized, llap
+            LLAP IO: all inputs
+        Reducer 2 
+            Execution mode: llap
+            Reduce Operator Tree:
+              Merge Join Operator
+                condition map:
+                     Inner Join 0 to 1
+                keys:
+                  0 _col0 (type: string), _col1 (type: int)
+                  1 _col0 (type: string), _col1 (type: int)
+                Statistics: Num rows: 550 Data size: 99000 Basic stats: COMPLETE Column stats: NONE
+                Group By Operator
+                  aggregations: count()
+                  mode: hash
+                  outputColumnNames: _col0
+                  Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                  Reduce Output Operator
+                    sort order: 
+                    Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                    value expressions: _col0 (type: bigint)
+        Reducer 3 
+            Execution mode: vectorized, llap
+            Reduce Operator Tree:
+              Group By Operator
+                aggregations: count(VALUE._col0)
+                mode: mergepartial
+                outputColumnNames: _col0
+                Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                File Output Operator
+                  compressed: false
+                  Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                  table:
+                      input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                      output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+        Reducer 5 
+            Execution mode: vectorized, llap
+            Reduce Operator Tree:
+              Group By Operator
+                aggregations: min(VALUE._col0), max(VALUE._col1), bloom_filter(VALUE._col2, expectedEntries=57)
+                mode: final
+                outputColumnNames: _col0, _col1, _col2
+                Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: NONE
+                Reduce Output Operator
+                  sort order: 
+                  Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: NONE
+                  value expressions: _col0 (type: string), _col1 (type: string), _col2 (type: binary)
+        Reducer 6 
+            Execution mode: vectorized, llap
+            Reduce Operator Tree:
+              Group By Operator
+                aggregations: min(VALUE._col0), max(VALUE._col1), bloom_filter(VALUE._col2, expectedEntries=57)
+                mode: final
+                outputColumnNames: _col0, _col1, _col2
+                Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: NONE
+                Reduce Output Operator
+                  sort order: 
+                  Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: NONE
+                  value expressions: _col0 (type: int), _col1 (type: int), _col2 (type: binary)
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: select count(*) from dsrv_big a join dsrv_small b on (a.key_str = b.key_str and a.key_int = b.key_int)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@dsrv_big
+PREHOOK: Input: default@dsrv_small
+#### A masked pattern was here ####
+POSTHOOK: query: select count(*) from dsrv_big a join dsrv_small b on (a.key_str = b.key_str and a.key_int = b.key_int)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@dsrv_big
+POSTHOOK: Input: default@dsrv_small
+#### A masked pattern was here ####
+84
+PREHOOK: query: EXPLAIN select count(*) from dsrv_big a join dsrv_small b on (a.key_int = b.key_int) where b.value in ('nonexistent1', 'nonexistent2')
+PREHOOK: type: QUERY
+POSTHOOK: query: EXPLAIN select count(*) from dsrv_big a join dsrv_small b on (a.key_int = b.key_int) where b.value in ('nonexistent1', 'nonexistent2')
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Tez
+#### A masked pattern was here ####
+      Edges:
+        Map 1 <- Reducer 5 (BROADCAST_EDGE)
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 4 (SIMPLE_EDGE)
+        Reducer 3 <- Reducer 2 (CUSTOM_SIMPLE_EDGE)
+        Reducer 5 <- Map 4 (CUSTOM_SIMPLE_EDGE)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: a
+                  filterExpr: (key_int is not null and key_int BETWEEN DynamicValue(RS_7_b_key_int_min) AND DynamicValue(RS_7_b_key_int_max) and in_bloom_filter(key_int, DynamicValue(RS_7_b_key_int_bloom_filter))) (type: boolean)
+                  Statistics: Num rows: 500 Data size: 90000 Basic stats: COMPLETE Column stats: NONE
+                  Filter Operator
+                    predicate: (key_int is not null and key_int BETWEEN DynamicValue(RS_7_b_key_int_min) AND DynamicValue(RS_7_b_key_int_max) and in_bloom_filter(key_int, DynamicValue(RS_7_b_key_int_bloom_filter))) (type: boolean)
+                    Statistics: Num rows: 500 Data size: 90000 Basic stats: COMPLETE Column stats: NONE
+                    Select Operator
+                      expressions: key_int (type: int)
+                      outputColumnNames: _col0
+                      Statistics: Num rows: 500 Data size: 90000 Basic stats: COMPLETE Column stats: NONE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: int)
+                        sort order: +
+                        Map-reduce partition columns: _col0 (type: int)
+                        Statistics: Num rows: 500 Data size: 90000 Basic stats: COMPLETE Column stats: NONE
+            Execution mode: vectorized, llap
+            LLAP IO: all inputs
+        Map 4 
+            Map Operator Tree:
+                TableScan
+                  alias: b
+                  filterExpr: ((value) IN ('nonexistent1', 'nonexistent2') and key_int is not null) (type: boolean)
+                  Statistics: Num rows: 57 Data size: 10146 Basic stats: COMPLETE Column stats: NONE
+                  Filter Operator
+                    predicate: ((value) IN ('nonexistent1', 'nonexistent2') and key_int is not null) (type: boolean)
+                    Statistics: Num rows: 29 Data size: 5162 Basic stats: COMPLETE Column stats: NONE
+                    Select Operator
+                      expressions: key_int (type: int)
+                      outputColumnNames: _col0
+                      Statistics: Num rows: 29 Data size: 5162 Basic stats: COMPLETE Column stats: NONE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: int)
+                        sort order: +
+                        Map-reduce partition columns: _col0 (type: int)
+                        Statistics: Num rows: 29 Data size: 5162 Basic stats: COMPLETE Column stats: NONE
+                      Select Operator
+                        expressions: _col0 (type: int)
+                        outputColumnNames: _col0
+                        Statistics: Num rows: 29 Data size: 5162 Basic stats: COMPLETE Column stats: NONE
+                        Group By Operator
+                          aggregations: min(_col0), max(_col0), bloom_filter(_col0, expectedEntries=29)
+                          mode: hash
+                          outputColumnNames: _col0, _col1, _col2
+                          Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: NONE
+                          Reduce Output Operator
+                            sort order: 
+                            Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: NONE
+                            value expressions: _col0 (type: int), _col1 (type: int), _col2 (type: binary)
+            Execution mode: vectorized, llap
+            LLAP IO: all inputs
+        Reducer 2 
+            Execution mode: llap
+            Reduce Operator Tree:
+              Merge Join Operator
+                condition map:
+                     Inner Join 0 to 1
+                keys:
+                  0 _col0 (type: int)
+                  1 _col0 (type: int)
+                Statistics: Num rows: 550 Data size: 99000 Basic stats: COMPLETE Column stats: NONE
+                Group By Operator
+                  aggregations: count()
+                  mode: hash
+                  outputColumnNames: _col0
+                  Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                  Reduce Output Operator
+                    sort order: 
+                    Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                    value expressions: _col0 (type: bigint)
+        Reducer 3 
+            Execution mode: vectorized, llap
+            Reduce Operator Tree:
+              Group By Operator
+                aggregations: count(VALUE._col0)
+                mode: mergepartial
+                outputColumnNames: _col0
+                Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                File Output Operator
+                  compressed: false
+                  Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                  table:
+                      input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                      output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+        Reducer 5 
+            Execution mode: vectorized, llap
+            Reduce Operator Tree:
+              Group By Operator
+                aggregations: min(VALUE._col0), max(VALUE._col1), bloom_filter(VALUE._col2, expectedEntries=29)
+                mode: final
+                outputColumnNames: _col0, _col1, _col2
+                Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: NONE
+                Reduce Output Operator
+                  sort order: 
+                  Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: NONE
+                  value expressions: _col0 (type: int), _col1 (type: int), _col2 (type: binary)
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: select count(*) from dsrv_big a join dsrv_small b on (a.key_int = b.key_int) where b.value in ('nonexistent1', 'nonexistent2')
+PREHOOK: type: QUERY
+PREHOOK: Input: default@dsrv_big
+PREHOOK: Input: default@dsrv_small
+#### A masked pattern was here ####
+POSTHOOK: query: select count(*) from dsrv_big a join dsrv_small b on (a.key_int = b.key_int) where b.value in ('nonexistent1', 'nonexistent2')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@dsrv_big
+POSTHOOK: Input: default@dsrv_small
+#### A masked pattern was here ####
+0
+PREHOOK: query: drop table dsrv_big
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@dsrv_big
+PREHOOK: Output: default@dsrv_big
+POSTHOOK: query: drop table dsrv_big
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@dsrv_big
+POSTHOOK: Output: default@dsrv_big
+PREHOOK: query: drop table dsrv_small
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@dsrv_small
+PREHOOK: Output: default@dsrv_small
+POSTHOOK: query: drop table dsrv_small
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@dsrv_small
+POSTHOOK: Output: default@dsrv_small

http://git-wip-us.apache.org/repos/asf/hive/blob/62ebd1ab/ql/src/test/results/clientpositive/tez/explainuser_3.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/tez/explainuser_3.q.out b/ql/src/test/results/clientpositive/tez/explainuser_3.q.out
index fbf61ef..17c9ec3 100644
--- a/ql/src/test/results/clientpositive/tez/explainuser_3.q.out
+++ b/ql/src/test/results/clientpositive/tez/explainuser_3.q.out
@@ -16,6 +16,14 @@ POSTHOOK: Input: default@alltypesorc
 POSTHOOK: Output: default@acid_vectorized
 POSTHOOK: Lineage: acid_vectorized.a SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cint, type:int, comment:null), ]
 POSTHOOK: Lineage: acid_vectorized.b SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cstring1, type:string, comment:null), ]
+PREHOOK: query: analyze table acid_vectorized compute statistics for columns
+PREHOOK: type: QUERY
+PREHOOK: Input: default@acid_vectorized
+#### A masked pattern was here ####
+POSTHOOK: query: analyze table acid_vectorized compute statistics for columns
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@acid_vectorized
+#### A masked pattern was here ####
 PREHOOK: query: explain select a, b from acid_vectorized order by a, b
 PREHOOK: type: QUERY
 POSTHOOK: query: explain select a, b from acid_vectorized order by a, b
@@ -31,14 +39,14 @@ Stage-0
     Stage-1
       Reducer 2 vectorized
       File Output Operator [FS_8]
-        Select Operator [SEL_7] (rows=16 width=106)
+        Select Operator [SEL_7] (rows=16 width=101)
           Output:["_col0","_col1"]
         <-Map 1 [SIMPLE_EDGE] vectorized
           SHUFFLE [RS_6]
-            Select Operator [SEL_5] (rows=16 width=106)
+            Select Operator [SEL_5] (rows=16 width=101)
               Output:["_col0","_col1"]
-              TableScan [TS_0] (rows=16 width=106)
-                default@acid_vectorized,acid_vectorized, ACID table,Tbl:COMPLETE,Col:NONE,Output:["a","b"]
+              TableScan [TS_0] (rows=16 width=101)
+                default@acid_vectorized,acid_vectorized, ACID table,Tbl:COMPLETE,Col:COMPLETE,Output:["a","b"]
 
 PREHOOK: query: explain select key, value
 FROM srcpart LATERAL VIEW explode(array(1,2,3)) myTable AS myCol
@@ -721,6 +729,7 @@ STAGE PLANS:
                                 output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                                 serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
         Reducer 2 
+            Execution mode: vectorized
             Reduce Operator Tree:
               Group By Operator
                 aggregations: min(VALUE._col0), max(VALUE._col1), bloom_filter(VALUE._col2, expectedEntries=242)

http://git-wip-us.apache.org/repos/asf/hive/blob/62ebd1ab/ql/src/test/results/clientpositive/vector_binary_join_groupby.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/vector_binary_join_groupby.q.out b/ql/src/test/results/clientpositive/vector_binary_join_groupby.q.out
index 850278e..dead5a6 100644
--- a/ql/src/test/results/clientpositive/vector_binary_join_groupby.q.out
+++ b/ql/src/test/results/clientpositive/vector_binary_join_groupby.q.out
@@ -166,6 +166,7 @@ STAGE PLANS:
                         sort order: 
                         Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
                         value expressions: _col0 (type: bigint)
+      Execution mode: vectorized
       Local Work:
         Map Reduce Local Work
       Reduce Operator Tree:

http://git-wip-us.apache.org/repos/asf/hive/blob/62ebd1ab/storage-api/src/java/org/apache/hive/common/util/BloomFilter.java
----------------------------------------------------------------------
diff --git a/storage-api/src/java/org/apache/hive/common/util/BloomFilter.java b/storage-api/src/java/org/apache/hive/common/util/BloomFilter.java
index d44bba8..e9f419d 100644
--- a/storage-api/src/java/org/apache/hive/common/util/BloomFilter.java
+++ b/storage-api/src/java/org/apache/hive/common/util/BloomFilter.java
@@ -292,6 +292,42 @@ public class BloomFilter {
     }
   }
 
+  // Given a byte array consisting of a serialized BloomFilter, gives the offset (from 0)
+  // for the start of the serialized long values that make up the bitset.
+  // NumHashFunctions (1 byte) + NumBits (4 bytes)
+  public static final int START_OF_SERIALIZED_LONGS = 5;
+
+  /**
+   * Merges BloomFilter bf2 into bf1.
+   * Assumes the two BloomFilters were created with the same size and number of hash functions, and have been serialized to byte arrays.
+   * @param bf1Bytes
+   * @param bf1Start
+   * @param bf1Length
+   * @param bf2Bytes
+   * @param bf2Start
+   * @param bf2Length
+   */
+  public static void mergeBloomFilterBytes(
+      byte[] bf1Bytes, int bf1Start, int bf1Length,
+      byte[] bf2Bytes, int bf2Start, int bf2Length) {
+    if (bf1Length != bf2Length) {
+      throw new IllegalArgumentException("bf1Length " + bf1Length + " does not match bf2Length " + bf2Length);
+    }
+
+    // Validation on the bitset size/number of hash functions.
+    for (int idx = 0; idx < START_OF_SERIALIZED_LONGS; ++idx) {
+      if (bf1Bytes[bf1Start + idx] != bf2Bytes[bf2Start + idx]) {
+        throw new IllegalArgumentException("bf1 NumHashFunctions/NumBits does not match bf2");
+      }
+    }
+
+    // Just bitwise-OR the bits together - size/# functions should be the same,
+    // rest of the data is serialized long values for the bitset which are supposed to be bitwise-ORed.
+    for (int idx = START_OF_SERIALIZED_LONGS; idx < bf1Length; ++idx) {
+      bf1Bytes[bf1Start + idx] |= bf2Bytes[bf2Start + idx];
+    }
+  }
+
   /**
    * Bare metal bit set implementation. For performance reasons, this implementation does not check
    * for index bounds nor expand the bit set size if the specified index is greater than the size.

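A minimal, self-contained Java sketch of why the loop above works: the first START_OF_SERIALIZED_LONGS (5) bytes are the header (1 byte of hash-function count plus 4 bytes describing the bitset size, per the comment above) and must be identical, while everything after that is bitset words, whose union is a plain bitwise OR. The class and helper names below are made up for illustration and do not reproduce Hive's exact serialization format.

import java.util.BitSet;

public class OrMergeSketch {
  // Build a toy "serialized filter": a 5-byte header followed by bitset bytes.
  static byte[] toyFilter(byte numHashFunctions, int numBits, BitSet bits) {
    byte[] out = new byte[5 + (numBits + 7) / 8];
    out[0] = numHashFunctions;
    out[1] = (byte) (numBits >>> 24);
    out[2] = (byte) (numBits >>> 16);
    out[3] = (byte) (numBits >>> 8);
    out[4] = (byte) numBits;
    byte[] words = bits.toByteArray();
    System.arraycopy(words, 0, out, 5, words.length);
    return out;
  }

  public static void main(String[] args) {
    BitSet a = new BitSet(64); a.set(3); a.set(17);
    BitSet b = new BitSet(64); b.set(3); b.set(42);
    byte[] bf1 = toyFilter((byte) 4, 64, a);
    byte[] bf2 = toyFilter((byte) 4, 64, b);
    // Same loop shape as mergeBloomFilterBytes: headers must match, the tail is OR-ed in place.
    for (int idx = 5; idx < bf1.length; ++idx) {
      bf1[idx] |= bf2[idx];
    }
    // bf1 now carries bits {3, 17, 42}, the union of the two toy filters.
  }
}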

[3/3] hive git commit: HIVE-15698: Vectorization support for min/max/bloomfilter runtime filtering (Jason Dere, reviewed by Matt McCline)

Posted by jd...@apache.org.
HIVE-15698: Vectorization support for min/max/bloomfilter runtime filtering (Jason Dere, reviewed by Matt McCline)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/62ebd1ab
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/62ebd1ab
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/62ebd1ab

Branch: refs/heads/master
Commit: 62ebd1abb87d951024a371ba37a451f536a594e6
Parents: 79eb224
Author: Jason Dere <jd...@hortonworks.com>
Authored: Sun Jan 29 23:33:02 2017 -0800
Committer: Jason Dere <jd...@hortonworks.com>
Committed: Sun Jan 29 23:33:02 2017 -0800

----------------------------------------------------------------------
 .../apache/hadoop/hive/ant/GenVectorCode.java   |  83 ++
 .../test/resources/testconfiguration.properties |   1 +
 .../FilterColumnBetweenDynamicValue.txt         | 101 ++
 .../FilterDecimalColumnBetween.txt              |  16 +
 .../FilterStringColumnBetween.txt               |   8 +-
 .../FilterTimestampColumnBetween.txt            |  16 +
 .../FilterTruncStringColumnBetween.txt          |   8 +-
 .../ql/exec/tez/DynamicValueRegistryTez.java    |  12 +-
 .../exec/vector/VectorExpressionDescriptor.java |   3 +-
 .../ql/exec/vector/VectorFilterOperator.java    |   2 +
 .../ql/exec/vector/VectorSelectOperator.java    |   1 +
 .../ql/exec/vector/VectorizationContext.java    |  77 +-
 .../DynamicValueVectorExpression.java           | 314 +++++++
 .../vector/expressions/VectorExpression.java    |  13 +-
 .../VectorInBloomFilterColDynamicValue.java     | 285 ++++++
 .../aggregates/VectorUDAFBloomFilter.java       | 474 ++++++++++
 .../aggregates/VectorUDAFBloomFilterMerge.java  | 365 ++++++++
 .../hive/ql/optimizer/physical/Vectorizer.java  |   2 +
 .../ql/udf/generic/GenericUDAFBloomFilter.java  |  10 +-
 .../ql/udf/generic/GenericUDFInBloomFilter.java |  12 +-
 .../ql/optimizer/physical/TestVectorizer.java   |  31 +
 .../test/queries/clientpositive/explainuser_3.q |   1 +
 .../vectorized_dynamic_semijoin_reduction.q     |  43 +
 .../results/clientpositive/llap/mergejoin.q.out |  66 +-
 .../results/clientpositive/llap/orc_llap.q.out  |  12 +-
 .../llap/vector_binary_join_groupby.q.out       |   4 +-
 .../vectorized_dynamic_partition_pruning.q.out  |   4 +-
 .../vectorized_dynamic_semijoin_reduction.q.out | 932 +++++++++++++++++++
 .../clientpositive/tez/explainuser_3.q.out      |  17 +-
 .../vector_binary_join_groupby.q.out            |   1 +
 .../apache/hive/common/util/BloomFilter.java    |  36 +
 .../hive/common/util/TestBloomFilter.java       | 125 +++
 32 files changed, 3000 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/62ebd1ab/ant/src/org/apache/hadoop/hive/ant/GenVectorCode.java
----------------------------------------------------------------------
diff --git a/ant/src/org/apache/hadoop/hive/ant/GenVectorCode.java b/ant/src/org/apache/hadoop/hive/ant/GenVectorCode.java
index e9fe8fa..133ef0a 100644
--- a/ant/src/org/apache/hadoop/hive/ant/GenVectorCode.java
+++ b/ant/src/org/apache/hadoop/hive/ant/GenVectorCode.java
@@ -789,6 +789,15 @@ public class GenVectorCode extends Task {
       {"FilterTimestampColumnBetween", ""},
       {"FilterTimestampColumnBetween", "!"},
 
+      // This is for runtime min/max pushdown - don't need to do NOT BETWEEN
+      {"FilterColumnBetweenDynamicValue", "long", ""},
+      {"FilterColumnBetweenDynamicValue", "double", ""},
+      {"FilterColumnBetweenDynamicValue", "decimal", ""},
+      {"FilterColumnBetweenDynamicValue", "string", ""},
+      {"FilterColumnBetweenDynamicValue", "char", ""},
+      {"FilterColumnBetweenDynamicValue", "varchar", ""},
+      {"FilterColumnBetweenDynamicValue", "timestamp", ""},
+
       {"ColumnCompareColumn", "Equal", "long", "double", "=="},
       {"ColumnCompareColumn", "Equal", "double", "double", "=="},
       {"ColumnCompareColumn", "NotEqual", "long", "double", "!="},
@@ -1164,6 +1173,8 @@ public class GenVectorCode extends Task {
 
       } else if (tdesc[0].equals("FilterColumnBetween")) {
         generateFilterColumnBetween(tdesc);
+      } else if (tdesc[0].equals("FilterColumnBetweenDynamicValue")) {
+        generateFilterColumnBetweenDynamicValue(tdesc);
       } else if (tdesc[0].equals("ScalarArithmeticColumn") || tdesc[0].equals("ScalarDivideColumn")) {
         generateScalarArithmeticColumn(tdesc);
       } else if (tdesc[0].equals("FilterColumnCompareColumn")) {
@@ -1379,6 +1390,72 @@ public class GenVectorCode extends Task {
         className, templateString);
   }
 
+  private void generateFilterColumnBetweenDynamicValue(String[] tdesc) throws Exception {
+    String operandType = tdesc[1];
+    String optionalNot = tdesc[2];
+
+    String className = "Filter" + getCamelCaseType(operandType) + "Column" +
+      (optionalNot.equals("!") ? "Not" : "") + "BetweenDynamicValue";
+
+    String typeName = getCamelCaseType(operandType);
+    String defaultValue;
+    String vectorType;
+    String getPrimitiveMethod;
+    String getValueMethod;
+
+    if (operandType.equals("long")) {
+      defaultValue = "0";
+      vectorType = "long";
+      getPrimitiveMethod = "getLong";
+      getValueMethod = "";
+    } else if (operandType.equals("double")) {
+      defaultValue = "0";
+      vectorType = "double";
+      getPrimitiveMethod = "getDouble";
+      getValueMethod = "";
+    } else if (operandType.equals("decimal")) {
+      defaultValue = "null";
+      vectorType = "HiveDecimal";
+      getPrimitiveMethod = "getHiveDecimal";
+      getValueMethod = "";
+    } else if (operandType.equals("string")) {
+      defaultValue = "null";
+      vectorType = "byte[]";
+      getPrimitiveMethod = "getString";
+      getValueMethod = ".getBytes()";
+    } else if (operandType.equals("char")) {
+      defaultValue = "null";
+      vectorType = "byte[]";
+      getPrimitiveMethod = "getHiveChar";
+      getValueMethod = ".getStrippedValue().getBytes()";  // Does vectorization use stripped char values?
+    } else if (operandType.equals("varchar")) {
+      defaultValue = "null";
+      vectorType = "byte[]";
+      getPrimitiveMethod = "getHiveVarchar";
+      getValueMethod = ".getValue().getBytes()";
+    } else if (operandType.equals("timestamp")) {
+      defaultValue = "null";
+      vectorType = "Timestamp";
+      getPrimitiveMethod = "getTimestamp";
+      getValueMethod = "";
+    } else {
+      throw new IllegalArgumentException("Type " + operandType + " not supported");
+    }
+
+    // Read the template into a string, expand it, and write it.
+    File templateFile = new File(joinPath(this.expressionTemplateDirectory, tdesc[0] + ".txt"));
+    String templateString = readFile(templateFile);
+    templateString = templateString.replaceAll("<ClassName>", className);
+    templateString = templateString.replaceAll("<TypeName>", typeName);
+    templateString = templateString.replaceAll("<DefaultValue>", defaultValue);
+    templateString = templateString.replaceAll("<VectorType>", vectorType);
+    templateString = templateString.replaceAll("<GetPrimitiveMethod>", getPrimitiveMethod);
+    templateString = templateString.replaceAll("<GetValueMethod>", getValueMethod);
+
+    writeFile(templateFile.lastModified(), expressionOutputDirectory, expressionClassesDirectory,
+        className, templateString);
+  }
+
   private void generateColumnCompareColumn(String[] tdesc) throws Exception {
     String operatorName = tdesc[1];
     String operandType1 = tdesc[2];
@@ -3084,6 +3161,12 @@ public class GenVectorCode extends Task {
       return "Timestamp";
     } else if (type.equals("date")) {
       return "Date";
+    } else if (type.equals("string")) {
+      return "String";
+    } else if (type.equals("char")) {
+      return "Char";
+    } else if (type.equals("varchar")) {
+      return "VarChar";
     } else {
       return type;
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/62ebd1ab/itests/src/test/resources/testconfiguration.properties
----------------------------------------------------------------------
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index e966959..8b3f589 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -611,6 +611,7 @@ minillaplocal.query.files=acid_globallimit.q,\
   vector_udf1.q,\
   vectorization_short_regress.q,\
   vectorized_dynamic_partition_pruning.q,\
+  vectorized_dynamic_semijoin_reduction.q,\
   vectorized_ptf.q,\
   windowing.q,\
   windowing_gby.q,\

http://git-wip-us.apache.org/repos/asf/hive/blob/62ebd1ab/ql/src/gen/vectorization/ExpressionTemplates/FilterColumnBetweenDynamicValue.txt
----------------------------------------------------------------------
diff --git a/ql/src/gen/vectorization/ExpressionTemplates/FilterColumnBetweenDynamicValue.txt b/ql/src/gen/vectorization/ExpressionTemplates/FilterColumnBetweenDynamicValue.txt
new file mode 100644
index 0000000..97ab7aa
--- /dev/null
+++ b/ql/src/gen/vectorization/ExpressionTemplates/FilterColumnBetweenDynamicValue.txt
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.expressions.gen;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.gen.Filter<TypeName>ColumnBetween;
+import org.apache.hadoop.hive.ql.plan.DynamicValue;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.sql.Timestamp;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+
+public class <ClassName> extends Filter<TypeName>ColumnBetween {
+
+  private static final long serialVersionUID = 1L;
+
+  private static final Logger LOG = LoggerFactory.getLogger(<ClassName>.class);
+
+  protected DynamicValue leftDynamicValue;
+  protected DynamicValue rightDynamicValue;
+  protected transient boolean initialized = false;
+  protected transient boolean isLeftOrRightNull = false;
+
+  public <ClassName>(int colNum, DynamicValue leftValue, DynamicValue rightValue) {
+    super(colNum, <DefaultValue>, <DefaultValue>);
+    this.leftDynamicValue = leftValue;
+    this.rightDynamicValue = rightValue;
+  }
+
+  public <ClassName>() {
+  }
+
+  public DynamicValue getLeftDynamicValue() {
+    return leftDynamicValue;
+  }
+
+  public void setLeftDynamicValue(DynamicValue leftValue) {
+    this.leftDynamicValue = leftValue;
+  }
+
+  public DynamicValue getRightDynamicValue() {
+    return rightDynamicValue;
+  }
+
+  public void setRightDynamicValue(DynamicValue rightValue) {
+    this.rightDynamicValue = rightValue;
+  }
+
+  @Override
+  public void init(Configuration conf) {
+    super.init(conf);
+    leftDynamicValue.setConf(conf);
+    rightDynamicValue.setConf(conf);
+  }
+
+  @Override
+  public void evaluate(VectorizedRowBatch batch) {
+    if (!initialized) {
+      Object lVal = leftDynamicValue.getValue();
+      Object rVal = rightDynamicValue.getValue();
+      if (lVal == null || rVal == null) {
+        isLeftOrRightNull = true;
+      } else {
+        <VectorType> min = PrimitiveObjectInspectorUtils.<GetPrimitiveMethod>(
+            lVal, leftDynamicValue.getObjectInspector())<GetValueMethod>;
+        setLeftValue(min);
+
+        <VectorType> max = PrimitiveObjectInspectorUtils.<GetPrimitiveMethod>(
+            rVal, rightDynamicValue.getObjectInspector())<GetValueMethod>;
+        setRightValue(max);
+      }
+      initialized = true;
+    }
+
+    // Special case for dynamic values - min/max can be null
+    if (isLeftOrRightNull) {
+      // Entire batch is filtered out
+      batch.size = 0;
+    }
+
+    super.evaluate(batch);
+  }
+}

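Since this template only becomes concrete once GenVectorCode expands it, here is a minimal, runnable Java sketch of the placeholder substitution that generateFilterColumnBetweenDynamicValue applies, using the "long" operand type. The default value 0 comes from that method; the "Long" camel-casing (and hence the class name FilterLongColumnBetweenDynamicValue) is an assumption made for illustration, and only a few representative template lines are shown.

public class TemplateExpansionSketch {
  public static void main(String[] args) {
    // A few representative lines of the template above, with its <...> placeholders.
    String template =
        "public class <ClassName> extends Filter<TypeName>ColumnBetween {\n"
      + "  public <ClassName>(int colNum, DynamicValue leftValue, DynamicValue rightValue) {\n"
      + "    super(colNum, <DefaultValue>, <DefaultValue>);\n"
      + "  }\n"
      + "}\n";

    // Substitutions for operandType "long", mirroring the replaceAll calls in
    // generateFilterColumnBetweenDynamicValue.
    String expanded = template
        .replaceAll("<ClassName>", "FilterLongColumnBetweenDynamicValue")
        .replaceAll("<TypeName>", "Long")
        .replaceAll("<DefaultValue>", "0");

    System.out.println(expanded);
  }
}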
http://git-wip-us.apache.org/repos/asf/hive/blob/62ebd1ab/ql/src/gen/vectorization/ExpressionTemplates/FilterDecimalColumnBetween.txt
----------------------------------------------------------------------
diff --git a/ql/src/gen/vectorization/ExpressionTemplates/FilterDecimalColumnBetween.txt b/ql/src/gen/vectorization/ExpressionTemplates/FilterDecimalColumnBetween.txt
index d68edfa..62d2254 100644
--- a/ql/src/gen/vectorization/ExpressionTemplates/FilterDecimalColumnBetween.txt
+++ b/ql/src/gen/vectorization/ExpressionTemplates/FilterDecimalColumnBetween.txt
@@ -154,6 +154,22 @@ public class <ClassName> extends VectorExpression {
     return "boolean";
   }
 
+  public HiveDecimal getLeftValue() {
+    return leftValue;
+  }
+
+  public void setLeftValue(HiveDecimal value) {
+    this.leftValue = value;
+  }
+
+  public HiveDecimal getRightValue() {
+    return rightValue;
+  }
+
+  public void setRightValue(HiveDecimal value) {
+    this.rightValue = value;
+  }
+
   @Override
   public VectorExpressionDescriptor.Descriptor getDescriptor() {
     return (new VectorExpressionDescriptor.Builder())

http://git-wip-us.apache.org/repos/asf/hive/blob/62ebd1ab/ql/src/gen/vectorization/ExpressionTemplates/FilterStringColumnBetween.txt
----------------------------------------------------------------------
diff --git a/ql/src/gen/vectorization/ExpressionTemplates/FilterStringColumnBetween.txt b/ql/src/gen/vectorization/ExpressionTemplates/FilterStringColumnBetween.txt
index e8049da..16d4aaf 100644
--- a/ql/src/gen/vectorization/ExpressionTemplates/FilterStringColumnBetween.txt
+++ b/ql/src/gen/vectorization/ExpressionTemplates/FilterStringColumnBetween.txt
@@ -161,19 +161,19 @@ public class <ClassName> extends VectorExpression {
     this.colNum = colNum;
   }
 
-  public byte[] getLeft() {
+  public byte[] getLeftValue() {
     return left;
   }
 
-  public void setLeft(byte[] value) {
+  public void setLeftValue(byte[] value) {
     this.left = value;
   }
   
-  public byte[] getRight() {
+  public byte[] getRightValue() {
     return right;
   }
 
-  public void setRight(byte[] value) {
+  public void setRightValue(byte[] value) {
     this.right = value;
   }
   

http://git-wip-us.apache.org/repos/asf/hive/blob/62ebd1ab/ql/src/gen/vectorization/ExpressionTemplates/FilterTimestampColumnBetween.txt
----------------------------------------------------------------------
diff --git a/ql/src/gen/vectorization/ExpressionTemplates/FilterTimestampColumnBetween.txt b/ql/src/gen/vectorization/ExpressionTemplates/FilterTimestampColumnBetween.txt
index 4298d79..806148f 100644
--- a/ql/src/gen/vectorization/ExpressionTemplates/FilterTimestampColumnBetween.txt
+++ b/ql/src/gen/vectorization/ExpressionTemplates/FilterTimestampColumnBetween.txt
@@ -153,6 +153,22 @@ public class <ClassName> extends VectorExpression {
     return "boolean";
   }
 
+  public Timestamp getLeftValue() {
+    return leftValue;
+  }
+
+  public void setLeftValue(Timestamp value) {
+    this.leftValue = value;
+  }
+
+  public Timestamp getRightValue() {
+    return rightValue;
+  }
+
+  public void setRightValue(Timestamp value) {
+    this.rightValue = value;
+  }
+
   @Override
   public VectorExpressionDescriptor.Descriptor getDescriptor() {
     return (new VectorExpressionDescriptor.Builder())

http://git-wip-us.apache.org/repos/asf/hive/blob/62ebd1ab/ql/src/gen/vectorization/ExpressionTemplates/FilterTruncStringColumnBetween.txt
----------------------------------------------------------------------
diff --git a/ql/src/gen/vectorization/ExpressionTemplates/FilterTruncStringColumnBetween.txt b/ql/src/gen/vectorization/ExpressionTemplates/FilterTruncStringColumnBetween.txt
index 94a174d..d350dcb 100644
--- a/ql/src/gen/vectorization/ExpressionTemplates/FilterTruncStringColumnBetween.txt
+++ b/ql/src/gen/vectorization/ExpressionTemplates/FilterTruncStringColumnBetween.txt
@@ -163,19 +163,19 @@ public class <ClassName> extends VectorExpression {
     this.colNum = colNum;
   }
 
-  public byte[] getLeft() {
+  public byte[] getLeftValue() {
     return left;
   }
 
-  public void setLeft(byte[] value) {
+  public void setLeftValue(byte[] value) {
     this.left = value;
   }
   
-  public byte[] getRight() {
+  public byte[] getRightValue() {
     return right;
   }
 
-  public void setRight(byte[] value) {
+  public void setRightValue(byte[] value) {
     this.right = value;
   }
   

http://git-wip-us.apache.org/repos/asf/hive/blob/62ebd1ab/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicValueRegistryTez.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicValueRegistryTez.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicValueRegistryTez.java
index 7bbedf6..b7687c5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicValueRegistryTez.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicValueRegistryTez.java
@@ -122,9 +122,15 @@ public class DynamicValueRegistryTez implements DynamicValueRegistry {
           setValue(runtimeValuesInfo.getDynamicValueIDs().get(colIdx), val);
         }
       }
-      // For now, expecting a single row (min/max, aggregated bloom filter)
-      if (rowCount != 1) {
-        throw new IllegalStateException("Expected 1 row from " + inputSourceName + ", got " + rowCount);
+      // For now, expecting a single row (min/max, aggregated bloom filter), or no rows
+      if (rowCount == 0) {
+        LOG.debug("No input rows from " + inputSourceName + ", filling dynamic values with nulls");
+        for (int colIdx = 0; colIdx < colExprEvaluators.size(); ++colIdx) {
+          ExprNodeEvaluator eval = colExprEvaluators.get(colIdx);
+          setValue(runtimeValuesInfo.getDynamicValueIDs().get(colIdx), null);
+        }
+      } else if (rowCount > 1) {
+        throw new IllegalStateException("Expected 0 or 1 rows from " + inputSourceName + ", got " + rowCount);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/62ebd1ab/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExpressionDescriptor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExpressionDescriptor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExpressionDescriptor.java
index 217af3f..f4499d7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExpressionDescriptor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExpressionDescriptor.java
@@ -183,7 +183,8 @@ public class VectorExpressionDescriptor {
   public enum InputExpressionType {
     NONE(0),
     COLUMN(1),
-    SCALAR(2);
+    SCALAR(2),
+    DYNAMICVALUE(3);
 
     private final int value;
 

http://git-wip-us.apache.org/repos/asf/hive/blob/62ebd1ab/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFilterOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFilterOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFilterOperator.java
index 261246b..2598445 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFilterOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFilterOperator.java
@@ -72,6 +72,8 @@ public class VectorFilterOperator extends FilterOperator {
     try {
       heartbeatInterval = HiveConf.getIntVar(hconf,
           HiveConf.ConfVars.HIVESENDHEARTBEAT);
+
+      conditionEvaluator.init(hconf);
     } catch (Throwable e) {
       throw new HiveException(e);
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/62ebd1ab/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java
index f7fec8f..bb382b1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java
@@ -113,6 +113,7 @@ public class VectorSelectOperator extends Operator<SelectDesc> implements
 
     projectedColumns = new int [vExpressions.length];
     for (int i = 0; i < projectedColumns.length; i++) {
+      vExpressions[i].init(hconf);
       projectedColumns[i] = vExpressions[i].getOutputColumn();
     }
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/62ebd1ab/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
index c887757..484f615 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
@@ -54,6 +54,8 @@ import org.apache.hadoop.hive.ql.exec.vector.expressions.*;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorUDAFAvgDecimal;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorUDAFAvgTimestamp;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorUDAFBloomFilter;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorUDAFBloomFilterMerge;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorUDAFCount;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorUDAFCountMerge;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorUDAFCountStar;
@@ -97,6 +99,7 @@ import org.apache.hadoop.hive.ql.plan.AggregationDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDynamicValueDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
 import org.apache.hadoop.hive.ql.plan.GroupByDesc;
 import org.apache.hadoop.hive.ql.udf.SettableUDF;
@@ -585,6 +588,8 @@ public class VectorizationContext {
     } else if (exprDesc instanceof ExprNodeConstantDesc) {
       ve = getConstantVectorExpression(((ExprNodeConstantDesc) exprDesc).getValue(), exprDesc.getTypeInfo(),
           mode);
+    } else if (exprDesc instanceof ExprNodeDynamicValueDesc) {
+      ve = getDynamicValueVectorExpression((ExprNodeDynamicValueDesc) exprDesc, mode);
     }
     if (ve == null) {
       throw new HiveException(
@@ -1094,6 +1099,21 @@ public class VectorizationContext {
     }
   }
 
+  private VectorExpression getDynamicValueVectorExpression(ExprNodeDynamicValueDesc dynamicValueExpr,
+      VectorExpressionDescriptor.Mode mode) throws HiveException {
+    String typeName = dynamicValueExpr.getTypeInfo().getTypeName();
+    VectorExpressionDescriptor.ArgumentType vectorArgType = VectorExpressionDescriptor.ArgumentType.fromHiveTypeName(typeName);
+    if (vectorArgType == VectorExpressionDescriptor.ArgumentType.NONE) {
+      throw new HiveException("No vector argument type for type name " + typeName);
+    }
+    int outCol = -1;
+    if (mode == VectorExpressionDescriptor.Mode.PROJECTION) {
+      outCol = ocm.allocateOutputColumn(dynamicValueExpr.getTypeInfo());
+    }
+
+    return new DynamicValueVectorExpression(outCol, dynamicValueExpr.getTypeInfo(), dynamicValueExpr.getDynamicValue());
+  }
+
   /**
    * Used as a fast path for operations that don't modify their input, like unary +
    * and casting boolean to long. IdentityExpression and its children are always
@@ -1181,6 +1201,8 @@ public class VectorizationContext {
         builder.setInputExpressionType(i, InputExpressionType.COLUMN);
       } else if (child instanceof ExprNodeConstantDesc) {
         builder.setInputExpressionType(i, InputExpressionType.SCALAR);
+      } else if (child instanceof ExprNodeDynamicValueDesc) {
+        builder.setInputExpressionType(i, InputExpressionType.DYNAMICVALUE);
       } else {
         throw new HiveException("Cannot handle expression type: " + child.getClass().getSimpleName());
       }
@@ -1225,6 +1247,8 @@ public class VectorizationContext {
         } else if (child instanceof ExprNodeConstantDesc) {
           Object scalarValue = getVectorTypeScalarValue((ExprNodeConstantDesc) child);
           arguments[i] = (null == scalarValue) ? getConstantVectorExpression(null, child.getTypeInfo(), childrenMode) : scalarValue;
+        } else if (child instanceof ExprNodeDynamicValueDesc) {
+          arguments[i] = ((ExprNodeDynamicValueDesc) child).getDynamicValue();
         } else {
           throw new HiveException("Cannot handle expression type: " + child.getClass().getSimpleName());
         }
@@ -2092,8 +2116,13 @@ public class VectorizationContext {
       return null;
     }
 
+    boolean hasDynamicValues = false;
+
     // We don't currently support the BETWEEN ends being columns.  They must be scalars.
-    if (!(childExpr.get(2) instanceof ExprNodeConstantDesc) ||
+    if ((childExpr.get(2) instanceof ExprNodeDynamicValueDesc) &&
+        (childExpr.get(3) instanceof ExprNodeDynamicValueDesc)) {
+      hasDynamicValues = true;
+    } else if (!(childExpr.get(2) instanceof ExprNodeConstantDesc) ||
         !(childExpr.get(3) instanceof ExprNodeConstantDesc)) {
       return null;
     }
@@ -2138,35 +2167,51 @@ public class VectorizationContext {
     // determine class
     Class<?> cl = null;
     if (isIntFamily(colType) && !notKeywordPresent) {
-      cl = FilterLongColumnBetween.class;
+      cl = (hasDynamicValues ?
+          FilterLongColumnBetweenDynamicValue.class :
+          FilterLongColumnBetween.class);
     } else if (isIntFamily(colType) && notKeywordPresent) {
       cl = FilterLongColumnNotBetween.class;
     } else if (isFloatFamily(colType) && !notKeywordPresent) {
-      cl = FilterDoubleColumnBetween.class;
+      cl = (hasDynamicValues ?
+          FilterDoubleColumnBetweenDynamicValue.class :
+          FilterDoubleColumnBetween.class);
     } else if (isFloatFamily(colType) && notKeywordPresent) {
       cl = FilterDoubleColumnNotBetween.class;
     } else if (colType.equals("string") && !notKeywordPresent) {
-      cl = FilterStringColumnBetween.class;
+      cl = (hasDynamicValues ?
+          FilterStringColumnBetweenDynamicValue.class :
+          FilterStringColumnBetween.class);
     } else if (colType.equals("string") && notKeywordPresent) {
       cl = FilterStringColumnNotBetween.class;
     } else if (varcharTypePattern.matcher(colType).matches() && !notKeywordPresent) {
-      cl = FilterVarCharColumnBetween.class;
+      cl = (hasDynamicValues ?
+          FilterVarCharColumnBetweenDynamicValue.class :
+          FilterVarCharColumnBetween.class);
     } else if (varcharTypePattern.matcher(colType).matches() && notKeywordPresent) {
       cl = FilterVarCharColumnNotBetween.class;
     } else if (charTypePattern.matcher(colType).matches() && !notKeywordPresent) {
-      cl = FilterCharColumnBetween.class;
+      cl = (hasDynamicValues ?
+          FilterCharColumnBetweenDynamicValue.class :
+          FilterCharColumnBetween.class);
     } else if (charTypePattern.matcher(colType).matches() && notKeywordPresent) {
       cl = FilterCharColumnNotBetween.class;
     } else if (colType.equals("timestamp") && !notKeywordPresent) {
-      cl = FilterTimestampColumnBetween.class;
+      cl = (hasDynamicValues ?
+          FilterTimestampColumnBetweenDynamicValue.class :
+          FilterTimestampColumnBetween.class);
     } else if (colType.equals("timestamp") && notKeywordPresent) {
       cl = FilterTimestampColumnNotBetween.class;
     } else if (isDecimalFamily(colType) && !notKeywordPresent) {
-      cl = FilterDecimalColumnBetween.class;
+      cl = (hasDynamicValues ?
+          FilterDecimalColumnBetweenDynamicValue.class :
+          FilterDecimalColumnBetween.class);
     } else if (isDecimalFamily(colType) && notKeywordPresent) {
       cl = FilterDecimalColumnNotBetween.class;
     } else if (isDateFamily(colType) && !notKeywordPresent) {
-      cl = FilterLongColumnBetween.class;
+      cl = (hasDynamicValues ?
+          FilterLongColumnBetweenDynamicValue.class :
+          FilterLongColumnBetween.class);
     } else if (isDateFamily(colType) && notKeywordPresent) {
       cl = FilterLongColumnNotBetween.class;
     }
@@ -2224,6 +2269,12 @@ public class VectorizationContext {
       } else if (child instanceof ExprNodeConstantDesc) {
         // this is a constant (or null)
         argDescs[i].setConstant((ExprNodeConstantDesc) child);
+      } else if (child instanceof ExprNodeDynamicValueDesc) {
+        VectorExpression e = getVectorExpression(child, VectorExpressionDescriptor.Mode.PROJECTION);
+        vectorExprs.add(e);
+        variableArgPositions.add(i);
+        exprResultColumnNums.add(e.getOutputColumn());
+        argDescs[i].setVariable(e.getOutputColumn());
       } else {
         throw new HiveException("Unable to vectorize custom UDF. Encountered unsupported expr desc : "
             + child);
@@ -2651,6 +2702,14 @@ public class VectorizationContext {
     add(new AggregateDefinition("stddev_samp", ArgumentType.FLOAT_FAMILY,                    Mode.PARTIAL1,     VectorUDAFStdSampDouble.class));
     add(new AggregateDefinition("stddev_samp", ArgumentType.DECIMAL,                         Mode.PARTIAL1,     VectorUDAFStdSampDecimal.class));
     add(new AggregateDefinition("stddev_samp", ArgumentType.TIMESTAMP,                       Mode.PARTIAL1,     VectorUDAFStdSampTimestamp.class));
+
+    // UDAFBloomFilter: the original column data is one type, but the partial/final
+    // intermediate is a serialized binary filter, so 2 aggregation classes are needed: (partial1/complete), (partial2/final)
+    add(new AggregateDefinition("bloom_filter", ArgumentType.ALL_FAMILY,                     Mode.PARTIAL1,     VectorUDAFBloomFilter.class));
+    add(new AggregateDefinition("bloom_filter", ArgumentType.ALL_FAMILY,                     Mode.COMPLETE,     VectorUDAFBloomFilter.class));
+    add(new AggregateDefinition("bloom_filter", ArgumentType.BINARY,                         Mode.PARTIAL2,     VectorUDAFBloomFilterMerge.class));
+    add(new AggregateDefinition("bloom_filter", ArgumentType.BINARY,                         Mode.FINAL,        VectorUDAFBloomFilterMerge.class));
+
   }};
 
   public VectorAggregateExpression getAggregatorExpression(AggregationDesc desc)

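For orientation, a minimal sketch (illustrative only, not part of this patch) of how the four
GenericUDAFEvaluator modes line up with the two vectorized bloom_filter classes registered in the
table above: PARTIAL1/COMPLETE consume the original column values, while PARTIAL2/FINAL consume and
merge the serialized binary intermediate. The helper class and method names below are made up.

    import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;

    public final class BloomFilterAggModeSketch {
      // Hypothetical helper: which vectorized class handles which evaluator mode,
      // per the AggregateDefinition entries added in this hunk.
      static String classFor(GenericUDAFEvaluator.Mode mode) {
        switch (mode) {
          case PARTIAL1:   // raw column values -> serialized filter
          case COMPLETE:
            return "VectorUDAFBloomFilter";
          case PARTIAL2:   // serialized filters -> merged serialized filter
          case FINAL:
            return "VectorUDAFBloomFilterMerge";
          default:
            throw new IllegalArgumentException("Unexpected mode " + mode);
        }
      }
    }
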
http://git-wip-us.apache.org/repos/asf/hive/blob/62ebd1ab/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/DynamicValueVectorExpression.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/DynamicValueVectorExpression.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/DynamicValueVectorExpression.java
new file mode 100644
index 0000000..1a34118
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/DynamicValueVectorExpression.java
@@ -0,0 +1,314 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.expressions;
+
+import java.sql.Timestamp;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.common.type.HiveIntervalDayTime;
+import org.apache.hadoop.hive.ql.exec.vector.*;
+import org.apache.hadoop.hive.ql.plan.DynamicValue;
+import org.apache.hadoop.hive.serde2.io.DateWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A runtime-resolved DynamicValue, represented as a vector with repeating values.
+ */
+public class DynamicValueVectorExpression extends VectorExpression {
+  private static final Logger LOG = LoggerFactory.getLogger(DynamicValueVectorExpression.class);
+
+  private static final long serialVersionUID = 1L;
+
+  DynamicValue dynamicValue;
+  TypeInfo typeInfo;
+  transient private boolean initialized = false;
+
+  private int outputColumn;
+  protected long longValue = 0;
+  private double doubleValue = 0;
+  private byte[] bytesValue = null;
+  private HiveDecimal decimalValue = null;
+  private Timestamp timestampValue = null;
+  private HiveIntervalDayTime intervalDayTimeValue = null;
+  private boolean isNullValue = false;
+
+  private ColumnVector.Type type;
+  private int bytesValueLength = 0;
+
+  public DynamicValueVectorExpression() {
+    super();
+  }
+
+  public DynamicValueVectorExpression(int outputColumn, TypeInfo typeInfo, DynamicValue dynamicValue) {
+    this();
+    this.outputColumn = outputColumn;
+    this.type = VectorizationContext.getColumnVectorTypeFromTypeInfo(typeInfo);
+    this.dynamicValue = dynamicValue;
+    this.typeInfo = typeInfo;
+  }
+
+  private void evaluateLong(VectorizedRowBatch vrg) {
+    LongColumnVector cv = (LongColumnVector) vrg.cols[outputColumn];
+    cv.isRepeating = true;
+    cv.noNulls = !isNullValue;
+    if (!isNullValue) {
+      cv.vector[0] = longValue;
+      cv.isNull[0] = false;
+    } else {
+      cv.isNull[0] = true;
+    }
+  }
+
+  private void evaluateDouble(VectorizedRowBatch vrg) {
+    DoubleColumnVector cv = (DoubleColumnVector) vrg.cols[outputColumn];
+    cv.isRepeating = true;
+    cv.noNulls = !isNullValue;
+    if (!isNullValue) {
+      cv.vector[0] = doubleValue;
+      cv.isNull[0] = false;
+    } else {
+      cv.isNull[0] = true;
+    }
+  }
+
+  private void evaluateBytes(VectorizedRowBatch vrg) {
+    BytesColumnVector cv = (BytesColumnVector) vrg.cols[outputColumn];
+    cv.isRepeating = true;
+    cv.noNulls = !isNullValue;
+    cv.initBuffer();
+    if (!isNullValue) {
+      cv.setVal(0, bytesValue, 0, bytesValueLength);
+      cv.isNull[0] = false;
+    } else {
+      cv.isNull[0] = true;
+    }
+  }
+
+  private void evaluateDecimal(VectorizedRowBatch vrg) {
+    DecimalColumnVector dcv = (DecimalColumnVector) vrg.cols[outputColumn];
+    dcv.isRepeating = true;
+    dcv.noNulls = !isNullValue;
+    if (!isNullValue) {
+      dcv.vector[0].set(decimalValue);
+      dcv.isNull[0] = false;
+    } else {
+      dcv.isNull[0] = true;
+    }
+  }
+
+  private void evaluateTimestamp(VectorizedRowBatch vrg) {
+    TimestampColumnVector dcv = (TimestampColumnVector) vrg.cols[outputColumn];
+    dcv.isRepeating = true;
+    dcv.noNulls = !isNullValue;
+    if (!isNullValue) {
+      dcv.set(0, timestampValue);
+      dcv.isNull[0] = false;
+    } else {
+      dcv.isNull[0] = true;
+    }
+  }
+
+  private void evaluateIntervalDayTime(VectorizedRowBatch vrg) {
+    IntervalDayTimeColumnVector dcv = (IntervalDayTimeColumnVector) vrg.cols[outputColumn];
+    dcv.isRepeating = true;
+    dcv.noNulls = !isNullValue;
+    if (!isNullValue) {
+      dcv.set(0, intervalDayTimeValue);
+      dcv.isNull[0] = false;
+    } else {
+      dcv.isNull[0] = true;
+    }
+  }
+
+  private void initValue() {
+    Object val = dynamicValue.getValue();
+
+    if (val == null) {
+      isNullValue = true;
+    } else {
+      PrimitiveObjectInspector poi = dynamicValue.getObjectInspector();
+      byte[] bytesVal;
+      switch (poi.getPrimitiveCategory()) {
+      case BOOLEAN:
+      case BYTE:
+      case SHORT:
+      case INT:
+      case LONG:
+        longValue = PrimitiveObjectInspectorUtils.getLong(val, poi);
+        break;
+      case FLOAT:
+      case DOUBLE:
+        doubleValue = PrimitiveObjectInspectorUtils.getDouble(val, poi);
+        break;
+      case STRING:
+      case CHAR:
+      case VARCHAR:
+        bytesVal = PrimitiveObjectInspectorUtils.getString(val, poi).getBytes();
+        setBytesValue(bytesVal);
+        break;
+      case BINARY:
+        bytesVal = PrimitiveObjectInspectorUtils.getBinary(val, poi).copyBytes();
+        setBytesValue(bytesVal);
+        break;
+      case DECIMAL:
+        decimalValue = PrimitiveObjectInspectorUtils.getHiveDecimal(val, poi);
+        break;
+      case DATE:
+        longValue = DateWritable.dateToDays(PrimitiveObjectInspectorUtils.getDate(val, poi));
+        break;
+      case TIMESTAMP:
+        timestampValue = PrimitiveObjectInspectorUtils.getTimestamp(val, poi);
+        break;
+      case INTERVAL_YEAR_MONTH:
+        longValue = PrimitiveObjectInspectorUtils.getHiveIntervalYearMonth(val, poi).getTotalMonths();
+        break;
+      case INTERVAL_DAY_TIME:
+        intervalDayTimeValue = PrimitiveObjectInspectorUtils.getHiveIntervalDayTime(val, poi);
+        break;
+      default:
+        throw new IllegalStateException("Unsupported type " + poi.getPrimitiveCategory());
+      }
+    }
+
+    initialized = true;
+  }
+
+  @Override
+  public void init(Configuration conf) {
+    super.init(conf);
+    dynamicValue.setConf(conf);
+  }
+
+  @Override
+  public void evaluate(VectorizedRowBatch vrg) {
+    if (!initialized) {
+      initValue();
+    }
+
+    switch (type) {
+    case LONG:
+      evaluateLong(vrg);
+      break;
+    case DOUBLE:
+      evaluateDouble(vrg);
+      break;
+    case BYTES:
+      evaluateBytes(vrg);
+      break;
+    case DECIMAL:
+      evaluateDecimal(vrg);
+      break;
+    case TIMESTAMP:
+      evaluateTimestamp(vrg);
+      break;
+    case INTERVAL_DAY_TIME:
+      evaluateIntervalDayTime(vrg);
+      break;
+    default:
+      throw new IllegalStateException("Unsupported type " + type);
+    }
+  }
+
+  @Override
+  public int getOutputColumn() {
+    return outputColumn;
+  }
+
+  public long getLongValue() {
+    return longValue;
+  }
+
+  public void setLongValue(long longValue) {
+    this.longValue = longValue;
+  }
+
+  public double getDoubleValue() {
+    return doubleValue;
+  }
+
+  public void setDoubleValue(double doubleValue) {
+    this.doubleValue = doubleValue;
+  }
+
+  public byte[] getBytesValue() {
+    return bytesValue;
+  }
+
+  public void setBytesValue(byte[] bytesValue) {
+    this.bytesValue = bytesValue.clone();
+    this.bytesValueLength = bytesValue.length;
+  }
+
+  public void setDecimalValue(HiveDecimal decimalValue) {
+    this.decimalValue = decimalValue;
+  }
+
+  public HiveDecimal getDecimalValue() {
+    return decimalValue;
+  }
+
+  public void setTimestampValue(Timestamp timestampValue) {
+    this.timestampValue = timestampValue;
+  }
+
+  public Timestamp getTimestampValue() {
+    return timestampValue;
+  }
+
+  public void setIntervalDayTimeValue(HiveIntervalDayTime intervalDayTimeValue) {
+    this.intervalDayTimeValue = intervalDayTimeValue;
+  }
+
+  public HiveIntervalDayTime getIntervalDayTimeValue() {
+    return intervalDayTimeValue;
+  }
+
+  public String getTypeString() {
+    return getOutputType();
+  }
+
+  public void setOutputColumn(int outputColumn) {
+    this.outputColumn = outputColumn;
+  }
+
+  @Override
+  public VectorExpressionDescriptor.Descriptor getDescriptor() {
+    return (new VectorExpressionDescriptor.Builder()).build();
+  }
+
+  public DynamicValue getDynamicValue() {
+    return dynamicValue;
+  }
+
+  public void setDynamicValue(DynamicValue dynamicValue) {
+    this.dynamicValue = dynamicValue;
+  }
+
+  public TypeInfo getTypeInfo() {
+    return typeInfo;
+  }
+
+  public void setTypeInfo(TypeInfo typeInfo) {
+    this.typeInfo = typeInfo;
+  }
+}

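DynamicValueVectorExpression fills its output column as a repeating vector, so downstream
expressions read slot 0 for every row of the batch. A minimal sketch (illustrative only; the class
and readRepeatingLong helper are made up) of the convention the evaluateLong() path above relies on:

    import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;

    public final class RepeatingReadSketch {
      // When isRepeating is set, element 0 stands for every row; isNull[0]
      // marks the whole column as null.
      static long readRepeatingLong(LongColumnVector cv, int row) {
        int idx = cv.isRepeating ? 0 : row;
        if (!cv.noNulls && cv.isNull[idx]) {
          throw new IllegalStateException("NULL value at row " + row);
        }
        return cv.vector[idx];
      }
    }
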
http://git-wip-us.apache.org/repos/asf/hive/blob/62ebd1ab/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpression.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpression.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpression.java
index 8fca8a1..218f306 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpression.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpression.java
@@ -22,6 +22,8 @@ import java.io.Serializable;
 import java.util.Map;
 
 import com.google.common.collect.ImmutableMap;
+
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.ql.exec.vector.VectorExpressionDescriptor;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 
@@ -31,7 +33,7 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 public abstract class VectorExpression implements Serializable {
   public enum Type {
     STRING, CHAR, VARCHAR, TIMESTAMP, DATE, LONG, DOUBLE, DECIMAL,
-    INTERVAL_YEAR_MONTH, INTERVAL_DAY_TIME, OTHER;
+    INTERVAL_YEAR_MONTH, INTERVAL_DAY_TIME, BINARY, OTHER;
     private static Map<String, Type> types = ImmutableMap.<String, Type>builder()
         .put("string", STRING)
         .put("char", CHAR)
@@ -43,6 +45,7 @@ public abstract class VectorExpression implements Serializable {
         .put("decimal", DECIMAL)
         .put("interval_year_month", INTERVAL_YEAR_MONTH)
         .put("interval_day_time", INTERVAL_DAY_TIME)
+        .put("binary", BINARY)
         .build();
 
     public static Type getValue(String name) {
@@ -76,6 +79,14 @@ public abstract class VectorExpression implements Serializable {
    */
   public abstract void evaluate(VectorizedRowBatch batch);
 
+  public void init(Configuration conf) {
+    if (childExpressions != null) {
+      for (VectorExpression child : childExpressions) {
+        child.init(conf);
+      }
+    }
+  }
+
   /**
    * Returns the index of the output column in the array
    * of column vectors. If not applicable, -1 is returned.

http://git-wip-us.apache.org/repos/asf/hive/blob/62ebd1ab/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorInBloomFilterColDynamicValue.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorInBloomFilterColDynamicValue.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorInBloomFilterColDynamicValue.java
new file mode 100644
index 0000000..188a87e
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorInBloomFilterColDynamicValue.java
@@ -0,0 +1,285 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.expressions;
+
+import java.io.ByteArrayInputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorExpressionDescriptor.Descriptor;
+import org.apache.hadoop.hive.ql.exec.vector.VectorExpressionDescriptor;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.plan.DynamicValue;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.BinaryObjectInspector;
+import org.apache.hive.common.util.BloomFilter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class VectorInBloomFilterColDynamicValue extends VectorExpression {
+  private static final long serialVersionUID = 1L;
+
+  private static final Logger LOG = LoggerFactory.getLogger(VectorInBloomFilterColDynamicValue.class);
+
+  protected int colNum;
+  protected DynamicValue bloomFilterDynamicValue;
+  protected transient boolean initialized = false;
+  protected transient BloomFilter bloomFilter;
+  protected transient BloomFilterCheck bfCheck;
+
+  public VectorInBloomFilterColDynamicValue(int colNum, DynamicValue bloomFilterDynamicValue) {
+    this.colNum = colNum;
+    this.bloomFilterDynamicValue = bloomFilterDynamicValue;
+  }
+
+  public VectorInBloomFilterColDynamicValue() {
+  }
+
+  @Override
+  public void init(Configuration conf) {
+    super.init(conf);
+    bloomFilterDynamicValue.setConf(conf);
+
+    // Instantiate BloomFilterCheck based on input column type
+    VectorExpression.Type colType = this.getInputTypes()[0];
+    switch (colType) {
+    case LONG:
+    case DATE:
+      bfCheck = new LongBloomFilterCheck();
+      break;
+    case DOUBLE:
+      bfCheck = new DoubleBloomFilterCheck();
+      break;
+    case DECIMAL:
+      bfCheck = new DecimalBloomFilterCheck();
+      break;
+    case STRING:
+    case CHAR:
+    case VARCHAR:
+    case BINARY:
+      bfCheck = new BytesBloomFilterCheck();
+      break;
+    case TIMESTAMP:
+      bfCheck = new TimestampBloomFilterCheck();
+      break;
+    default:
+      throw new IllegalStateException("Unsupported type " + colType);
+    }
+  }
+
+  private void initValue()  {
+    try {
+      Object val = bloomFilterDynamicValue.getValue();
+      if (val != null) {
+        BinaryObjectInspector boi = (BinaryObjectInspector) bloomFilterDynamicValue.getObjectInspector();
+        byte[] bytes = boi.getPrimitiveJavaObject(val);
+        bloomFilter = BloomFilter.deserialize(new ByteArrayInputStream(bytes));
+      } else {
+        bloomFilter = null;
+      }
+      initialized = true;
+    } catch (Exception err) {
+      throw new RuntimeException(err);
+    }
+  }
+
+  @Override
+  public void evaluate(VectorizedRowBatch batch) {
+    if (childExpressions != null) {
+      super.evaluateChildren(batch);
+    }
+
+    if (!initialized) {
+      initValue();
+    }
+
+    ColumnVector inputColVector = batch.cols[colNum];
+    int[] sel = batch.selected;
+    boolean[] nullPos = inputColVector.isNull;
+    int n = batch.size;
+
+    // return immediately if batch is empty
+    if (n == 0) {
+      return;
+    }
+
+    // In case the dynamic value resolves to a null value: no rows can match,
+    // so filter out the entire batch and skip probing the (null) bloom filter below.
+    if (bloomFilter == null) {
+      batch.size = 0;
+      return;
+    }
+
+    if (inputColVector.noNulls) {
+      if (inputColVector.isRepeating) {
+
+        // All must be selected otherwise size would be zero. Repeating property will not change.
+        if (!(bfCheck.checkValue(inputColVector, 0))) {
+
+          //Entire batch is filtered out.
+          batch.size = 0;
+        }
+      } else if (batch.selectedInUse) {
+        int newSize = 0;
+        for(int j=0; j != n; j++) {
+          int i = sel[j];
+          if (bfCheck.checkValue(inputColVector, i)) {
+            sel[newSize++] = i;
+          }
+        }
+        batch.size = newSize;
+      } else {
+        int newSize = 0;
+        for(int i = 0; i != n; i++) {
+          if (bfCheck.checkValue(inputColVector, i)) {
+            sel[newSize++] = i;
+          }
+        }
+        if (newSize < n) {
+          batch.size = newSize;
+          batch.selectedInUse = true;
+        }
+      }
+    } else {
+      if (inputColVector.isRepeating) {
+
+        // All must be selected otherwise size would be zero. Repeating property will not change.
+        if (!nullPos[0]) {
+          if (!(bfCheck.checkValue(inputColVector, 0))) {
+
+            //Entire batch is filtered out.
+            batch.size = 0;
+          }
+        } else {
+          batch.size = 0;
+        }
+      } else if (batch.selectedInUse) {
+        int newSize = 0;
+        for(int j=0; j != n; j++) {
+          int i = sel[j];
+          if (!nullPos[i]) {
+           if (bfCheck.checkValue(inputColVector, i)) {
+             sel[newSize++] = i;
+           }
+          }
+        }
+
+        //Change the selected vector
+        batch.size = newSize;
+      } else {
+        int newSize = 0;
+        for(int i = 0; i != n; i++) {
+          if (!nullPos[i]) {
+            if (bfCheck.checkValue(inputColVector, i)) {
+              sel[newSize++] = i;
+            }
+          }
+        }
+        if (newSize < n) {
+          batch.size = newSize;
+          batch.selectedInUse = true;
+        }
+      }
+    }
+  }
+
+  @Override
+  public int getOutputColumn() {
+    return -1;
+  }
+
+  @Override
+  public String getOutputType() {
+    return "boolean";
+  }
+
+  public int getColNum() {
+    return colNum;
+  }
+
+  public void setColNum(int colNum) {
+    this.colNum = colNum;
+  }
+
+  @Override
+  public Descriptor getDescriptor() {
+    VectorExpressionDescriptor.Builder b = new VectorExpressionDescriptor.Builder();
+    b.setMode(VectorExpressionDescriptor.Mode.FILTER)
+        .setNumArguments(2)
+        .setArgumentTypes(
+            VectorExpressionDescriptor.ArgumentType.ALL_FAMILY,
+            VectorExpressionDescriptor.ArgumentType.BINARY)
+        .setInputExpressionTypes(
+            VectorExpressionDescriptor.InputExpressionType.COLUMN,
+            VectorExpressionDescriptor.InputExpressionType.DYNAMICVALUE);
+    return b.build();
+  }
+
+  // Type-specific handling
+  abstract class BloomFilterCheck {
+    abstract public boolean checkValue(ColumnVector columnVector, int idx);
+  }
+
+  class BytesBloomFilterCheck extends BloomFilterCheck {
+    @Override
+    public boolean checkValue(ColumnVector columnVector, int idx) {
+      BytesColumnVector col = (BytesColumnVector) columnVector;
+      return bloomFilter.testBytes(col.vector[idx], col.start[idx], col.length[idx]);
+    }
+  }
+
+  class LongBloomFilterCheck extends BloomFilterCheck {
+    @Override
+    public boolean checkValue(ColumnVector columnVector, int idx) {
+      LongColumnVector col = (LongColumnVector) columnVector;
+      return bloomFilter.testLong(col.vector[idx]);
+    }
+  }
+
+  class DoubleBloomFilterCheck extends BloomFilterCheck {
+    @Override
+    public boolean checkValue(ColumnVector columnVector, int idx) {
+      DoubleColumnVector col = (DoubleColumnVector) columnVector;
+      return bloomFilter.testDouble(col.vector[idx]);
+    }
+  }
+
+  class DecimalBloomFilterCheck extends BloomFilterCheck {
+    private byte[] scratchBuffer = new byte[HiveDecimal.SCRATCH_BUFFER_LEN_TO_BYTES];
+
+    @Override
+    public boolean checkValue(ColumnVector columnVector, int idx) {
+      DecimalColumnVector col = (DecimalColumnVector) columnVector;
+      int startIdx = col.vector[idx].toBytes(scratchBuffer);
+      return bloomFilter.testBytes(scratchBuffer, startIdx, scratchBuffer.length - startIdx);
+    }
+  }
+
+  class TimestampBloomFilterCheck extends BloomFilterCheck {
+    @Override
+    public boolean checkValue(ColumnVector columnVector, int idx) {
+      TimestampColumnVector col = (TimestampColumnVector) columnVector;
+      return bloomFilter.testLong(col.time[idx]);
+    }
+  }
+}

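The evaluate() method above follows the standard selection-vector compaction idiom for vectorized
filters: indices of surviving rows are written back into batch.selected and batch.size is shrunk.
A stripped-down sketch of that idiom (the keepRows helper and the pass[] mask are hypothetical,
for illustration only):

    import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;

    public final class SelectionCompactionSketch {
      // Keep only the rows whose pass[] entry is true, preserving the
      // selected[]/selectedInUse contract used by vectorized filter expressions.
      static void keepRows(VectorizedRowBatch batch, boolean[] pass) {
        int n = batch.size;
        if (n == 0) {
          return;
        }
        int newSize = 0;
        if (batch.selectedInUse) {
          for (int j = 0; j < n; j++) {
            int i = batch.selected[j];
            if (pass[i]) {
              batch.selected[newSize++] = i;
            }
          }
          batch.size = newSize;
        } else {
          for (int i = 0; i < n; i++) {
            if (pass[i]) {
              batch.selected[newSize++] = i;
            }
          }
          if (newSize < n) {
            batch.size = newSize;
            batch.selectedInUse = true;
          }
        }
      }
    }
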
http://git-wip-us.apache.org/repos/asf/hive/blob/62ebd1ab/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFBloomFilter.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFBloomFilter.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFBloomFilter.java
new file mode 100644
index 0000000..3ecb82e
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFBloomFilter.java
@@ -0,0 +1,474 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationBufferRow;
+import org.apache.hadoop.hive.ql.exec.vector.VectorExpressionDescriptor;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression.AggregationBuffer;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.AggregationDesc;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFBloomFilter;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFBloomFilter.GenericUDAFBloomFilterEvaluator;
+import org.apache.hadoop.hive.ql.util.JavaDataModel;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hive.common.util.BloomFilter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class VectorUDAFBloomFilter extends VectorAggregateExpression {
+
+  private static final Logger LOG = LoggerFactory.getLogger(VectorUDAFBloomFilter.class);
+
+  private static final long serialVersionUID = 1L;
+
+  private VectorExpression inputExpression;
+  private long expectedEntries = -1;
+  private ValueProcessor valueProcessor;
+  transient private int bitSetSize = -1;
+  transient private BytesWritable bw = new BytesWritable();
+  transient private ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
+
+  /**
+   * class for storing the current aggregate value.
+   */
+  private static final class Aggregation implements AggregationBuffer {
+    private static final long serialVersionUID = 1L;
+
+    BloomFilter bf;
+
+    public Aggregation(long expectedEntries) {
+      bf = new BloomFilter(expectedEntries);
+    }
+
+    @Override
+    public int getVariableSize() {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void reset() {
+      bf.reset();
+    }
+  }
+
+  public VectorUDAFBloomFilter(VectorExpression inputExpression) {
+    this();
+    this.inputExpression = inputExpression;
+
+    // Instantiate the ValueProcessor based on the input type
+    VectorExpressionDescriptor.ArgumentType inputType =
+        VectorExpressionDescriptor.ArgumentType.fromHiveTypeName(inputExpression.getOutputType());
+    switch (inputType) {
+    case INT_FAMILY:
+    case DATE:
+      valueProcessor = new ValueProcessorLong();
+      break;
+    case FLOAT_FAMILY:
+      valueProcessor = new ValueProcessorDouble();
+      break;
+    case DECIMAL:
+      valueProcessor = new ValueProcessorDecimal();
+      break;
+    case STRING:
+    case CHAR:
+    case VARCHAR:
+    case STRING_FAMILY:
+    case BINARY:
+      valueProcessor = new ValueProcessorBytes();
+      break;
+    case TIMESTAMP:
+      valueProcessor = new ValueProcessorTimestamp();
+      break;
+    default:
+      throw new IllegalStateException("Unsupported type " + inputType);
+    }
+  }
+
+  public VectorUDAFBloomFilter() {
+    super();
+  }
+
+  @Override
+  public AggregationBuffer getNewAggregationBuffer() throws HiveException {
+    if (expectedEntries < 0) {
+      throw new IllegalStateException("expectedEntries not initialized");
+    }
+    return new Aggregation(expectedEntries);
+  }
+
+  @Override
+  public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch batch)
+      throws HiveException {
+
+    inputExpression.evaluate(batch);
+
+    ColumnVector inputColumn =  batch.cols[this.inputExpression.getOutputColumn()];
+
+    int batchSize = batch.size;
+
+    if (batchSize == 0) {
+      return;
+    }
+
+    Aggregation myagg = (Aggregation) agg;
+
+    if (inputColumn.isRepeating) {
+      if (inputColumn.noNulls) {
+        valueProcessor.processValue(myagg, inputColumn, 0);
+      }
+      return;
+    }
+
+    if (!batch.selectedInUse && inputColumn.noNulls) {
+      iterateNoSelectionNoNulls(myagg, inputColumn, batchSize);
+    }
+    else if (!batch.selectedInUse) {
+      iterateNoSelectionHasNulls(myagg, inputColumn, batchSize);
+    }
+    else if (inputColumn.noNulls){
+      iterateSelectionNoNulls(myagg, inputColumn, batchSize, batch.selected);
+    }
+    else {
+      iterateSelectionHasNulls(myagg, inputColumn, batchSize, batch.selected);
+    }
+  }
+
+  private void iterateNoSelectionNoNulls(
+      Aggregation myagg,
+      ColumnVector inputColumn,
+      int batchSize) {
+    for (int i=0; i< batchSize; ++i) {
+      valueProcessor.processValue(myagg, inputColumn, i);
+    }
+  }
+
+  private void iterateNoSelectionHasNulls(
+      Aggregation myagg,
+      ColumnVector inputColumn,
+      int batchSize) {
+
+    for (int i=0; i< batchSize; ++i) {
+      if (!inputColumn.isNull[i]) {
+        valueProcessor.processValue(myagg, inputColumn, i);
+      }
+    }
+  }
+
+  private void iterateSelectionNoNulls(
+      Aggregation myagg,
+      ColumnVector inputColumn,
+      int batchSize,
+      int[] selected) {
+
+    for (int j=0; j< batchSize; ++j) {
+      int i = selected[j];
+      valueProcessor.processValue(myagg, inputColumn, i);
+    }
+  }
+
+  private void iterateSelectionHasNulls(
+      Aggregation myagg,
+      ColumnVector inputColumn,
+      int batchSize,
+      int[] selected) {
+
+    for (int j=0; j< batchSize; ++j) {
+      int i = selected[j];
+      if (!inputColumn.isNull[i]) {
+        valueProcessor.processValue(myagg, inputColumn, i);
+      }
+    }
+  }
+
+  @Override
+  public void aggregateInputSelection(
+      VectorAggregationBufferRow[] aggregationBufferSets, int aggregateIndex,
+      VectorizedRowBatch batch) throws HiveException {
+
+    int batchSize = batch.size;
+
+    if (batchSize == 0) {
+      return;
+    }
+
+    inputExpression.evaluate(batch);
+
+    ColumnVector inputColumn = batch.cols[this.inputExpression.getOutputColumn()];
+
+    if (inputColumn.noNulls) {
+      if (inputColumn.isRepeating) {
+        iterateNoNullsRepeatingWithAggregationSelection(
+          aggregationBufferSets, aggregateIndex,
+          inputColumn, batchSize);
+      } else {
+        if (batch.selectedInUse) {
+          iterateNoNullsSelectionWithAggregationSelection(
+            aggregationBufferSets, aggregateIndex,
+            inputColumn, batch.selected, batchSize);
+        } else {
+          iterateNoNullsWithAggregationSelection(
+            aggregationBufferSets, aggregateIndex,
+            inputColumn, batchSize);
+        }
+      }
+    } else {
+      if (inputColumn.isRepeating) {
+        // All rows are null; nothing to add to the bloom filter
+      } else {
+        if (batch.selectedInUse) {
+          iterateHasNullsSelectionWithAggregationSelection(
+            aggregationBufferSets, aggregateIndex,
+            inputColumn, batchSize, batch.selected);
+        } else {
+          iterateHasNullsWithAggregationSelection(
+            aggregationBufferSets, aggregateIndex,
+            inputColumn, batchSize);
+        }
+      }
+    }
+  }
+
+  private void iterateNoNullsRepeatingWithAggregationSelection(
+    VectorAggregationBufferRow[] aggregationBufferSets,
+    int aggregrateIndex,
+    ColumnVector inputColumn,
+    int batchSize) {
+
+    for (int i=0; i < batchSize; ++i) {
+      Aggregation myagg = getCurrentAggregationBuffer(
+        aggregationBufferSets,
+        aggregrateIndex,
+        i);
+      valueProcessor.processValue(myagg, inputColumn, 0);
+    }
+  }
+
+  private void iterateNoNullsSelectionWithAggregationSelection(
+    VectorAggregationBufferRow[] aggregationBufferSets,
+    int aggregrateIndex,
+    ColumnVector inputColumn,
+    int[] selection,
+    int batchSize) {
+
+    for (int i=0; i < batchSize; ++i) {
+      int row = selection[i];
+      Aggregation myagg = getCurrentAggregationBuffer(
+        aggregationBufferSets,
+        aggregrateIndex,
+        i);
+      valueProcessor.processValue(myagg, inputColumn, row);
+    }
+  }
+
+  private void iterateNoNullsWithAggregationSelection(
+    VectorAggregationBufferRow[] aggregationBufferSets,
+    int aggregrateIndex,
+    ColumnVector inputColumn,
+    int batchSize) {
+    for (int i=0; i < batchSize; ++i) {
+      Aggregation myagg = getCurrentAggregationBuffer(
+        aggregationBufferSets,
+        aggregrateIndex,
+        i);
+      valueProcessor.processValue(myagg, inputColumn, i);
+    }
+  }
+
+  private void iterateHasNullsSelectionWithAggregationSelection(
+    VectorAggregationBufferRow[] aggregationBufferSets,
+    int aggregrateIndex,
+    ColumnVector inputColumn,
+    int batchSize,
+    int[] selection) {
+
+    for (int i=0; i < batchSize; ++i) {
+      int row = selection[i];
+      if (!inputColumn.isNull[row]) {
+        Aggregation myagg = getCurrentAggregationBuffer(
+          aggregationBufferSets,
+          aggregrateIndex,
+          i);
+        valueProcessor.processValue(myagg, inputColumn, row);
+      }
+    }
+  }
+
+  private void iterateHasNullsWithAggregationSelection(
+    VectorAggregationBufferRow[] aggregationBufferSets,
+    int aggregrateIndex,
+    ColumnVector inputColumn,
+    int batchSize) {
+
+    for (int i=0; i < batchSize; ++i) {
+      if (!inputColumn.isNull[i]) {
+        Aggregation myagg = getCurrentAggregationBuffer(
+          aggregationBufferSets,
+          aggregrateIndex,
+          i);
+        valueProcessor.processValue(myagg, inputColumn, i);
+      }
+    }
+  }
+
+  private Aggregation getCurrentAggregationBuffer(
+      VectorAggregationBufferRow[] aggregationBufferSets,
+      int aggregrateIndex,
+      int row) {
+    VectorAggregationBufferRow mySet = aggregationBufferSets[row];
+    Aggregation myagg = (Aggregation) mySet.getAggregationBuffer(aggregrateIndex);
+    return myagg;
+  }
+
+  @Override
+  public void reset(AggregationBuffer agg) throws HiveException {
+    agg.reset();
+  }
+
+  @Override
+  public Object evaluateOutput(AggregationBuffer agg) throws HiveException {
+    try {
+      Aggregation bfAgg = (Aggregation) agg;
+      byteStream.reset();
+      BloomFilter.serialize(byteStream, bfAgg.bf);
+      byte[] bytes = byteStream.toByteArray();
+      bw.set(bytes, 0, bytes.length);
+      return bw;
+    } catch (IOException err) {
+      throw new HiveException("Error encountered while serializing bloomfilter", err);
+    }
+  }
+
+  @Override
+  public ObjectInspector getOutputObjectInspector() {
+    return PrimitiveObjectInspectorFactory.writableBinaryObjectInspector;
+  }
+
+  @Override
+  public int getAggregationBufferFixedSize() {
+    if (bitSetSize < 0) {
+      // Not pretty, but we need a way to get the size
+      try {
+        Aggregation agg = (Aggregation) getNewAggregationBuffer();
+        bitSetSize = agg.bf.getBitSet().length;
+      } catch (Exception e) {
+        throw new RuntimeException("Unexpected error while creating AggregationBuffer", e);
+      }
+    }
+
+    // BloomFilter: object(BitSet: object(data: long[]), numBits: int, numHashFunctions: int)
+    JavaDataModel model = JavaDataModel.get();
+    int bloomFilterSize = JavaDataModel.alignUp(model.object() + model.lengthForLongArrayOfSize(bitSetSize),
+        model.memoryAlign());
+    return JavaDataModel.alignUp(
+        model.object() + bloomFilterSize + model.primitive1() + model.primitive1(),
+        model.memoryAlign());
+  }
+
+  @Override
+  public void init(AggregationDesc desc) throws HiveException {
+    GenericUDAFBloomFilterEvaluator udafBloomFilter =
+        (GenericUDAFBloomFilterEvaluator) desc.getGenericUDAFEvaluator();
+    expectedEntries = udafBloomFilter.getExpectedEntries();
+  }
+
+  public VectorExpression getInputExpression() {
+    return inputExpression;
+  }
+
+  public void setInputExpression(VectorExpression inputExpression) {
+    this.inputExpression = inputExpression;
+  }
+
+  public long getExpectedEntries() {
+    return expectedEntries;
+  }
+
+  public void setExpectedEntries(long expectedEntries) {
+    this.expectedEntries = expectedEntries;
+  }
+
+  // Type-specific handling done here
+  private static abstract class ValueProcessor {
+    abstract protected void processValue(Aggregation myagg, ColumnVector inputColumn, int index);
+  }
+
+  //
+  // Type-specific implementations
+  //
+
+  public static class ValueProcessorBytes extends ValueProcessor {
+    @Override
+    protected void processValue(Aggregation myagg, ColumnVector columnVector, int i) {
+      BytesColumnVector inputColumn = (BytesColumnVector) columnVector;
+      myagg.bf.addBytes(inputColumn.vector[i], inputColumn.start[i], inputColumn.length[i]);
+    }
+  }
+
+  public static class ValueProcessorLong extends ValueProcessor {
+    @Override
+    protected void processValue(Aggregation myagg, ColumnVector columnVector, int i) {
+      LongColumnVector inputColumn = (LongColumnVector) columnVector;
+      myagg.bf.addLong(inputColumn.vector[i]);
+    }
+  }
+
+  public static class ValueProcessorDouble extends ValueProcessor {
+    @Override
+    protected void processValue(Aggregation myagg, ColumnVector columnVector, int i) {
+      DoubleColumnVector inputColumn = (DoubleColumnVector) columnVector;
+      myagg.bf.addDouble(inputColumn.vector[i]);
+    }
+  }
+
+  public static class ValueProcessorDecimal extends ValueProcessor {
+    private byte[] scratchBuffer = new byte[HiveDecimal.SCRATCH_BUFFER_LEN_TO_BYTES];
+
+    @Override
+    protected void processValue(Aggregation myagg, ColumnVector columnVector, int i) {
+      DecimalColumnVector inputColumn = (DecimalColumnVector) columnVector;
+      int startIdx = inputColumn.vector[i].toBytes(scratchBuffer);
+      myagg.bf.addBytes(scratchBuffer, startIdx, scratchBuffer.length - startIdx);
+    }
+  }
+
+  public static class ValueProcessorTimestamp extends ValueProcessor {
+    @Override
+    protected void processValue(Aggregation myagg, ColumnVector columnVector, int i) {
+      TimestampColumnVector inputColumn = (TimestampColumnVector) columnVector;
+      myagg.bf.addLong(inputColumn.time[i]);
+    }
+  }
+}

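evaluateOutput() above hands the serialized filter downstream as a BytesWritable. A small sketch
(illustrative only; the class and rebuild helper are made up) of how a consumer turns that binary
output back into a probe-able BloomFilter, mirroring what VectorInBloomFilterColDynamicValue.initValue()
does with the broadcast dynamic value:

    import java.io.ByteArrayInputStream;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hive.common.util.BloomFilter;

    public final class BloomFilterRebuildSketch {
      static BloomFilter rebuild(BytesWritable bw) {
        try {
          // Deserialize only the valid region of the writable's backing array.
          return BloomFilter.deserialize(
              new ByteArrayInputStream(bw.getBytes(), 0, bw.getLength()));
        } catch (Exception err) {
          throw new RuntimeException(err);
        }
      }
    }
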
http://git-wip-us.apache.org/repos/asf/hive/blob/62ebd1ab/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFBloomFilterMerge.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFBloomFilterMerge.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFBloomFilterMerge.java
new file mode 100644
index 0000000..ad190b7
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFBloomFilterMerge.java
@@ -0,0 +1,365 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates;
+
+import java.io.ByteArrayOutputStream;
+import java.util.Arrays;
+
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationBufferRow;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression.AggregationBuffer;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.AggregationDesc;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFBloomFilter.GenericUDAFBloomFilterEvaluator;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hive.common.util.BloomFilter;
+
+public class VectorUDAFBloomFilterMerge extends VectorAggregateExpression {
+
+  private static final long serialVersionUID = 1L;
+
+  private VectorExpression inputExpression;
+  private long expectedEntries = -1;
+  transient private int aggBufferSize = -1;
+  transient private BytesWritable bw = new BytesWritable();
+
+  /**
+   * class for storing the current aggregate value.
+   */
+  private static final class Aggregation implements AggregationBuffer {
+    private static final long serialVersionUID = 1L;
+
+    byte[] bfBytes;
+
+    public Aggregation(long expectedEntries) {
+      try {
+        BloomFilter bf = new BloomFilter(expectedEntries);
+        ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+        BloomFilter.serialize(bytesOut, bf);
+        bfBytes = bytesOut.toByteArray();
+      } catch (Exception err) {
+        throw new IllegalArgumentException("Error creating aggregation buffer", err);
+      }
+    }
+
+    @Override
+    public int getVariableSize() {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void reset() {
+      // Do not change the initial bytes which contain NumHashFunctions/NumBits!
+      Arrays.fill(bfBytes, BloomFilter.START_OF_SERIALIZED_LONGS, bfBytes.length, (byte) 0);
+    }
+  }
+
+  public VectorUDAFBloomFilterMerge(VectorExpression inputExpression) {
+    this();
+    this.inputExpression = inputExpression;
+  }
+
+  public VectorUDAFBloomFilterMerge() {
+    super();
+  }
+
+  @Override
+  public AggregationBuffer getNewAggregationBuffer() throws HiveException {
+    if (expectedEntries < 0) {
+      throw new IllegalStateException("expectedEntries not initialized");
+    }
+    return new Aggregation(expectedEntries);
+  }
+
+  @Override
+  public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch batch)
+      throws HiveException {
+
+    inputExpression.evaluate(batch);
+
+    ColumnVector inputColumn =  batch.cols[this.inputExpression.getOutputColumn()];
+
+    int batchSize = batch.size;
+
+    if (batchSize == 0) {
+      return;
+    }
+
+    Aggregation myagg = (Aggregation) agg;
+
+    if (inputColumn.isRepeating) {
+      if (inputColumn.noNulls) {
+        processValue(myagg, inputColumn, 0);
+      }
+      return;
+    }
+
+    if (!batch.selectedInUse && inputColumn.noNulls) {
+      iterateNoSelectionNoNulls(myagg, inputColumn, batchSize);
+    }
+    else if (!batch.selectedInUse) {
+      iterateNoSelectionHasNulls(myagg, inputColumn, batchSize);
+    }
+    else if (inputColumn.noNulls){
+      iterateSelectionNoNulls(myagg, inputColumn, batchSize, batch.selected);
+    }
+    else {
+      iterateSelectionHasNulls(myagg, inputColumn, batchSize, batch.selected);
+    }
+  }
+
+  private void iterateNoSelectionNoNulls(
+      Aggregation myagg,
+      ColumnVector inputColumn,
+      int batchSize) {
+    for (int i=0; i< batchSize; ++i) {
+      processValue(myagg, inputColumn, i);
+    }
+  }
+
+  private void iterateNoSelectionHasNulls(
+      Aggregation myagg,
+      ColumnVector inputColumn,
+      int batchSize) {
+
+    for (int i=0; i< batchSize; ++i) {
+      if (!inputColumn.isNull[i]) {
+        processValue(myagg, inputColumn, i);
+      }
+    }
+  }
+
+  private void iterateSelectionNoNulls(
+      Aggregation myagg,
+      ColumnVector inputColumn,
+      int batchSize,
+      int[] selected) {
+
+    for (int j=0; j< batchSize; ++j) {
+      int i = selected[j];
+      processValue(myagg, inputColumn, i);
+    }
+  }
+
+  private void iterateSelectionHasNulls(
+      Aggregation myagg,
+      ColumnVector inputColumn,
+      int batchSize,
+      int[] selected) {
+
+    for (int j=0; j< batchSize; ++j) {
+      int i = selected[j];
+      if (!inputColumn.isNull[i]) {
+        processValue(myagg, inputColumn, i);
+      }
+    }
+  }
+
+  @Override
+  public void aggregateInputSelection(
+      VectorAggregationBufferRow[] aggregationBufferSets, int aggregateIndex,
+      VectorizedRowBatch batch) throws HiveException {
+
+    int batchSize = batch.size;
+
+    if (batchSize == 0) {
+      return;
+    }
+
+    inputExpression.evaluate(batch);
+
+    ColumnVector inputColumn = batch.cols[this.inputExpression.getOutputColumn()];
+
+    if (inputColumn.noNulls) {
+      if (inputColumn.isRepeating) {
+        iterateNoNullsRepeatingWithAggregationSelection(
+          aggregationBufferSets, aggregateIndex,
+          inputColumn, batchSize);
+      } else {
+        if (batch.selectedInUse) {
+          iterateNoNullsSelectionWithAggregationSelection(
+            aggregationBufferSets, aggregateIndex,
+            inputColumn, batch.selected, batchSize);
+        } else {
+          iterateNoNullsWithAggregationSelection(
+            aggregationBufferSets, aggregateIndex,
+            inputColumn, batchSize);
+        }
+      }
+    } else {
+      if (inputColumn.isRepeating) {
+        // All rows are null; nothing to merge
+      } else {
+        if (batch.selectedInUse) {
+          iterateHasNullsSelectionWithAggregationSelection(
+            aggregationBufferSets, aggregateIndex,
+            inputColumn, batchSize, batch.selected);
+        } else {
+          iterateHasNullsWithAggregationSelection(
+            aggregationBufferSets, aggregateIndex,
+            inputColumn, batchSize);
+        }
+      }
+    }
+  }
+
+  private void iterateNoNullsRepeatingWithAggregationSelection(
+      VectorAggregationBufferRow[] aggregationBufferSets,
+      int aggregrateIndex,
+      ColumnVector inputColumn,
+      int batchSize) {
+
+    for (int i=0; i < batchSize; ++i) {
+      Aggregation myagg = getCurrentAggregationBuffer(
+          aggregationBufferSets,
+          aggregrateIndex,
+          i);
+      processValue(myagg, inputColumn, 0);
+    }
+  }
+
+  private void iterateNoNullsSelectionWithAggregationSelection(
+      VectorAggregationBufferRow[] aggregationBufferSets,
+      int aggregrateIndex,
+      ColumnVector inputColumn,
+      int[] selection,
+      int batchSize) {
+
+    for (int i=0; i < batchSize; ++i) {
+      int row = selection[i];
+      Aggregation myagg = getCurrentAggregationBuffer(
+          aggregationBufferSets,
+          aggregrateIndex,
+          i);
+      processValue(myagg, inputColumn, row);
+    }
+  }
+
+  private void iterateNoNullsWithAggregationSelection(
+      VectorAggregationBufferRow[] aggregationBufferSets,
+      int aggregrateIndex,
+      ColumnVector inputColumn,
+      int batchSize) {
+    for (int i=0; i < batchSize; ++i) {
+      Aggregation myagg = getCurrentAggregationBuffer(
+          aggregationBufferSets,
+          aggregrateIndex,
+          i);
+      processValue(myagg, inputColumn, i);
+    }
+  }
+
+  private void iterateHasNullsSelectionWithAggregationSelection(
+      VectorAggregationBufferRow[] aggregationBufferSets,
+      int aggregrateIndex,
+      ColumnVector inputColumn,
+      int batchSize,
+      int[] selection) {
+
+    for (int i=0; i < batchSize; ++i) {
+      int row = selection[i];
+      if (!inputColumn.isNull[row]) {
+        Aggregation myagg = getCurrentAggregationBuffer(
+            aggregationBufferSets,
+            aggregrateIndex,
+            i);
+        processValue(myagg, inputColumn, row);
+      }
+    }
+  }
+
+  private void iterateHasNullsWithAggregationSelection(
+      VectorAggregationBufferRow[] aggregationBufferSets,
+      int aggregrateIndex,
+      ColumnVector inputColumn,
+      int batchSize) {
+
+    for (int i=0; i < batchSize; ++i) {
+      if (!inputColumn.isNull[i]) {
+        Aggregation myagg = getCurrentAggregationBuffer(
+            aggregationBufferSets,
+            aggregrateIndex,
+            i);
+        processValue(myagg, inputColumn, i);
+      }
+    }
+  }
+
+  private Aggregation getCurrentAggregationBuffer(
+      VectorAggregationBufferRow[] aggregationBufferSets,
+      int aggregrateIndex,
+      int row) {
+    VectorAggregationBufferRow mySet = aggregationBufferSets[row];
+    Aggregation myagg = (Aggregation) mySet.getAggregationBuffer(aggregrateIndex);
+    return myagg;
+  }
+
+  @Override
+  public void reset(AggregationBuffer agg) throws HiveException {
+    agg.reset();
+  }
+
+  @Override
+  public Object evaluateOutput(AggregationBuffer agg) throws HiveException {
+    Aggregation bfAgg = (Aggregation) agg;
+    bw.set(bfAgg.bfBytes, 0, bfAgg.bfBytes.length);
+    return bw;
+  }
+
+  @Override
+  public ObjectInspector getOutputObjectInspector() {
+    return PrimitiveObjectInspectorFactory.writableBinaryObjectInspector;
+  }
+
+  @Override
+  public int getAggregationBufferFixedSize() {
+    if (aggBufferSize < 0) {
+      // Not pretty, but we need a way to get the size
+      try {
+        Aggregation agg = (Aggregation) getNewAggregationBuffer();
+        aggBufferSize = agg.bfBytes.length;
+      } catch (Exception e) {
+        throw new RuntimeException("Unexpected error while creating AggregationBuffer", e);
+      }
+    }
+
+    return aggBufferSize;
+  }
+
+  @Override
+  public void init(AggregationDesc desc) throws HiveException {
+    GenericUDAFBloomFilterEvaluator udafBloomFilter =
+        (GenericUDAFBloomFilterEvaluator) desc.getGenericUDAFEvaluator();
+    expectedEntries = udafBloomFilter.getExpectedEntries();
+  }
+
+  void processValue(Aggregation myagg, ColumnVector columnVector, int i) {
+    // The columnVector entry is a byte array representing a serialized BloomFilter.
+    // BloomFilter.mergeBloomFilterBytes() does a simple byte-wise OR of the two
+    // serialized filters, which should be faster than deserialize/merge.
+    BytesColumnVector inputColumn = (BytesColumnVector) columnVector;
+    BloomFilter.mergeBloomFilterBytes(myagg.bfBytes, 0, myagg.bfBytes.length,
+        inputColumn.vector[i], inputColumn.start[i], inputColumn.length[i]);
+  }
+}

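Note: processValue() above leans on BloomFilter.mergeBloomFilterBytes() OR-ing the
serialized bit sets in place rather than deserializing both filters. A minimal sketch
of that idea, assuming both filters were built with the same expected-entry count so
their headers and bit-set lengths match (BloomFilterOrMergeSketch and orMergeInPlace
are illustrative names, not the storage-api implementation):

    public class BloomFilterOrMergeSketch {
      // OR the payload bytes of one serialized bloom filter into another, in place.
      // The real mergeBloomFilterBytes() additionally validates that the two filters'
      // headers (hash-function count and bit-set length) agree before merging.
      static void orMergeInPlace(byte[] dest, int destStart, byte[] src, int srcStart, int length) {
        for (int i = 0; i < length; i++) {
          dest[destStart + i] |= src[srcStart + i];   // union of the two bit sets
        }
      }

      public static void main(String[] args) {
        byte[] a = {0b0101, 0b0011};
        byte[] b = {0b1010, 0b0001};
        orMergeInPlace(a, 0, b, 0, a.length);
        // a is now {0b1111, 0b0011}: a bit set in either input is set in the merge.
        System.out.println(a[0] + " " + a[1]);
      }
    }

Because the merge is a byte-wise OR, any value added to either input filter still tests
positive against the merged filter.
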
http://git-wip-us.apache.org/repos/asf/hive/blob/62ebd1ab/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
index e3d9d7f..439950b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
@@ -335,6 +335,7 @@ public class Vectorizer implements PhysicalPlanResolver {
     supportedGenericUDFs.add(GenericUDFNvl.class);
     supportedGenericUDFs.add(GenericUDFElt.class);
     supportedGenericUDFs.add(GenericUDFInitCap.class);
+    supportedGenericUDFs.add(GenericUDFInBloomFilter.class);
 
     // For type casts
     supportedGenericUDFs.add(UDFToLong.class);
@@ -368,6 +369,7 @@ public class Vectorizer implements PhysicalPlanResolver {
     supportedAggregationUdfs.add("stddev");
     supportedAggregationUdfs.add("stddev_pop");
     supportedAggregationUdfs.add("stddev_samp");
+    supportedAggregationUdfs.add("bloom_filter");
   }
 
   private class VectorTaskColumnInfo {

http://git-wip-us.apache.org/repos/asf/hive/blob/62ebd1ab/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFBloomFilter.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFBloomFilter.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFBloomFilter.java
index fb9a140..deb0f76 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFBloomFilter.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFBloomFilter.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.serde2.io.DateWritable;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.*;
@@ -72,6 +73,8 @@ public class GenericUDAFBloomFilter implements GenericUDAFResolver2 {
     // Bloom filter result
     private ByteArrayOutputStream result = new ByteArrayOutputStream();
 
+    private transient byte[] scratchBuffer = new byte[HiveDecimal.SCRATCH_BUFFER_LEN_TO_BYTES];
+
     @Override
     public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
       super.init(m, parameters);
@@ -167,9 +170,10 @@ public class GenericUDAFBloomFilter implements GenericUDAFResolver2 {
           bf.addDouble(vDouble);
           break;
         case DECIMAL:
-          HiveDecimal vDecimal = ((HiveDecimalObjectInspector)inputOI).
-                  getPrimitiveJavaObject(parameters[0]);
-          bf.addString(vDecimal.toString());
+          HiveDecimalWritable vDecimal = ((HiveDecimalObjectInspector)inputOI).
+                  getPrimitiveWritableObject(parameters[0]);
+          int startIdx = vDecimal.toBytes(scratchBuffer);
+          bf.addBytes(scratchBuffer, startIdx, scratchBuffer.length - startIdx);
           break;
         case DATE:
           DateWritable vDate = ((DateObjectInspector)inputOI).

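Note: the DECIMAL change above replaces the per-row HiveDecimal.toString() allocation
with writing the decimal into a reusable scratch buffer and adding the resulting byte
range to the bloom filter. The probe side (GenericUDFInBloomFilter, next file) switches
to the same encoding, which is what keeps lookups consistent. A rough usage sketch of
the pattern, assuming the storage-api BloomFilter and HiveDecimalWritable APIs are used
exactly as in the patch (the class name below is illustrative):

    import org.apache.hadoop.hive.common.type.HiveDecimal;
    import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
    import org.apache.hive.common.util.BloomFilter;

    public class DecimalBloomFilterSketch {
      public static void main(String[] args) {
        // Reusable scratch buffer, sized the same way as in the patch.
        byte[] scratch = new byte[HiveDecimal.SCRATCH_BUFFER_LEN_TO_BYTES];
        BloomFilter bf = new BloomFilter(10000);

        HiveDecimalWritable value = new HiveDecimalWritable(HiveDecimal.create("123.45"));

        // Build side: encode the decimal into the scratch buffer, add the byte range.
        int start = value.toBytes(scratch);
        bf.addBytes(scratch, start, scratch.length - start);

        // Probe side: the same encoding must be used for the lookup to hit.
        int probeStart = value.toBytes(scratch);
        System.out.println(bf.testBytes(scratch, probeStart, scratch.length - probeStart)); // true
      }
    }
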
http://git-wip-us.apache.org/repos/asf/hive/blob/62ebd1ab/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFInBloomFilter.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFInBloomFilter.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFInBloomFilter.java
index 1b7de6c..3e6e069 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFInBloomFilter.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFInBloomFilter.java
@@ -22,8 +22,11 @@ import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
 import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
 import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedExpressions;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorInBloomFilterColDynamicValue;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.serde2.io.DateWritable;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.*;
@@ -41,6 +44,7 @@ import java.sql.Timestamp;
 /**
  * GenericUDF to lookup a value in BloomFilter
  */
+@VectorizedExpressions({VectorInBloomFilterColDynamicValue.class})
 public class GenericUDFInBloomFilter extends GenericUDF {
   private static final Logger LOG = LoggerFactory.getLogger(GenericUDFInBloomFilter.class);
 
@@ -48,6 +52,7 @@ public class GenericUDFInBloomFilter extends GenericUDF {
   private transient ObjectInspector bloomFilterObjectInspector;
   private transient BloomFilter bloomFilter;
   private transient boolean initializedBloomFilter;
+  private transient byte[] scratchBuffer = new byte[HiveDecimal.SCRATCH_BUFFER_LEN_TO_BYTES];
 
   @Override
   public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
@@ -133,9 +138,10 @@ public class GenericUDFInBloomFilter extends GenericUDF {
                 get(arguments[0].get());
         return bloomFilter.testDouble(vDouble);
       case DECIMAL:
-        HiveDecimal vDecimal = ((HiveDecimalObjectInspector) valObjectInspector).
-                getPrimitiveJavaObject(arguments[0].get());
-        return bloomFilter.testString(vDecimal.toString());
+        HiveDecimalWritable vDecimal = ((HiveDecimalObjectInspector) valObjectInspector).
+            getPrimitiveWritableObject(arguments[0].get());
+        int startIdx = vDecimal.toBytes(scratchBuffer);
+        return bloomFilter.testBytes(scratchBuffer, startIdx, scratchBuffer.length - startIdx);
       case DATE:
         DateWritable vDate = ((DateObjectInspector) valObjectInspector).
                 getPrimitiveWritableObject(arguments[0].get());