Posted to commits@hive.apache.org by jd...@apache.org on 2017/01/30 07:33:32 UTC
[1/3] hive git commit: HIVE-15698: Vectorization support for min/max/bloomfilter runtime filtering (Jason Dere, reviewed by Matt McCline)
Repository: hive
Updated Branches:
refs/heads/master 79eb2243a -> 62ebd1abb
http://git-wip-us.apache.org/repos/asf/hive/blob/62ebd1ab/storage-api/src/test/org/apache/hive/common/util/TestBloomFilter.java
----------------------------------------------------------------------
diff --git a/storage-api/src/test/org/apache/hive/common/util/TestBloomFilter.java b/storage-api/src/test/org/apache/hive/common/util/TestBloomFilter.java
index 63c7050..e4ee93a 100644
--- a/storage-api/src/test/org/apache/hive/common/util/TestBloomFilter.java
+++ b/storage-api/src/test/org/apache/hive/common/util/TestBloomFilter.java
@@ -20,8 +20,12 @@ package org.apache.hive.common.util;
import static org.junit.Assert.assertEquals;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.util.ArrayList;
import java.util.Random;
+import org.junit.Assert;
import org.junit.Test;
/**
@@ -461,4 +465,125 @@ public class TestBloomFilter {
assertEquals(true, bf.testString(v2));
assertEquals(true, bf.testString(v3));
}
+
+ @Test
+ public void testSerialize() throws Exception {
+ BloomFilter bf1 = new BloomFilter(10000);
+ String[] inputs = {
+ "bloo",
+ "bloom fil",
+ "bloom filter",
+ "cuckoo filter",
+ };
+
+ for (String val : inputs) {
+ bf1.addString(val);
+ }
+
+ // Serialize/deserialize
+ ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+ BloomFilter.serialize(bytesOut, bf1);
+ ByteArrayInputStream bytesIn = new ByteArrayInputStream(bytesOut.toByteArray());
+ BloomFilter bf2 = BloomFilter.deserialize(bytesIn);
+
+ for (String val : inputs) {
+ assertEquals("Testing bf1 with " + val, true, bf1.testString(val));
+ assertEquals("Testing bf2 with " + val, true, bf2.testString(val));
+ }
+ }
+
+ @Test
+ public void testMergeBloomFilterBytes() throws Exception {
+ BloomFilter bf1 = new BloomFilter(10000);
+ BloomFilter bf2 = new BloomFilter(10000);
+
+ String[] inputs1 = {
+ "bloo",
+ "bloom fil",
+ "bloom filter",
+ "cuckoo filter",
+ };
+
+ String[] inputs2 = {
+ "2_bloo",
+ "2_bloom fil",
+ "2_bloom filter",
+ "2_cuckoo filter",
+ };
+
+ for (String val : inputs1) {
+ bf1.addString(val);
+ }
+ for (String val : inputs2) {
+ bf2.addString(val);
+ }
+
+ ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+ BloomFilter.serialize(bytesOut, bf1);
+ byte[] bf1Bytes = bytesOut.toByteArray();
+ bytesOut.reset();
+ BloomFilter.serialize(bytesOut, bf2);
+ byte[] bf2Bytes = bytesOut.toByteArray();
+
+ // Merge bytes
+ BloomFilter.mergeBloomFilterBytes(
+ bf1Bytes, 0, bf1Bytes.length,
+ bf2Bytes, 0, bf2Bytes.length);
+
+ // Deserialize and test
+ ByteArrayInputStream bytesIn = new ByteArrayInputStream(bf1Bytes, 0, bf1Bytes.length);
+ BloomFilter bfMerged = BloomFilter.deserialize(bytesIn);
+ // All values from both source filters should now pass the membership test
+ for (String val : inputs1) {
+ assertEquals("Testing merged filter with " + val, true, bfMerged.testString(val));
+ }
+ for (String val : inputs2) {
+ assertEquals("Testing merged filter with " + val, true, bfMerged.testString(val));
+ }
+ }
+
+ @Test
+ public void testMergeBloomFilterBytesFailureCases() throws Exception {
+ BloomFilter bf1 = new BloomFilter(1000);
+ BloomFilter bf2 = new BloomFilter(200);
+ // Create bloom filter with same number of bits, but different # hash functions
+ ArrayList<Long> bits = new ArrayList<Long>();
+ for (int idx = 0; idx < bf1.getBitSet().length; ++idx) {
+ bits.add(0L);
+ }
+ BloomFilter bf3 = new BloomFilter(bits, bf1.getBitSize(), bf1.getNumHashFunctions() + 1);
+
+ // Serialize to bytes
+ ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+ BloomFilter.serialize(bytesOut, bf1);
+ byte[] bf1Bytes = bytesOut.toByteArray();
+
+ bytesOut.reset();
+ BloomFilter.serialize(bytesOut, bf2);
+ byte[] bf2Bytes = bytesOut.toByteArray();
+
+ bytesOut.reset();
+ BloomFilter.serialize(bytesOut, bf3);
+ byte[] bf3Bytes = bytesOut.toByteArray();
+
+ try {
+ // this should fail
+ BloomFilter.mergeBloomFilterBytes(
+ bf1Bytes, 0, bf1Bytes.length,
+ bf2Bytes, 0, bf2Bytes.length);
+ Assert.fail("Expected exception not encountered");
+ } catch (IllegalArgumentException err) {
+ // expected
+ }
+
+ try {
+ // this should fail
+ BloomFilter.mergeBloomFilterBytes(
+ bf1Bytes, 0, bf1Bytes.length,
+ bf3Bytes, 0, bf3Bytes.length);
+ Assert.fail("Expected exception not encountered");
+ } catch (IllegalArgumentException err) {
+ // expected
+ }
+ }
}
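
For reference, a minimal standalone sketch of the serialize/merge/deserialize round trip these tests exercise. It assumes only the BloomFilter methods visible in the diff above (addString, testString, serialize, deserialize, mergeBloomFilterBytes); the class and variable names here are illustrative, not part of the patch.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;

import org.apache.hive.common.util.BloomFilter;

public class BloomFilterMergeSketch {
  public static void main(String[] args) throws Exception {
    // Two filters sized identically, so their bit size and hash-function
    // count match (mismatches are the failure cases tested above).
    BloomFilter left = new BloomFilter(10000);
    BloomFilter right = new BloomFilter(10000);
    left.addString("bloom filter");
    right.addString("cuckoo filter");

    // Serialize both filters to byte arrays.
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    BloomFilter.serialize(out, left);
    byte[] leftBytes = out.toByteArray();
    out.reset();
    BloomFilter.serialize(out, right);
    byte[] rightBytes = out.toByteArray();

    // Fold the second serialized filter into the first byte array.
    BloomFilter.mergeBloomFilterBytes(
        leftBytes, 0, leftBytes.length,
        rightBytes, 0, rightBytes.length);

    // The merged filter now reports membership for both inputs.
    BloomFilter merged = BloomFilter.deserialize(new ByteArrayInputStream(leftBytes));
    System.out.println(merged.testString("bloom filter"));  // true
    System.out.println(merged.testString("cuckoo filter")); // true
  }
}

Note that mergeBloomFilterBytes folds the second serialized filter into the first byte array in place, which is why the merged filter is deserialized from leftBytes rather than from a separate output.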
[2/3] hive git commit: HIVE-15698: Vectorization support for min/max/bloomfilter runtime filtering (Jason Dere, reviewed by Matt McCline)
Posted by jd...@apache.org.
http://git-wip-us.apache.org/repos/asf/hive/blob/62ebd1ab/ql/src/test/org/apache/hadoop/hive/ql/optimizer/physical/TestVectorizer.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/optimizer/physical/TestVectorizer.java b/ql/src/test/org/apache/hadoop/hive/ql/optimizer/physical/TestVectorizer.java
index 59cb31e..889f00a 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/optimizer/physical/TestVectorizer.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/optimizer/physical/TestVectorizer.java
@@ -28,6 +28,7 @@ import junit.framework.Assert;
import org.apache.hadoop.hive.ql.CompilationOpContext;
import org.apache.hadoop.hive.ql.exec.*;
import org.apache.hadoop.hive.ql.exec.vector.VectorExpressionDescriptor;
+import org.apache.hadoop.hive.ql.exec.vector.VectorExpressionDescriptor.Mode;
import org.apache.hadoop.hive.ql.exec.vector.VectorGroupByOperator;
import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen.VectorUDAFSumLong;
@@ -218,4 +219,34 @@ public class TestVectorizer {
Vectorizer vectorizer = new Vectorizer();
Assert.assertTrue(vectorizer.validateMapWorkOperator(map, null, false));
}
+
+ @Test
+ public void testExprNodeDynamicValue() {
+ ExprNodeDesc exprNode = new ExprNodeDynamicValueDesc(new DynamicValue("id1", TypeInfoFactory.stringTypeInfo));
+ Vectorizer v = new Vectorizer();
+ Assert.assertTrue(v.validateExprNodeDesc(exprNode, Mode.FILTER));
+ Assert.assertTrue(v.validateExprNodeDesc(exprNode, Mode.PROJECTION));
+ }
+
+ @Test
+ public void testExprNodeBetweenWithDynamicValue() {
+ ExprNodeDesc notBetween = new ExprNodeConstantDesc(TypeInfoFactory.booleanTypeInfo, Boolean.FALSE);
+ ExprNodeColumnDesc colExpr = new ExprNodeColumnDesc(String.class, "col1", "table", false);
+ ExprNodeDesc minExpr = new ExprNodeDynamicValueDesc(new DynamicValue("id1", TypeInfoFactory.stringTypeInfo));
+ ExprNodeDesc maxExpr = new ExprNodeDynamicValueDesc(new DynamicValue("id2", TypeInfoFactory.stringTypeInfo));
+
+ ExprNodeGenericFuncDesc betweenExpr = new ExprNodeGenericFuncDesc();
+ GenericUDF betweenUdf = new GenericUDFBetween();
+ betweenExpr.setTypeInfo(TypeInfoFactory.booleanTypeInfo);
+ betweenExpr.setGenericUDF(betweenUdf);
+ List<ExprNodeDesc> children1 = new ArrayList<ExprNodeDesc>(2);
+ children1.add(notBetween);
+ children1.add(colExpr);
+ children1.add(minExpr);
+ children1.add(maxExpr);
+ betweenExpr.setChildren(children1);
+
+ Vectorizer v = new Vectorizer();
+ Assert.assertTrue(v.validateExprNodeDesc(betweenExpr, Mode.FILTER));
+ }
}
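
As a side note on the test above: the leading constant child of GenericUDFBetween is the invert flag (Boolean.FALSE means a plain BETWEEN). A hedged sketch of the NOT BETWEEN variant follows, built from the same classes and imports as TestVectorizer above (plus java.util.ArrayList and java.util.List); the patch does not pin down whether the inverted form vectorizes, so no particular result is asserted here.

// Sketch only: col1 NOT BETWEEN DynamicValue(id1) AND DynamicValue(id2),
// i.e. the invert flag set to TRUE instead of FALSE.
ExprNodeDesc invertFlag = new ExprNodeConstantDesc(TypeInfoFactory.booleanTypeInfo, Boolean.TRUE);
ExprNodeColumnDesc col = new ExprNodeColumnDesc(String.class, "col1", "table", false);
ExprNodeDesc lo = new ExprNodeDynamicValueDesc(new DynamicValue("id1", TypeInfoFactory.stringTypeInfo));
ExprNodeDesc hi = new ExprNodeDynamicValueDesc(new DynamicValue("id2", TypeInfoFactory.stringTypeInfo));

ExprNodeGenericFuncDesc notBetween = new ExprNodeGenericFuncDesc();
notBetween.setTypeInfo(TypeInfoFactory.booleanTypeInfo);
notBetween.setGenericUDF(new GenericUDFBetween());
List<ExprNodeDesc> children = new ArrayList<ExprNodeDesc>(4);
children.add(invertFlag);
children.add(col);
children.add(lo);
children.add(hi);
notBetween.setChildren(children);

// Query the validator without asserting the outcome, since the patch's
// tests only cover the non-inverted BETWEEN case.
boolean vectorizable = new Vectorizer().validateExprNodeDesc(notBetween, Mode.FILTER);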
http://git-wip-us.apache.org/repos/asf/hive/blob/62ebd1ab/ql/src/test/queries/clientpositive/explainuser_3.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/explainuser_3.q b/ql/src/test/queries/clientpositive/explainuser_3.q
index 282629e..9c6c9dc 100644
--- a/ql/src/test/queries/clientpositive/explainuser_3.q
+++ b/ql/src/test/queries/clientpositive/explainuser_3.q
@@ -13,6 +13,7 @@ set hive.vectorized.execution.enabled=true;
CREATE TABLE acid_vectorized(a INT, b STRING) CLUSTERED BY(a) INTO 2 BUCKETS STORED AS ORC TBLPROPERTIES ('transactional'='true');
insert into table acid_vectorized select cint, cstring1 from alltypesorc where cint is not null order by cint limit 10;
+analyze table acid_vectorized compute statistics for columns;
explain select a, b from acid_vectorized order by a, b;
explain select key, value
http://git-wip-us.apache.org/repos/asf/hive/blob/62ebd1ab/ql/src/test/queries/clientpositive/vectorized_dynamic_semijoin_reduction.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/vectorized_dynamic_semijoin_reduction.q b/ql/src/test/queries/clientpositive/vectorized_dynamic_semijoin_reduction.q
new file mode 100644
index 0000000..e1eefff
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/vectorized_dynamic_semijoin_reduction.q
@@ -0,0 +1,43 @@
+set hive.compute.query.using.stats=false;
+set hive.mapred.mode=nonstrict;
+set hive.explain.user=false;
+set hive.optimize.ppd=true;
+set hive.ppd.remove.duplicatefilters=true;
+set hive.tez.dynamic.partition.pruning=true;
+set hive.tez.dynamic.semijoin.reduction=true;
+set hive.optimize.metadataonly=false;
+set hive.optimize.index.filter=true;
+
+set hive.vectorized.adaptor.usage.mode=none;
+set hive.vectorized.execution.enabled=true;
+
+-- Create Tables
+create table dsrv_big stored as orc as select key as key_str, cast(key as int) as key_int, value from src;
+create table dsrv_small stored as orc as select distinct key as key_str, cast(key as int) as key_int, value from src where key < 100;
+
+-- single key (int)
+EXPLAIN select count(*) from dsrv_big a join dsrv_small b on (a.key_int = b.key_int);
+select count(*) from dsrv_big a join dsrv_small b on (a.key_int = b.key_int);
+
+-- single key (string)
+EXPLAIN select count(*) from dsrv_big a join dsrv_small b on (a.key_str = b.key_str);
+select count(*) from dsrv_big a join dsrv_small b on (a.key_str = b.key_str);
+
+-- keys are different type
+EXPLAIN select count(*) from dsrv_big a join dsrv_small b on (a.key_int = b.key_str);
+select count(*) from dsrv_big a join dsrv_small b on (a.key_int = b.key_str);
+
+-- multiple tables
+EXPLAIN select count(*) from dsrv_big a, dsrv_small b, dsrv_small c where a.key_int = b.key_int and a.key_int = c.key_int;
+select count(*) from dsrv_big a, dsrv_small b, dsrv_small c where a.key_int = b.key_int and a.key_int = c.key_int;
+
+-- multiple keys
+EXPLAIN select count(*) from dsrv_big a join dsrv_small b on (a.key_str = b.key_str and a.key_int = b.key_int);
+select count(*) from dsrv_big a join dsrv_small b on (a.key_str = b.key_str and a.key_int = b.key_int);
+
+-- small table result is empty
+EXPLAIN select count(*) from dsrv_big a join dsrv_small b on (a.key_int = b.key_int) where b.value in ('nonexistent1', 'nonexistent2');
+select count(*) from dsrv_big a join dsrv_small b on (a.key_int = b.key_int) where b.value in ('nonexistent1', 'nonexistent2');
+
+drop table dsrv_big;
+drop table dsrv_small;
http://git-wip-us.apache.org/repos/asf/hive/blob/62ebd1ab/ql/src/test/results/clientpositive/llap/mergejoin.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/mergejoin.q.out b/ql/src/test/results/clientpositive/llap/mergejoin.q.out
index 4ec2a71..6114548 100644
--- a/ql/src/test/results/clientpositive/llap/mergejoin.q.out
+++ b/ql/src/test/results/clientpositive/llap/mergejoin.q.out
@@ -92,7 +92,7 @@ STAGE PLANS:
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
Reducer 4
- Execution mode: llap
+ Execution mode: vectorized, llap
Reduce Operator Tree:
Group By Operator
aggregations: min(VALUE._col0), max(VALUE._col1), bloom_filter(VALUE._col2, expectedEntries=25)
@@ -321,7 +321,7 @@ STAGE PLANS:
sort order:
Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: NONE
value expressions: _col0 (type: int), _col1 (type: int), _col2 (type: binary)
- Execution mode: llap
+ Execution mode: vectorized, llap
LLAP IO: all inputs
Map 5
Map Operator Tree:
@@ -341,7 +341,7 @@ STAGE PLANS:
sort order: +
Map-reduce partition columns: _col0 (type: int)
Statistics: Num rows: 500 Data size: 51000 Basic stats: COMPLETE Column stats: NONE
- Execution mode: llap
+ Execution mode: vectorized, llap
LLAP IO: all inputs
Reducer 2
Execution mode: llap
@@ -378,7 +378,7 @@ STAGE PLANS:
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
Reducer 4
- Execution mode: llap
+ Execution mode: vectorized, llap
Reduce Operator Tree:
Group By Operator
aggregations: min(VALUE._col0), max(VALUE._col1), bloom_filter(VALUE._col2, expectedEntries=242)
@@ -1434,7 +1434,7 @@ STAGE PLANS:
sort order:
Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: NONE
value expressions: _col0 (type: int), _col1 (type: int), _col2 (type: binary)
- Execution mode: llap
+ Execution mode: vectorized, llap
LLAP IO: all inputs
Map 5
Map Operator Tree:
@@ -1453,7 +1453,7 @@ STAGE PLANS:
sort order: +
Map-reduce partition columns: _col0 (type: int)
Statistics: Num rows: 500 Data size: 51000 Basic stats: COMPLETE Column stats: NONE
- Execution mode: llap
+ Execution mode: vectorized, llap
LLAP IO: all inputs
Reducer 2
Execution mode: llap
@@ -1490,7 +1490,7 @@ STAGE PLANS:
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
Reducer 4
- Execution mode: llap
+ Execution mode: vectorized, llap
Reduce Operator Tree:
Group By Operator
aggregations: min(VALUE._col0), max(VALUE._col1), bloom_filter(VALUE._col2, expectedEntries=242)
@@ -1565,7 +1565,7 @@ STAGE PLANS:
sort order: +
Map-reduce partition columns: _col0 (type: int)
Statistics: Num rows: 242 Data size: 24684 Basic stats: COMPLETE Column stats: NONE
- Execution mode: llap
+ Execution mode: vectorized, llap
LLAP IO: all inputs
Map 4
Map Operator Tree:
@@ -1594,7 +1594,7 @@ STAGE PLANS:
sort order:
Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: NONE
value expressions: _col0 (type: int), _col1 (type: int), _col2 (type: binary)
- Execution mode: llap
+ Execution mode: vectorized, llap
LLAP IO: all inputs
Reducer 2
Execution mode: llap
@@ -1631,7 +1631,7 @@ STAGE PLANS:
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
Reducer 5
- Execution mode: llap
+ Execution mode: vectorized, llap
Reduce Operator Tree:
Group By Operator
aggregations: min(VALUE._col0), max(VALUE._col1), bloom_filter(VALUE._col2, expectedEntries=500)
@@ -1831,7 +1831,7 @@ STAGE PLANS:
sort order:
Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: NONE
value expressions: _col0 (type: int), _col1 (type: int), _col2 (type: binary)
- Execution mode: llap
+ Execution mode: vectorized, llap
LLAP IO: all inputs
Map 6
Map Operator Tree:
@@ -1851,7 +1851,7 @@ STAGE PLANS:
sort order: +
Map-reduce partition columns: _col0 (type: int)
Statistics: Num rows: 500 Data size: 51000 Basic stats: COMPLETE Column stats: NONE
- Execution mode: llap
+ Execution mode: vectorized, llap
LLAP IO: all inputs
Map 7
Map Operator Tree:
@@ -1937,7 +1937,7 @@ STAGE PLANS:
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
Reducer 5
- Execution mode: llap
+ Execution mode: vectorized, llap
Reduce Operator Tree:
Group By Operator
aggregations: min(VALUE._col0), max(VALUE._col1), bloom_filter(VALUE._col2, expectedEntries=242)
@@ -1949,7 +1949,7 @@ STAGE PLANS:
Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: NONE
value expressions: _col0 (type: int), _col1 (type: int), _col2 (type: binary)
Reducer 8
- Execution mode: llap
+ Execution mode: vectorized, llap
Reduce Operator Tree:
Group By Operator
aggregations: min(VALUE._col0), max(VALUE._col1), bloom_filter(VALUE._col2, expectedEntries=25)
@@ -2034,7 +2034,7 @@ STAGE PLANS:
sort order:
Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: NONE
value expressions: _col0 (type: string), _col1 (type: string), _col2 (type: binary)
- Execution mode: llap
+ Execution mode: vectorized, llap
LLAP IO: all inputs
Map 5
Map Operator Tree:
@@ -2054,7 +2054,7 @@ STAGE PLANS:
sort order: +
Map-reduce partition columns: _col1 (type: string)
Statistics: Num rows: 500 Data size: 51000 Basic stats: COMPLETE Column stats: NONE
- Execution mode: llap
+ Execution mode: vectorized, llap
LLAP IO: all inputs
Reducer 2
Execution mode: llap
@@ -2091,7 +2091,7 @@ STAGE PLANS:
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
Reducer 4
- Execution mode: llap
+ Execution mode: vectorized, llap
Reduce Operator Tree:
Group By Operator
aggregations: min(VALUE._col0), max(VALUE._col1), bloom_filter(VALUE._col2, expectedEntries=242)
@@ -2224,7 +2224,7 @@ STAGE PLANS:
sort order:
Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: NONE
value expressions: _col0 (type: int), _col1 (type: int), _col2 (type: binary)
- Execution mode: llap
+ Execution mode: vectorized, llap
LLAP IO: all inputs
Map 9
Map Operator Tree:
@@ -2244,7 +2244,7 @@ STAGE PLANS:
sort order: +
Map-reduce partition columns: _col0 (type: int)
Statistics: Num rows: 500 Data size: 51000 Basic stats: COMPLETE Column stats: NONE
- Execution mode: llap
+ Execution mode: vectorized, llap
LLAP IO: all inputs
Reducer 2
Execution mode: llap
@@ -2310,7 +2310,7 @@ STAGE PLANS:
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
Reducer 6
- Execution mode: llap
+ Execution mode: vectorized, llap
Reduce Operator Tree:
Group By Operator
aggregations: min(VALUE._col0), max(VALUE._col1), bloom_filter(VALUE._col2, expectedEntries=508)
@@ -2380,7 +2380,7 @@ STAGE PLANS:
sort order:
Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: NONE
value expressions: _col0 (type: string), _col1 (type: string), _col2 (type: binary)
- Execution mode: llap
+ Execution mode: vectorized, llap
LLAP IO: all inputs
Map 5
Map Operator Tree:
@@ -2400,7 +2400,7 @@ STAGE PLANS:
sort order: +
Map-reduce partition columns: _col1 (type: string)
Statistics: Num rows: 500 Data size: 51000 Basic stats: COMPLETE Column stats: NONE
- Execution mode: llap
+ Execution mode: vectorized, llap
LLAP IO: all inputs
Reducer 2
Execution mode: llap
@@ -2437,7 +2437,7 @@ STAGE PLANS:
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
Reducer 4
- Execution mode: llap
+ Execution mode: vectorized, llap
Reduce Operator Tree:
Group By Operator
aggregations: min(VALUE._col0), max(VALUE._col1), bloom_filter(VALUE._col2, expectedEntries=242)
@@ -2524,7 +2524,7 @@ STAGE PLANS:
sort order:
Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: NONE
value expressions: _col0 (type: int), _col1 (type: int), _col2 (type: binary)
- Execution mode: llap
+ Execution mode: vectorized, llap
LLAP IO: all inputs
Map 6
Map Operator Tree:
@@ -2544,7 +2544,7 @@ STAGE PLANS:
sort order: +
Map-reduce partition columns: _col0 (type: int)
Statistics: Num rows: 500 Data size: 51000 Basic stats: COMPLETE Column stats: NONE
- Execution mode: llap
+ Execution mode: vectorized, llap
LLAP IO: all inputs
Map 7
Map Operator Tree:
@@ -2630,7 +2630,7 @@ STAGE PLANS:
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
Reducer 5
- Execution mode: llap
+ Execution mode: vectorized, llap
Reduce Operator Tree:
Group By Operator
aggregations: min(VALUE._col0), max(VALUE._col1), bloom_filter(VALUE._col2, expectedEntries=242)
@@ -2642,7 +2642,7 @@ STAGE PLANS:
Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: NONE
value expressions: _col0 (type: int), _col1 (type: int), _col2 (type: binary)
Reducer 8
- Execution mode: llap
+ Execution mode: vectorized, llap
Reduce Operator Tree:
Group By Operator
aggregations: min(VALUE._col0), max(VALUE._col1), bloom_filter(VALUE._col2, expectedEntries=25)
@@ -2777,7 +2777,7 @@ STAGE PLANS:
sort order:
Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: NONE
value expressions: _col0 (type: int), _col1 (type: int), _col2 (type: binary)
- Execution mode: llap
+ Execution mode: vectorized, llap
LLAP IO: all inputs
Map 9
Map Operator Tree:
@@ -2797,7 +2797,7 @@ STAGE PLANS:
sort order: +
Map-reduce partition columns: _col0 (type: int)
Statistics: Num rows: 500 Data size: 51000 Basic stats: COMPLETE Column stats: NONE
- Execution mode: llap
+ Execution mode: vectorized, llap
LLAP IO: all inputs
Reducer 2
Execution mode: llap
@@ -2863,7 +2863,7 @@ STAGE PLANS:
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
Reducer 6
- Execution mode: llap
+ Execution mode: vectorized, llap
Reduce Operator Tree:
Group By Operator
aggregations: min(VALUE._col0), max(VALUE._col1), bloom_filter(VALUE._col2, expectedEntries=508)
@@ -2954,10 +2954,10 @@ STAGE PLANS:
key expressions: _col0 (type: int), _col1 (type: string)
sort order: ++
Statistics: Num rows: 500 Data size: 51000 Basic stats: COMPLETE Column stats: NONE
- Execution mode: llap
+ Execution mode: vectorized, llap
LLAP IO: all inputs
Reducer 2
- Execution mode: llap
+ Execution mode: vectorized, llap
Reduce Operator Tree:
Select Operator
expressions: KEY.reducesinkkey0 (type: int)
@@ -3016,7 +3016,7 @@ STAGE PLANS:
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
Reducer 5
- Execution mode: llap
+ Execution mode: vectorized, llap
Reduce Operator Tree:
Group By Operator
aggregations: min(VALUE._col0), max(VALUE._col1), bloom_filter(VALUE._col2, expectedEntries=242)
http://git-wip-us.apache.org/repos/asf/hive/blob/62ebd1ab/ql/src/test/results/clientpositive/llap/orc_llap.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/orc_llap.q.out b/ql/src/test/results/clientpositive/llap/orc_llap.q.out
index 90055a5..4fb3d12 100644
--- a/ql/src/test/results/clientpositive/llap/orc_llap.q.out
+++ b/ql/src/test/results/clientpositive/llap/orc_llap.q.out
@@ -597,7 +597,7 @@ STAGE PLANS:
sort order:
Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: NONE
value expressions: _col0 (type: smallint), _col1 (type: smallint), _col2 (type: binary)
- Execution mode: llap
+ Execution mode: vectorized, llap
LLAP IO: all inputs
Map 5
Map Operator Tree:
@@ -618,7 +618,7 @@ STAGE PLANS:
Map-reduce partition columns: _col0 (type: smallint)
Statistics: Num rows: 122880 Data size: 29079940 Basic stats: COMPLETE Column stats: NONE
value expressions: _col2 (type: string)
- Execution mode: llap
+ Execution mode: vectorized, llap
LLAP IO: all inputs
Reducer 2
Execution mode: llap
@@ -660,7 +660,7 @@ STAGE PLANS:
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
Reducer 4
- Execution mode: llap
+ Execution mode: vectorized, llap
Reduce Operator Tree:
Group By Operator
aggregations: min(VALUE._col0), max(VALUE._col1), bloom_filter(VALUE._col2, expectedEntries=122880)
@@ -1089,7 +1089,7 @@ STAGE PLANS:
sort order:
Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: NONE
value expressions: _col0 (type: smallint), _col1 (type: smallint), _col2 (type: binary)
- Execution mode: llap
+ Execution mode: vectorized, llap
LLAP IO: all inputs
Map 5
Map Operator Tree:
@@ -1110,7 +1110,7 @@ STAGE PLANS:
Map-reduce partition columns: _col0 (type: smallint)
Statistics: Num rows: 245760 Data size: 58159880 Basic stats: COMPLETE Column stats: NONE
value expressions: _col2 (type: string)
- Execution mode: llap
+ Execution mode: vectorized, llap
LLAP IO: all inputs
Reducer 2
Execution mode: llap
@@ -1152,7 +1152,7 @@ STAGE PLANS:
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
Reducer 4
- Execution mode: llap
+ Execution mode: vectorized, llap
Reduce Operator Tree:
Group By Operator
aggregations: min(VALUE._col0), max(VALUE._col1), bloom_filter(VALUE._col2, expectedEntries=245760)
http://git-wip-us.apache.org/repos/asf/hive/blob/62ebd1ab/ql/src/test/results/clientpositive/llap/vector_binary_join_groupby.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/vector_binary_join_groupby.q.out b/ql/src/test/results/clientpositive/llap/vector_binary_join_groupby.q.out
index 9fbce7d..7de04a7 100644
--- a/ql/src/test/results/clientpositive/llap/vector_binary_join_groupby.q.out
+++ b/ql/src/test/results/clientpositive/llap/vector_binary_join_groupby.q.out
@@ -151,7 +151,7 @@ STAGE PLANS:
sort order:
Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
value expressions: _col0 (type: bigint)
- Execution mode: llap
+ Execution mode: vectorized, llap
LLAP IO: all inputs
Map 3
Map Operator Tree:
@@ -171,7 +171,7 @@ STAGE PLANS:
Map-reduce partition columns: _col10 (type: binary)
Statistics: Num rows: 100 Data size: 29638 Basic stats: COMPLETE Column stats: NONE
value expressions: _col0 (type: tinyint), _col1 (type: smallint), _col2 (type: int), _col3 (type: bigint), _col4 (type: float), _col5 (type: double), _col6 (type: boolean), _col7 (type: string), _col8 (type: timestamp), _col9 (type: decimal(4,2))
- Execution mode: llap
+ Execution mode: vectorized, llap
LLAP IO: all inputs
Reducer 2
Execution mode: vectorized, llap
http://git-wip-us.apache.org/repos/asf/hive/blob/62ebd1ab/ql/src/test/results/clientpositive/llap/vectorized_dynamic_partition_pruning.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/vectorized_dynamic_partition_pruning.q.out b/ql/src/test/results/clientpositive/llap/vectorized_dynamic_partition_pruning.q.out
index 3d087b3..729a84e 100644
--- a/ql/src/test/results/clientpositive/llap/vectorized_dynamic_partition_pruning.q.out
+++ b/ql/src/test/results/clientpositive/llap/vectorized_dynamic_partition_pruning.q.out
@@ -2958,7 +2958,7 @@ STAGE PLANS:
sort order: +
Map-reduce partition columns: _col0 (type: string)
Statistics: Num rows: 1 Data size: 172 Basic stats: COMPLETE Column stats: NONE
- Execution mode: llap
+ Execution mode: vectorized, llap
LLAP IO: all inputs
Reducer 2
Execution mode: llap
@@ -3024,7 +3024,7 @@ STAGE PLANS:
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
Reducer 5
- Execution mode: llap
+ Execution mode: vectorized, llap
Reduce Operator Tree:
Group By Operator
aggregations: min(VALUE._col0), max(VALUE._col1), bloom_filter(VALUE._col2, expectedEntries=1)
http://git-wip-us.apache.org/repos/asf/hive/blob/62ebd1ab/ql/src/test/results/clientpositive/llap/vectorized_dynamic_semijoin_reduction.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/vectorized_dynamic_semijoin_reduction.q.out b/ql/src/test/results/clientpositive/llap/vectorized_dynamic_semijoin_reduction.q.out
new file mode 100644
index 0000000..29f2391
--- /dev/null
+++ b/ql/src/test/results/clientpositive/llap/vectorized_dynamic_semijoin_reduction.q.out
@@ -0,0 +1,932 @@
+PREHOOK: query: create table dsrv_big stored as orc as select key as key_str, cast(key as int) as key_int, value from src
+PREHOOK: type: CREATETABLE_AS_SELECT
+PREHOOK: Input: default@src
+PREHOOK: Output: database:default
+PREHOOK: Output: default@dsrv_big
+POSTHOOK: query: create table dsrv_big stored as orc as select key as key_str, cast(key as int) as key_int, value from src
+POSTHOOK: type: CREATETABLE_AS_SELECT
+POSTHOOK: Input: default@src
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@dsrv_big
+POSTHOOK: Lineage: dsrv_big.key_int EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: dsrv_big.key_str SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: dsrv_big.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+PREHOOK: query: create table dsrv_small stored as orc as select distinct key as key_str, cast(key as int) as key_int, value from src where key < 100
+PREHOOK: type: CREATETABLE_AS_SELECT
+PREHOOK: Input: default@src
+PREHOOK: Output: database:default
+PREHOOK: Output: default@dsrv_small
+POSTHOOK: query: create table dsrv_small stored as orc as select distinct key as key_str, cast(key as int) as key_int, value from src where key < 100
+POSTHOOK: type: CREATETABLE_AS_SELECT
+POSTHOOK: Input: default@src
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@dsrv_small
+POSTHOOK: Lineage: dsrv_small.key_int EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: dsrv_small.key_str SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: dsrv_small.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+PREHOOK: query: EXPLAIN select count(*) from dsrv_big a join dsrv_small b on (a.key_int = b.key_int)
+PREHOOK: type: QUERY
+POSTHOOK: query: EXPLAIN select count(*) from dsrv_big a join dsrv_small b on (a.key_int = b.key_int)
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+ Stage-1 is a root stage
+ Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+ Stage: Stage-1
+ Tez
+#### A masked pattern was here ####
+ Edges:
+ Map 1 <- Reducer 5 (BROADCAST_EDGE)
+ Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 4 (SIMPLE_EDGE)
+ Reducer 3 <- Reducer 2 (CUSTOM_SIMPLE_EDGE)
+ Reducer 5 <- Map 4 (CUSTOM_SIMPLE_EDGE)
+#### A masked pattern was here ####
+ Vertices:
+ Map 1
+ Map Operator Tree:
+ TableScan
+ alias: a
+ filterExpr: (key_int is not null and key_int BETWEEN DynamicValue(RS_7_b_key_int_min) AND DynamicValue(RS_7_b_key_int_max) and in_bloom_filter(key_int, DynamicValue(RS_7_b_key_int_bloom_filter))) (type: boolean)
+ Statistics: Num rows: 500 Data size: 90000 Basic stats: COMPLETE Column stats: NONE
+ Filter Operator
+ predicate: (key_int is not null and key_int BETWEEN DynamicValue(RS_7_b_key_int_min) AND DynamicValue(RS_7_b_key_int_max) and in_bloom_filter(key_int, DynamicValue(RS_7_b_key_int_bloom_filter))) (type: boolean)
+ Statistics: Num rows: 500 Data size: 90000 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: key_int (type: int)
+ outputColumnNames: _col0
+ Statistics: Num rows: 500 Data size: 90000 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ key expressions: _col0 (type: int)
+ sort order: +
+ Map-reduce partition columns: _col0 (type: int)
+ Statistics: Num rows: 500 Data size: 90000 Basic stats: COMPLETE Column stats: NONE
+ Execution mode: vectorized, llap
+ LLAP IO: all inputs
+ Map 4
+ Map Operator Tree:
+ TableScan
+ alias: b
+ filterExpr: key_int is not null (type: boolean)
+ Statistics: Num rows: 57 Data size: 10146 Basic stats: COMPLETE Column stats: NONE
+ Filter Operator
+ predicate: key_int is not null (type: boolean)
+ Statistics: Num rows: 57 Data size: 10146 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: key_int (type: int)
+ outputColumnNames: _col0
+ Statistics: Num rows: 57 Data size: 10146 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ key expressions: _col0 (type: int)
+ sort order: +
+ Map-reduce partition columns: _col0 (type: int)
+ Statistics: Num rows: 57 Data size: 10146 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: _col0 (type: int)
+ outputColumnNames: _col0
+ Statistics: Num rows: 57 Data size: 10146 Basic stats: COMPLETE Column stats: NONE
+ Group By Operator
+ aggregations: min(_col0), max(_col0), bloom_filter(_col0, expectedEntries=57)
+ mode: hash
+ outputColumnNames: _col0, _col1, _col2
+ Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ sort order:
+ Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: NONE
+ value expressions: _col0 (type: int), _col1 (type: int), _col2 (type: binary)
+ Execution mode: vectorized, llap
+ LLAP IO: all inputs
+ Reducer 2
+ Execution mode: llap
+ Reduce Operator Tree:
+ Merge Join Operator
+ condition map:
+ Inner Join 0 to 1
+ keys:
+ 0 _col0 (type: int)
+ 1 _col0 (type: int)
+ Statistics: Num rows: 550 Data size: 99000 Basic stats: COMPLETE Column stats: NONE
+ Group By Operator
+ aggregations: count()
+ mode: hash
+ outputColumnNames: _col0
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ sort order:
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ value expressions: _col0 (type: bigint)
+ Reducer 3
+ Execution mode: vectorized, llap
+ Reduce Operator Tree:
+ Group By Operator
+ aggregations: count(VALUE._col0)
+ mode: mergepartial
+ outputColumnNames: _col0
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ File Output Operator
+ compressed: false
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ table:
+ input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+ Reducer 5
+ Execution mode: vectorized, llap
+ Reduce Operator Tree:
+ Group By Operator
+ aggregations: min(VALUE._col0), max(VALUE._col1), bloom_filter(VALUE._col2, expectedEntries=57)
+ mode: final
+ outputColumnNames: _col0, _col1, _col2
+ Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ sort order:
+ Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: NONE
+ value expressions: _col0 (type: int), _col1 (type: int), _col2 (type: binary)
+
+ Stage: Stage-0
+ Fetch Operator
+ limit: -1
+ Processor Tree:
+ ListSink
+
+PREHOOK: query: select count(*) from dsrv_big a join dsrv_small b on (a.key_int = b.key_int)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@dsrv_big
+PREHOOK: Input: default@dsrv_small
+#### A masked pattern was here ####
+POSTHOOK: query: select count(*) from dsrv_big a join dsrv_small b on (a.key_int = b.key_int)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@dsrv_big
+POSTHOOK: Input: default@dsrv_small
+#### A masked pattern was here ####
+84
+PREHOOK: query: EXPLAIN select count(*) from dsrv_big a join dsrv_small b on (a.key_str = b.key_str)
+PREHOOK: type: QUERY
+POSTHOOK: query: EXPLAIN select count(*) from dsrv_big a join dsrv_small b on (a.key_str = b.key_str)
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+ Stage-1 is a root stage
+ Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+ Stage: Stage-1
+ Tez
+#### A masked pattern was here ####
+ Edges:
+ Map 1 <- Reducer 5 (BROADCAST_EDGE)
+ Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 4 (SIMPLE_EDGE)
+ Reducer 3 <- Reducer 2 (CUSTOM_SIMPLE_EDGE)
+ Reducer 5 <- Map 4 (CUSTOM_SIMPLE_EDGE)
+#### A masked pattern was here ####
+ Vertices:
+ Map 1
+ Map Operator Tree:
+ TableScan
+ alias: a
+ filterExpr: (key_str is not null and key_str BETWEEN DynamicValue(RS_7_b_key_str_min) AND DynamicValue(RS_7_b_key_str_max) and in_bloom_filter(key_str, DynamicValue(RS_7_b_key_str_bloom_filter))) (type: boolean)
+ Statistics: Num rows: 500 Data size: 90000 Basic stats: COMPLETE Column stats: NONE
+ Filter Operator
+ predicate: (key_str is not null and key_str BETWEEN DynamicValue(RS_7_b_key_str_min) AND DynamicValue(RS_7_b_key_str_max) and in_bloom_filter(key_str, DynamicValue(RS_7_b_key_str_bloom_filter))) (type: boolean)
+ Statistics: Num rows: 500 Data size: 90000 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: key_str (type: string)
+ outputColumnNames: _col0
+ Statistics: Num rows: 500 Data size: 90000 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ key expressions: _col0 (type: string)
+ sort order: +
+ Map-reduce partition columns: _col0 (type: string)
+ Statistics: Num rows: 500 Data size: 90000 Basic stats: COMPLETE Column stats: NONE
+ Execution mode: vectorized, llap
+ LLAP IO: all inputs
+ Map 4
+ Map Operator Tree:
+ TableScan
+ alias: b
+ filterExpr: key_str is not null (type: boolean)
+ Statistics: Num rows: 57 Data size: 10146 Basic stats: COMPLETE Column stats: NONE
+ Filter Operator
+ predicate: key_str is not null (type: boolean)
+ Statistics: Num rows: 57 Data size: 10146 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: key_str (type: string)
+ outputColumnNames: _col0
+ Statistics: Num rows: 57 Data size: 10146 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ key expressions: _col0 (type: string)
+ sort order: +
+ Map-reduce partition columns: _col0 (type: string)
+ Statistics: Num rows: 57 Data size: 10146 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: _col0 (type: string)
+ outputColumnNames: _col0
+ Statistics: Num rows: 57 Data size: 10146 Basic stats: COMPLETE Column stats: NONE
+ Group By Operator
+ aggregations: min(_col0), max(_col0), bloom_filter(_col0, expectedEntries=57)
+ mode: hash
+ outputColumnNames: _col0, _col1, _col2
+ Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ sort order:
+ Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: NONE
+ value expressions: _col0 (type: string), _col1 (type: string), _col2 (type: binary)
+ Execution mode: vectorized, llap
+ LLAP IO: all inputs
+ Reducer 2
+ Execution mode: llap
+ Reduce Operator Tree:
+ Merge Join Operator
+ condition map:
+ Inner Join 0 to 1
+ keys:
+ 0 _col0 (type: string)
+ 1 _col0 (type: string)
+ Statistics: Num rows: 550 Data size: 99000 Basic stats: COMPLETE Column stats: NONE
+ Group By Operator
+ aggregations: count()
+ mode: hash
+ outputColumnNames: _col0
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ sort order:
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ value expressions: _col0 (type: bigint)
+ Reducer 3
+ Execution mode: vectorized, llap
+ Reduce Operator Tree:
+ Group By Operator
+ aggregations: count(VALUE._col0)
+ mode: mergepartial
+ outputColumnNames: _col0
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ File Output Operator
+ compressed: false
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ table:
+ input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+ Reducer 5
+ Execution mode: vectorized, llap
+ Reduce Operator Tree:
+ Group By Operator
+ aggregations: min(VALUE._col0), max(VALUE._col1), bloom_filter(VALUE._col2, expectedEntries=57)
+ mode: final
+ outputColumnNames: _col0, _col1, _col2
+ Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ sort order:
+ Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: NONE
+ value expressions: _col0 (type: string), _col1 (type: string), _col2 (type: binary)
+
+ Stage: Stage-0
+ Fetch Operator
+ limit: -1
+ Processor Tree:
+ ListSink
+
+PREHOOK: query: select count(*) from dsrv_big a join dsrv_small b on (a.key_str = b.key_str)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@dsrv_big
+PREHOOK: Input: default@dsrv_small
+#### A masked pattern was here ####
+POSTHOOK: query: select count(*) from dsrv_big a join dsrv_small b on (a.key_str = b.key_str)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@dsrv_big
+POSTHOOK: Input: default@dsrv_small
+#### A masked pattern was here ####
+84
+PREHOOK: query: EXPLAIN select count(*) from dsrv_big a join dsrv_small b on (a.key_str = b.key_str)
+PREHOOK: type: QUERY
+POSTHOOK: query: EXPLAIN select count(*) from dsrv_big a join dsrv_small b on (a.key_str = b.key_str)
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+ Stage-1 is a root stage
+ Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+ Stage: Stage-1
+ Tez
+#### A masked pattern was here ####
+ Edges:
+ Map 1 <- Reducer 5 (BROADCAST_EDGE)
+ Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 4 (SIMPLE_EDGE)
+ Reducer 3 <- Reducer 2 (CUSTOM_SIMPLE_EDGE)
+ Reducer 5 <- Map 4 (CUSTOM_SIMPLE_EDGE)
+#### A masked pattern was here ####
+ Vertices:
+ Map 1
+ Map Operator Tree:
+ TableScan
+ alias: a
+ filterExpr: (key_str is not null and key_str BETWEEN DynamicValue(RS_7_b_key_str_min) AND DynamicValue(RS_7_b_key_str_max) and in_bloom_filter(key_str, DynamicValue(RS_7_b_key_str_bloom_filter))) (type: boolean)
+ Statistics: Num rows: 500 Data size: 90000 Basic stats: COMPLETE Column stats: NONE
+ Filter Operator
+ predicate: (key_str is not null and key_str BETWEEN DynamicValue(RS_7_b_key_str_min) AND DynamicValue(RS_7_b_key_str_max) and in_bloom_filter(key_str, DynamicValue(RS_7_b_key_str_bloom_filter))) (type: boolean)
+ Statistics: Num rows: 500 Data size: 90000 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: key_str (type: string)
+ outputColumnNames: _col0
+ Statistics: Num rows: 500 Data size: 90000 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ key expressions: _col0 (type: string)
+ sort order: +
+ Map-reduce partition columns: _col0 (type: string)
+ Statistics: Num rows: 500 Data size: 90000 Basic stats: COMPLETE Column stats: NONE
+ Execution mode: vectorized, llap
+ LLAP IO: all inputs
+ Map 4
+ Map Operator Tree:
+ TableScan
+ alias: b
+ filterExpr: key_str is not null (type: boolean)
+ Statistics: Num rows: 57 Data size: 10146 Basic stats: COMPLETE Column stats: NONE
+ Filter Operator
+ predicate: key_str is not null (type: boolean)
+ Statistics: Num rows: 57 Data size: 10146 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: key_str (type: string)
+ outputColumnNames: _col0
+ Statistics: Num rows: 57 Data size: 10146 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ key expressions: _col0 (type: string)
+ sort order: +
+ Map-reduce partition columns: _col0 (type: string)
+ Statistics: Num rows: 57 Data size: 10146 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: _col0 (type: string)
+ outputColumnNames: _col0
+ Statistics: Num rows: 57 Data size: 10146 Basic stats: COMPLETE Column stats: NONE
+ Group By Operator
+ aggregations: min(_col0), max(_col0), bloom_filter(_col0, expectedEntries=57)
+ mode: hash
+ outputColumnNames: _col0, _col1, _col2
+ Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ sort order:
+ Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: NONE
+ value expressions: _col0 (type: string), _col1 (type: string), _col2 (type: binary)
+ Execution mode: vectorized, llap
+ LLAP IO: all inputs
+ Reducer 2
+ Execution mode: llap
+ Reduce Operator Tree:
+ Merge Join Operator
+ condition map:
+ Inner Join 0 to 1
+ keys:
+ 0 _col0 (type: string)
+ 1 _col0 (type: string)
+ Statistics: Num rows: 550 Data size: 99000 Basic stats: COMPLETE Column stats: NONE
+ Group By Operator
+ aggregations: count()
+ mode: hash
+ outputColumnNames: _col0
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ sort order:
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ value expressions: _col0 (type: bigint)
+ Reducer 3
+ Execution mode: vectorized, llap
+ Reduce Operator Tree:
+ Group By Operator
+ aggregations: count(VALUE._col0)
+ mode: mergepartial
+ outputColumnNames: _col0
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ File Output Operator
+ compressed: false
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ table:
+ input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+ Reducer 5
+ Execution mode: vectorized, llap
+ Reduce Operator Tree:
+ Group By Operator
+ aggregations: min(VALUE._col0), max(VALUE._col1), bloom_filter(VALUE._col2, expectedEntries=57)
+ mode: final
+ outputColumnNames: _col0, _col1, _col2
+ Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ sort order:
+ Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: NONE
+ value expressions: _col0 (type: string), _col1 (type: string), _col2 (type: binary)
+
+ Stage: Stage-0
+ Fetch Operator
+ limit: -1
+ Processor Tree:
+ ListSink
+
+PREHOOK: query: select count(*) from dsrv_big a join dsrv_small b on (a.key_int = b.key_str)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@dsrv_big
+PREHOOK: Input: default@dsrv_small
+#### A masked pattern was here ####
+POSTHOOK: query: select count(*) from dsrv_big a join dsrv_small b on (a.key_int = b.key_str)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@dsrv_big
+POSTHOOK: Input: default@dsrv_small
+#### A masked pattern was here ####
+84
+PREHOOK: query: EXPLAIN select count(*) from dsrv_big a, dsrv_small b, dsrv_small c where a.key_int = b.key_int and a.key_int = c.key_int
+PREHOOK: type: QUERY
+POSTHOOK: query: EXPLAIN select count(*) from dsrv_big a, dsrv_small b, dsrv_small c where a.key_int = b.key_int and a.key_int = c.key_int
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+ Stage-1 is a root stage
+ Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+ Stage: Stage-1
+ Tez
+#### A masked pattern was here ####
+ Edges:
+ Map 1 <- Reducer 5 (BROADCAST_EDGE), Reducer 7 (BROADCAST_EDGE)
+ Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 4 (SIMPLE_EDGE), Map 6 (SIMPLE_EDGE)
+ Reducer 3 <- Reducer 2 (CUSTOM_SIMPLE_EDGE)
+ Reducer 5 <- Map 4 (CUSTOM_SIMPLE_EDGE)
+ Reducer 7 <- Map 6 (CUSTOM_SIMPLE_EDGE)
+#### A masked pattern was here ####
+ Vertices:
+ Map 1
+ Map Operator Tree:
+ TableScan
+ alias: a
+ filterExpr: (key_int is not null and key_int BETWEEN DynamicValue(RS_10_b_key_int_min) AND DynamicValue(RS_10_b_key_int_max) and key_int BETWEEN DynamicValue(RS_11_c_key_int_min) AND DynamicValue(RS_11_c_key_int_max) and in_bloom_filter(key_int, DynamicValue(RS_10_b_key_int_bloom_filter)) and in_bloom_filter(key_int, DynamicValue(RS_11_c_key_int_bloom_filter))) (type: boolean)
+ Statistics: Num rows: 500 Data size: 90000 Basic stats: COMPLETE Column stats: NONE
+ Filter Operator
+ predicate: (key_int is not null and key_int BETWEEN DynamicValue(RS_10_b_key_int_min) AND DynamicValue(RS_10_b_key_int_max) and key_int BETWEEN DynamicValue(RS_11_c_key_int_min) AND DynamicValue(RS_11_c_key_int_max) and in_bloom_filter(key_int, DynamicValue(RS_10_b_key_int_bloom_filter)) and in_bloom_filter(key_int, DynamicValue(RS_11_c_key_int_bloom_filter))) (type: boolean)
+ Statistics: Num rows: 500 Data size: 90000 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: key_int (type: int)
+ outputColumnNames: _col0
+ Statistics: Num rows: 500 Data size: 90000 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ key expressions: _col0 (type: int)
+ sort order: +
+ Map-reduce partition columns: _col0 (type: int)
+ Statistics: Num rows: 500 Data size: 90000 Basic stats: COMPLETE Column stats: NONE
+ Execution mode: vectorized, llap
+ LLAP IO: all inputs
+ Map 4
+ Map Operator Tree:
+ TableScan
+ alias: b
+ filterExpr: key_int is not null (type: boolean)
+ Statistics: Num rows: 57 Data size: 10146 Basic stats: COMPLETE Column stats: NONE
+ Filter Operator
+ predicate: key_int is not null (type: boolean)
+ Statistics: Num rows: 57 Data size: 10146 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: key_int (type: int)
+ outputColumnNames: _col0
+ Statistics: Num rows: 57 Data size: 10146 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ key expressions: _col0 (type: int)
+ sort order: +
+ Map-reduce partition columns: _col0 (type: int)
+ Statistics: Num rows: 57 Data size: 10146 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: _col0 (type: int)
+ outputColumnNames: _col0
+ Statistics: Num rows: 57 Data size: 10146 Basic stats: COMPLETE Column stats: NONE
+ Group By Operator
+ aggregations: min(_col0), max(_col0), bloom_filter(_col0, expectedEntries=57)
+ mode: hash
+ outputColumnNames: _col0, _col1, _col2
+ Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ sort order:
+ Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: NONE
+ value expressions: _col0 (type: int), _col1 (type: int), _col2 (type: binary)
+ Execution mode: vectorized, llap
+ LLAP IO: all inputs
+ Map 6
+ Map Operator Tree:
+ TableScan
+ alias: c
+ filterExpr: key_int is not null (type: boolean)
+ Statistics: Num rows: 57 Data size: 10146 Basic stats: COMPLETE Column stats: NONE
+ Filter Operator
+ predicate: key_int is not null (type: boolean)
+ Statistics: Num rows: 57 Data size: 10146 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: key_int (type: int)
+ outputColumnNames: _col0
+ Statistics: Num rows: 57 Data size: 10146 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ key expressions: _col0 (type: int)
+ sort order: +
+ Map-reduce partition columns: _col0 (type: int)
+ Statistics: Num rows: 57 Data size: 10146 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: _col0 (type: int)
+ outputColumnNames: _col0
+ Statistics: Num rows: 57 Data size: 10146 Basic stats: COMPLETE Column stats: NONE
+ Group By Operator
+ aggregations: min(_col0), max(_col0), bloom_filter(_col0, expectedEntries=57)
+ mode: hash
+ outputColumnNames: _col0, _col1, _col2
+ Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ sort order:
+ Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: NONE
+ value expressions: _col0 (type: int), _col1 (type: int), _col2 (type: binary)
+ Execution mode: vectorized, llap
+ LLAP IO: all inputs
+ Reducer 2
+ Execution mode: llap
+ Reduce Operator Tree:
+ Merge Join Operator
+ condition map:
+ Inner Join 0 to 1
+ Inner Join 0 to 2
+ keys:
+ 0 _col0 (type: int)
+ 1 _col0 (type: int)
+ 2 _col0 (type: int)
+ Statistics: Num rows: 1100 Data size: 198000 Basic stats: COMPLETE Column stats: NONE
+ Group By Operator
+ aggregations: count()
+ mode: hash
+ outputColumnNames: _col0
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ sort order:
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ value expressions: _col0 (type: bigint)
+ Reducer 3
+ Execution mode: vectorized, llap
+ Reduce Operator Tree:
+ Group By Operator
+ aggregations: count(VALUE._col0)
+ mode: mergepartial
+ outputColumnNames: _col0
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ File Output Operator
+ compressed: false
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ table:
+ input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+ Reducer 5
+ Execution mode: vectorized, llap
+ Reduce Operator Tree:
+ Group By Operator
+ aggregations: min(VALUE._col0), max(VALUE._col1), bloom_filter(VALUE._col2, expectedEntries=57)
+ mode: final
+ outputColumnNames: _col0, _col1, _col2
+ Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ sort order:
+ Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: NONE
+ value expressions: _col0 (type: int), _col1 (type: int), _col2 (type: binary)
+ Reducer 7
+ Execution mode: vectorized, llap
+ Reduce Operator Tree:
+ Group By Operator
+ aggregations: min(VALUE._col0), max(VALUE._col1), bloom_filter(VALUE._col2, expectedEntries=57)
+ mode: final
+ outputColumnNames: _col0, _col1, _col2
+ Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ sort order:
+ Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: NONE
+ value expressions: _col0 (type: int), _col1 (type: int), _col2 (type: binary)
+
+ Stage: Stage-0
+ Fetch Operator
+ limit: -1
+ Processor Tree:
+ ListSink
+
+PREHOOK: query: select count(*) from dsrv_big a, dsrv_small b, dsrv_small c where a.key_int = b.key_int and a.key_int = c.key_int
+PREHOOK: type: QUERY
+PREHOOK: Input: default@dsrv_big
+PREHOOK: Input: default@dsrv_small
+#### A masked pattern was here ####
+POSTHOOK: query: select count(*) from dsrv_big a, dsrv_small b, dsrv_small c where a.key_int = b.key_int and a.key_int = c.key_int
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@dsrv_big
+POSTHOOK: Input: default@dsrv_small
+#### A masked pattern was here ####
+84
+PREHOOK: query: EXPLAIN select count(*) from dsrv_big a join dsrv_small b on (a.key_str = b.key_str and a.key_int = b.key_int)
+PREHOOK: type: QUERY
+POSTHOOK: query: EXPLAIN select count(*) from dsrv_big a join dsrv_small b on (a.key_str = b.key_str and a.key_int = b.key_int)
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+ Stage-1 is a root stage
+ Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+ Stage: Stage-1
+ Tez
+#### A masked pattern was here ####
+ Edges:
+ Map 1 <- Reducer 5 (BROADCAST_EDGE), Reducer 6 (BROADCAST_EDGE)
+ Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 4 (SIMPLE_EDGE)
+ Reducer 3 <- Reducer 2 (CUSTOM_SIMPLE_EDGE)
+ Reducer 5 <- Map 4 (CUSTOM_SIMPLE_EDGE)
+ Reducer 6 <- Map 4 (CUSTOM_SIMPLE_EDGE)
+#### A masked pattern was here ####
+ Vertices:
+ Map 1
+ Map Operator Tree:
+ TableScan
+ alias: a
+ filterExpr: (key_str is not null and key_int is not null and key_str BETWEEN DynamicValue(RS_7_b_key_str_min) AND DynamicValue(RS_7_b_key_str_max) and key_int BETWEEN DynamicValue(RS_7_b_key_int_min) AND DynamicValue(RS_7_b_key_int_max) and in_bloom_filter(key_str, DynamicValue(RS_7_b_key_str_bloom_filter)) and in_bloom_filter(key_int, DynamicValue(RS_7_b_key_int_bloom_filter))) (type: boolean)
+ Statistics: Num rows: 500 Data size: 90000 Basic stats: COMPLETE Column stats: NONE
+ Filter Operator
+ predicate: (key_str is not null and key_int is not null and key_str BETWEEN DynamicValue(RS_7_b_key_str_min) AND DynamicValue(RS_7_b_key_str_max) and key_int BETWEEN DynamicValue(RS_7_b_key_int_min) AND DynamicValue(RS_7_b_key_int_max) and in_bloom_filter(key_str, DynamicValue(RS_7_b_key_str_bloom_filter)) and in_bloom_filter(key_int, DynamicValue(RS_7_b_key_int_bloom_filter))) (type: boolean)
+ Statistics: Num rows: 500 Data size: 90000 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: key_str (type: string), key_int (type: int)
+ outputColumnNames: _col0, _col1
+ Statistics: Num rows: 500 Data size: 90000 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ key expressions: _col0 (type: string), _col1 (type: int)
+ sort order: ++
+ Map-reduce partition columns: _col0 (type: string), _col1 (type: int)
+ Statistics: Num rows: 500 Data size: 90000 Basic stats: COMPLETE Column stats: NONE
+ Execution mode: vectorized, llap
+ LLAP IO: all inputs
+ Map 4
+ Map Operator Tree:
+ TableScan
+ alias: b
+ filterExpr: (key_str is not null and key_int is not null) (type: boolean)
+ Statistics: Num rows: 57 Data size: 10146 Basic stats: COMPLETE Column stats: NONE
+ Filter Operator
+ predicate: (key_str is not null and key_int is not null) (type: boolean)
+ Statistics: Num rows: 57 Data size: 10146 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: key_str (type: string), key_int (type: int)
+ outputColumnNames: _col0, _col1
+ Statistics: Num rows: 57 Data size: 10146 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ key expressions: _col0 (type: string), _col1 (type: int)
+ sort order: ++
+ Map-reduce partition columns: _col0 (type: string), _col1 (type: int)
+ Statistics: Num rows: 57 Data size: 10146 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: _col0 (type: string)
+ outputColumnNames: _col0
+ Statistics: Num rows: 57 Data size: 10146 Basic stats: COMPLETE Column stats: NONE
+ Group By Operator
+ aggregations: min(_col0), max(_col0), bloom_filter(_col0, expectedEntries=57)
+ mode: hash
+ outputColumnNames: _col0, _col1, _col2
+ Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ sort order:
+ Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: NONE
+ value expressions: _col0 (type: string), _col1 (type: string), _col2 (type: binary)
+ Select Operator
+ expressions: _col1 (type: int)
+ outputColumnNames: _col0
+ Statistics: Num rows: 57 Data size: 10146 Basic stats: COMPLETE Column stats: NONE
+ Group By Operator
+ aggregations: min(_col0), max(_col0), bloom_filter(_col0, expectedEntries=57)
+ mode: hash
+ outputColumnNames: _col0, _col1, _col2
+ Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ sort order:
+ Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: NONE
+ value expressions: _col0 (type: int), _col1 (type: int), _col2 (type: binary)
+ Execution mode: vectorized, llap
+ LLAP IO: all inputs
+ Reducer 2
+ Execution mode: llap
+ Reduce Operator Tree:
+ Merge Join Operator
+ condition map:
+ Inner Join 0 to 1
+ keys:
+ 0 _col0 (type: string), _col1 (type: int)
+ 1 _col0 (type: string), _col1 (type: int)
+ Statistics: Num rows: 550 Data size: 99000 Basic stats: COMPLETE Column stats: NONE
+ Group By Operator
+ aggregations: count()
+ mode: hash
+ outputColumnNames: _col0
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ sort order:
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ value expressions: _col0 (type: bigint)
+ Reducer 3
+ Execution mode: vectorized, llap
+ Reduce Operator Tree:
+ Group By Operator
+ aggregations: count(VALUE._col0)
+ mode: mergepartial
+ outputColumnNames: _col0
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ File Output Operator
+ compressed: false
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ table:
+ input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+ Reducer 5
+ Execution mode: vectorized, llap
+ Reduce Operator Tree:
+ Group By Operator
+ aggregations: min(VALUE._col0), max(VALUE._col1), bloom_filter(VALUE._col2, expectedEntries=57)
+ mode: final
+ outputColumnNames: _col0, _col1, _col2
+ Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ sort order:
+ Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: NONE
+ value expressions: _col0 (type: string), _col1 (type: string), _col2 (type: binary)
+ Reducer 6
+ Execution mode: vectorized, llap
+ Reduce Operator Tree:
+ Group By Operator
+ aggregations: min(VALUE._col0), max(VALUE._col1), bloom_filter(VALUE._col2, expectedEntries=57)
+ mode: final
+ outputColumnNames: _col0, _col1, _col2
+ Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ sort order:
+ Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: NONE
+ value expressions: _col0 (type: int), _col1 (type: int), _col2 (type: binary)
+
+ Stage: Stage-0
+ Fetch Operator
+ limit: -1
+ Processor Tree:
+ ListSink
+
+PREHOOK: query: select count(*) from dsrv_big a join dsrv_small b on (a.key_str = b.key_str and a.key_int = b.key_int)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@dsrv_big
+PREHOOK: Input: default@dsrv_small
+#### A masked pattern was here ####
+POSTHOOK: query: select count(*) from dsrv_big a join dsrv_small b on (a.key_str = b.key_str and a.key_int = b.key_int)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@dsrv_big
+POSTHOOK: Input: default@dsrv_small
+#### A masked pattern was here ####
+84
+PREHOOK: query: EXPLAIN select count(*) from dsrv_big a join dsrv_small b on (a.key_int = b.key_int) where b.value in ('nonexistent1', 'nonexistent2')
+PREHOOK: type: QUERY
+POSTHOOK: query: EXPLAIN select count(*) from dsrv_big a join dsrv_small b on (a.key_int = b.key_int) where b.value in ('nonexistent1', 'nonexistent2')
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+ Stage-1 is a root stage
+ Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+ Stage: Stage-1
+ Tez
+#### A masked pattern was here ####
+ Edges:
+ Map 1 <- Reducer 5 (BROADCAST_EDGE)
+ Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 4 (SIMPLE_EDGE)
+ Reducer 3 <- Reducer 2 (CUSTOM_SIMPLE_EDGE)
+ Reducer 5 <- Map 4 (CUSTOM_SIMPLE_EDGE)
+#### A masked pattern was here ####
+ Vertices:
+ Map 1
+ Map Operator Tree:
+ TableScan
+ alias: a
+ filterExpr: (key_int is not null and key_int BETWEEN DynamicValue(RS_7_b_key_int_min) AND DynamicValue(RS_7_b_key_int_max) and in_bloom_filter(key_int, DynamicValue(RS_7_b_key_int_bloom_filter))) (type: boolean)
+ Statistics: Num rows: 500 Data size: 90000 Basic stats: COMPLETE Column stats: NONE
+ Filter Operator
+ predicate: (key_int is not null and key_int BETWEEN DynamicValue(RS_7_b_key_int_min) AND DynamicValue(RS_7_b_key_int_max) and in_bloom_filter(key_int, DynamicValue(RS_7_b_key_int_bloom_filter))) (type: boolean)
+ Statistics: Num rows: 500 Data size: 90000 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: key_int (type: int)
+ outputColumnNames: _col0
+ Statistics: Num rows: 500 Data size: 90000 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ key expressions: _col0 (type: int)
+ sort order: +
+ Map-reduce partition columns: _col0 (type: int)
+ Statistics: Num rows: 500 Data size: 90000 Basic stats: COMPLETE Column stats: NONE
+ Execution mode: vectorized, llap
+ LLAP IO: all inputs
+ Map 4
+ Map Operator Tree:
+ TableScan
+ alias: b
+ filterExpr: ((value) IN ('nonexistent1', 'nonexistent2') and key_int is not null) (type: boolean)
+ Statistics: Num rows: 57 Data size: 10146 Basic stats: COMPLETE Column stats: NONE
+ Filter Operator
+ predicate: ((value) IN ('nonexistent1', 'nonexistent2') and key_int is not null) (type: boolean)
+ Statistics: Num rows: 29 Data size: 5162 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: key_int (type: int)
+ outputColumnNames: _col0
+ Statistics: Num rows: 29 Data size: 5162 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ key expressions: _col0 (type: int)
+ sort order: +
+ Map-reduce partition columns: _col0 (type: int)
+ Statistics: Num rows: 29 Data size: 5162 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: _col0 (type: int)
+ outputColumnNames: _col0
+ Statistics: Num rows: 29 Data size: 5162 Basic stats: COMPLETE Column stats: NONE
+ Group By Operator
+ aggregations: min(_col0), max(_col0), bloom_filter(_col0, expectedEntries=29)
+ mode: hash
+ outputColumnNames: _col0, _col1, _col2
+ Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ sort order:
+ Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: NONE
+ value expressions: _col0 (type: int), _col1 (type: int), _col2 (type: binary)
+ Execution mode: vectorized, llap
+ LLAP IO: all inputs
+ Reducer 2
+ Execution mode: llap
+ Reduce Operator Tree:
+ Merge Join Operator
+ condition map:
+ Inner Join 0 to 1
+ keys:
+ 0 _col0 (type: int)
+ 1 _col0 (type: int)
+ Statistics: Num rows: 550 Data size: 99000 Basic stats: COMPLETE Column stats: NONE
+ Group By Operator
+ aggregations: count()
+ mode: hash
+ outputColumnNames: _col0
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ sort order:
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ value expressions: _col0 (type: bigint)
+ Reducer 3
+ Execution mode: vectorized, llap
+ Reduce Operator Tree:
+ Group By Operator
+ aggregations: count(VALUE._col0)
+ mode: mergepartial
+ outputColumnNames: _col0
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ File Output Operator
+ compressed: false
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ table:
+ input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+ Reducer 5
+ Execution mode: vectorized, llap
+ Reduce Operator Tree:
+ Group By Operator
+ aggregations: min(VALUE._col0), max(VALUE._col1), bloom_filter(VALUE._col2, expectedEntries=29)
+ mode: final
+ outputColumnNames: _col0, _col1, _col2
+ Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ sort order:
+ Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: NONE
+ value expressions: _col0 (type: int), _col1 (type: int), _col2 (type: binary)
+
+ Stage: Stage-0
+ Fetch Operator
+ limit: -1
+ Processor Tree:
+ ListSink
+
+PREHOOK: query: select count(*) from dsrv_big a join dsrv_small b on (a.key_int = b.key_int) where b.value in ('nonexistent1', 'nonexistent2')
+PREHOOK: type: QUERY
+PREHOOK: Input: default@dsrv_big
+PREHOOK: Input: default@dsrv_small
+#### A masked pattern was here ####
+POSTHOOK: query: select count(*) from dsrv_big a join dsrv_small b on (a.key_int = b.key_int) where b.value in ('nonexistent1', 'nonexistent2')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@dsrv_big
+POSTHOOK: Input: default@dsrv_small
+#### A masked pattern was here ####
+0
+PREHOOK: query: drop table dsrv_big
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@dsrv_big
+PREHOOK: Output: default@dsrv_big
+POSTHOOK: query: drop table dsrv_big
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@dsrv_big
+POSTHOOK: Output: default@dsrv_big
+PREHOOK: query: drop table dsrv_small
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@dsrv_small
+PREHOOK: Output: default@dsrv_small
+POSTHOOK: query: drop table dsrv_small
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@dsrv_small
+POSTHOOK: Output: default@dsrv_small
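
To make the plans above concrete: at run time the small-table side (Map 4) aggregates min/max values and a bloom filter over its join keys, ships them as DynamicValues over the broadcast edges, and the big-table side (Map 1) applies them in its filterExpr to drop rows before the shuffle join. A minimal sketch of that per-row check using the BloomFilter API from this patch (the key values below are invented for illustration; expectedEntries=57 is taken from the plan):

  import java.io.ByteArrayInputStream;
  import java.io.ByteArrayOutputStream;
  import org.apache.hive.common.util.BloomFilter;

  public class SemijoinFilterSketch {
    public static void main(String[] args) throws Exception {
      // Small side (b): build min/max and a bloom filter over key_int.
      BloomFilter bf = new BloomFilter(57);
      long min = Long.MAX_VALUE, max = Long.MIN_VALUE;
      for (long k : new long[] {3, 17, 42}) {
        bf.addLong(k);
        min = Math.min(min, k);
        max = Math.max(max, k);
      }
      // The aggregated filter travels to Map 1 as a serialized DynamicValue.
      ByteArrayOutputStream out = new ByteArrayOutputStream();
      BloomFilter.serialize(out, bf);
      BloomFilter shipped =
          BloomFilter.deserialize(new ByteArrayInputStream(out.toByteArray()));
      // Big side (a): the filterExpr amounts to this check on each row.
      for (long k : new long[] {1, 17, 999}) {
        boolean pass = k >= min && k <= max && shipped.testLong(k);
        System.out.println("key_int=" + k + " passes=" + pass);
      }
    }
  }
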
http://git-wip-us.apache.org/repos/asf/hive/blob/62ebd1ab/ql/src/test/results/clientpositive/tez/explainuser_3.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/tez/explainuser_3.q.out b/ql/src/test/results/clientpositive/tez/explainuser_3.q.out
index fbf61ef..17c9ec3 100644
--- a/ql/src/test/results/clientpositive/tez/explainuser_3.q.out
+++ b/ql/src/test/results/clientpositive/tez/explainuser_3.q.out
@@ -16,6 +16,14 @@ POSTHOOK: Input: default@alltypesorc
POSTHOOK: Output: default@acid_vectorized
POSTHOOK: Lineage: acid_vectorized.a SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cint, type:int, comment:null), ]
POSTHOOK: Lineage: acid_vectorized.b SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cstring1, type:string, comment:null), ]
+PREHOOK: query: analyze table acid_vectorized compute statistics for columns
+PREHOOK: type: QUERY
+PREHOOK: Input: default@acid_vectorized
+#### A masked pattern was here ####
+POSTHOOK: query: analyze table acid_vectorized compute statistics for columns
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@acid_vectorized
+#### A masked pattern was here ####
PREHOOK: query: explain select a, b from acid_vectorized order by a, b
PREHOOK: type: QUERY
POSTHOOK: query: explain select a, b from acid_vectorized order by a, b
@@ -31,14 +39,14 @@ Stage-0
Stage-1
Reducer 2 vectorized
File Output Operator [FS_8]
- Select Operator [SEL_7] (rows=16 width=106)
+ Select Operator [SEL_7] (rows=16 width=101)
Output:["_col0","_col1"]
<-Map 1 [SIMPLE_EDGE] vectorized
SHUFFLE [RS_6]
- Select Operator [SEL_5] (rows=16 width=106)
+ Select Operator [SEL_5] (rows=16 width=101)
Output:["_col0","_col1"]
- TableScan [TS_0] (rows=16 width=106)
- default@acid_vectorized,acid_vectorized, ACID table,Tbl:COMPLETE,Col:NONE,Output:["a","b"]
+ TableScan [TS_0] (rows=16 width=101)
+ default@acid_vectorized,acid_vectorized, ACID table,Tbl:COMPLETE,Col:COMPLETE,Output:["a","b"]
PREHOOK: query: explain select key, value
FROM srcpart LATERAL VIEW explode(array(1,2,3)) myTable AS myCol
@@ -721,6 +729,7 @@ STAGE PLANS:
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
Reducer 2
+ Execution mode: vectorized
Reduce Operator Tree:
Group By Operator
aggregations: min(VALUE._col0), max(VALUE._col1), bloom_filter(VALUE._col2, expectedEntries=242)
http://git-wip-us.apache.org/repos/asf/hive/blob/62ebd1ab/ql/src/test/results/clientpositive/vector_binary_join_groupby.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/vector_binary_join_groupby.q.out b/ql/src/test/results/clientpositive/vector_binary_join_groupby.q.out
index 850278e..dead5a6 100644
--- a/ql/src/test/results/clientpositive/vector_binary_join_groupby.q.out
+++ b/ql/src/test/results/clientpositive/vector_binary_join_groupby.q.out
@@ -166,6 +166,7 @@ STAGE PLANS:
sort order:
Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
value expressions: _col0 (type: bigint)
+ Execution mode: vectorized
Local Work:
Map Reduce Local Work
Reduce Operator Tree:
http://git-wip-us.apache.org/repos/asf/hive/blob/62ebd1ab/storage-api/src/java/org/apache/hive/common/util/BloomFilter.java
----------------------------------------------------------------------
diff --git a/storage-api/src/java/org/apache/hive/common/util/BloomFilter.java b/storage-api/src/java/org/apache/hive/common/util/BloomFilter.java
index d44bba8..e9f419d 100644
--- a/storage-api/src/java/org/apache/hive/common/util/BloomFilter.java
+++ b/storage-api/src/java/org/apache/hive/common/util/BloomFilter.java
@@ -292,6 +292,42 @@ public class BloomFilter {
}
}
+ // Given a byte array consisting of a serialized BloomFilter, gives the offset (from 0)
+ // for the start of the serialized long values that make up the bitset.
+ // NumHashFunctions (1 byte) + NumBits (4 bytes)
+ public static final int START_OF_SERIALIZED_LONGS = 5;
+
+ /**
+ * Merges BloomFilter bf2 into bf1.
+ * Assumes both BloomFilters were created with the same size and number of hash
+ * functions, and have been serialized to byte arrays.
+ * @param bf1Bytes
+ * @param bf1Start
+ * @param bf1Length
+ * @param bf2Bytes
+ * @param bf2Start
+ * @param bf2Length
+ */
+ public static void mergeBloomFilterBytes(
+ byte[] bf1Bytes, int bf1Start, int bf1Length,
+ byte[] bf2Bytes, int bf2Start, int bf2Length) {
+ if (bf1Length != bf2Length) {
+ throw new IllegalArgumentException("bf1Length " + bf1Length + " does not match bf2Length " + bf2Length);
+ }
+
+ // Validate the header: bitset size and number of hash functions must match.
+ for (int idx = 0; idx < START_OF_SERIALIZED_LONGS; ++idx) {
+ if (bf1Bytes[bf1Start + idx] != bf2Bytes[bf2Start + idx]) {
+ throw new IllegalArgumentException("bf1 NumHashFunctions/NumBits does not match bf2");
+ }
+ }
+
+ // Just bitwise-OR the bits together - the sizes/number of hash functions match,
+ // and the rest of the data is the serialized long values of the bitset,
+ // which merge correctly via bitwise OR.
+ for (int idx = START_OF_SERIALIZED_LONGS; idx < bf1Length; ++idx) {
+ bf1Bytes[bf1Start + idx] |= bf2Bytes[bf2Start + idx];
+ }
+ }
+
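For reference, the 5-byte header that the validation loop above compares byte-by-byte is NumHashFunctions (1 byte) followed by NumBits (4 bytes), per the comment on START_OF_SERIALIZED_LONGS. A small standalone sketch of reading those fields (field names follow that comment; the sketch is illustrative, not part of the patch):

  import java.io.ByteArrayInputStream;
  import java.io.ByteArrayOutputStream;
  import java.io.DataInputStream;
  import org.apache.hive.common.util.BloomFilter;

  public class BloomHeaderSketch {
    // Reads the two header fields of a serialized BloomFilter, following the
    // documented layout: NumHashFunctions (1 byte) + NumBits (4 bytes).
    static int[] readHeader(byte[] serialized) throws Exception {
      DataInputStream in = new DataInputStream(new ByteArrayInputStream(serialized));
      int numHashFunctions = in.readUnsignedByte();
      int numBits = in.readInt();
      return new int[] { numHashFunctions, numBits };
    }

    public static void main(String[] args) throws Exception {
      BloomFilter bf = new BloomFilter(100);
      ByteArrayOutputStream out = new ByteArrayOutputStream();
      BloomFilter.serialize(out, bf);
      int[] header = readHeader(out.toByteArray());
      System.out.println("numHashFunctions=" + header[0] + " numBits=" + header[1]);
    }
  }
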
/**
* Bare metal bit set implementation. For performance reasons, this implementation does not check
* for index bounds nor expand the bit set size if the specified index is greater than the size.
[3/3] hive git commit: HIVE-15698: Vectorization support for
min/max/bloomfilter runtime filtering (Jason Dere, reviewed by Matt McCline)
Posted by jd...@apache.org.
HIVE-15698: Vectorization support for min/max/bloomfilter runtime filtering (Jason Dere, reviewed by Matt McCline)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/62ebd1ab
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/62ebd1ab
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/62ebd1ab
Branch: refs/heads/master
Commit: 62ebd1abb87d951024a371ba37a451f536a594e6
Parents: 79eb224
Author: Jason Dere <jd...@hortonworks.com>
Authored: Sun Jan 29 23:33:02 2017 -0800
Committer: Jason Dere <jd...@hortonworks.com>
Committed: Sun Jan 29 23:33:02 2017 -0800
----------------------------------------------------------------------
.../apache/hadoop/hive/ant/GenVectorCode.java | 83 ++
.../test/resources/testconfiguration.properties | 1 +
.../FilterColumnBetweenDynamicValue.txt | 101 ++
.../FilterDecimalColumnBetween.txt | 16 +
.../FilterStringColumnBetween.txt | 8 +-
.../FilterTimestampColumnBetween.txt | 16 +
.../FilterTruncStringColumnBetween.txt | 8 +-
.../ql/exec/tez/DynamicValueRegistryTez.java | 12 +-
.../exec/vector/VectorExpressionDescriptor.java | 3 +-
.../ql/exec/vector/VectorFilterOperator.java | 2 +
.../ql/exec/vector/VectorSelectOperator.java | 1 +
.../ql/exec/vector/VectorizationContext.java | 77 +-
.../DynamicValueVectorExpression.java | 314 +++++++
.../vector/expressions/VectorExpression.java | 13 +-
.../VectorInBloomFilterColDynamicValue.java | 285 ++++++
.../aggregates/VectorUDAFBloomFilter.java | 474 ++++++++++
.../aggregates/VectorUDAFBloomFilterMerge.java | 365 ++++++++
.../hive/ql/optimizer/physical/Vectorizer.java | 2 +
.../ql/udf/generic/GenericUDAFBloomFilter.java | 10 +-
.../ql/udf/generic/GenericUDFInBloomFilter.java | 12 +-
.../ql/optimizer/physical/TestVectorizer.java | 31 +
.../test/queries/clientpositive/explainuser_3.q | 1 +
.../vectorized_dynamic_semijoin_reduction.q | 43 +
.../results/clientpositive/llap/mergejoin.q.out | 66 +-
.../results/clientpositive/llap/orc_llap.q.out | 12 +-
.../llap/vector_binary_join_groupby.q.out | 4 +-
.../vectorized_dynamic_partition_pruning.q.out | 4 +-
.../vectorized_dynamic_semijoin_reduction.q.out | 932 +++++++++++++++++++
.../clientpositive/tez/explainuser_3.q.out | 17 +-
.../vector_binary_join_groupby.q.out | 1 +
.../apache/hive/common/util/BloomFilter.java | 36 +
.../hive/common/util/TestBloomFilter.java | 125 +++
32 files changed, 3000 insertions(+), 75 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/62ebd1ab/ant/src/org/apache/hadoop/hive/ant/GenVectorCode.java
----------------------------------------------------------------------
diff --git a/ant/src/org/apache/hadoop/hive/ant/GenVectorCode.java b/ant/src/org/apache/hadoop/hive/ant/GenVectorCode.java
index e9fe8fa..133ef0a 100644
--- a/ant/src/org/apache/hadoop/hive/ant/GenVectorCode.java
+++ b/ant/src/org/apache/hadoop/hive/ant/GenVectorCode.java
@@ -789,6 +789,15 @@ public class GenVectorCode extends Task {
{"FilterTimestampColumnBetween", ""},
{"FilterTimestampColumnBetween", "!"},
+ // This is for runtime min/max pushdown - don't need to do NOT BETWEEN
+ {"FilterColumnBetweenDynamicValue", "long", ""},
+ {"FilterColumnBetweenDynamicValue", "double", ""},
+ {"FilterColumnBetweenDynamicValue", "decimal", ""},
+ {"FilterColumnBetweenDynamicValue", "string", ""},
+ {"FilterColumnBetweenDynamicValue", "char", ""},
+ {"FilterColumnBetweenDynamicValue", "varchar", ""},
+ {"FilterColumnBetweenDynamicValue", "timestamp", ""},
+
{"ColumnCompareColumn", "Equal", "long", "double", "=="},
{"ColumnCompareColumn", "Equal", "double", "double", "=="},
{"ColumnCompareColumn", "NotEqual", "long", "double", "!="},
@@ -1164,6 +1173,8 @@ public class GenVectorCode extends Task {
} else if (tdesc[0].equals("FilterColumnBetween")) {
generateFilterColumnBetween(tdesc);
+ } else if (tdesc[0].equals("FilterColumnBetweenDynamicValue")) {
+ generateFilterColumnBetweenDynamicValue(tdesc);
} else if (tdesc[0].equals("ScalarArithmeticColumn") || tdesc[0].equals("ScalarDivideColumn")) {
generateScalarArithmeticColumn(tdesc);
} else if (tdesc[0].equals("FilterColumnCompareColumn")) {
@@ -1379,6 +1390,72 @@ public class GenVectorCode extends Task {
className, templateString);
}
+ private void generateFilterColumnBetweenDynamicValue(String[] tdesc) throws Exception {
+ String operandType = tdesc[1];
+ String optionalNot = tdesc[2];
+
+ String className = "Filter" + getCamelCaseType(operandType) + "Column" +
+ (optionalNot.equals("!") ? "Not" : "") + "BetweenDynamicValue";
+
+ String typeName = getCamelCaseType(operandType);
+ String defaultValue;
+ String vectorType;
+ String getPrimitiveMethod;
+ String getValueMethod;
+
+ if (operandType.equals("long")) {
+ defaultValue = "0";
+ vectorType = "long";
+ getPrimitiveMethod = "getLong";
+ getValueMethod = "";
+ } else if (operandType.equals("double")) {
+ defaultValue = "0";
+ vectorType = "double";
+ getPrimitiveMethod = "getDouble";
+ getValueMethod = "";
+ } else if (operandType.equals("decimal")) {
+ defaultValue = "null";
+ vectorType = "HiveDecimal";
+ getPrimitiveMethod = "getHiveDecimal";
+ getValueMethod = "";
+ } else if (operandType.equals("string")) {
+ defaultValue = "null";
+ vectorType = "byte[]";
+ getPrimitiveMethod = "getString";
+ getValueMethod = ".getBytes()";
+ } else if (operandType.equals("char")) {
+ defaultValue = "null";
+ vectorType = "byte[]";
+ getPrimitiveMethod = "getHiveChar";
+ getValueMethod = ".getStrippedValue().getBytes()"; // Does vectorization use stripped char values?
+ } else if (operandType.equals("varchar")) {
+ defaultValue = "null";
+ vectorType = "byte[]";
+ getPrimitiveMethod = "getHiveVarchar";
+ getValueMethod = ".getValue().getBytes()";
+ } else if (operandType.equals("timestamp")) {
+ defaultValue = "null";
+ vectorType = "Timestamp";
+ getPrimitiveMethod = "getTimestamp";
+ getValueMethod = "";
+ } else {
+ throw new IllegalArgumentException("Type " + operandType + " not supported");
+ }
+
+ // Read the template into a string, expand it, and write it.
+ File templateFile = new File(joinPath(this.expressionTemplateDirectory, tdesc[0] + ".txt"));
+ String templateString = readFile(templateFile);
+ templateString = templateString.replaceAll("<ClassName>", className);
+ templateString = templateString.replaceAll("<TypeName>", typeName);
+ templateString = templateString.replaceAll("<DefaultValue>", defaultValue);
+ templateString = templateString.replaceAll("<VectorType>", vectorType);
+ templateString = templateString.replaceAll("<GetPrimitiveMethod>", getPrimitiveMethod);
+ templateString = templateString.replaceAll("<GetValueMethod>", getValueMethod);
+
+ writeFile(templateFile.lastModified(), expressionOutputDirectory, expressionClassesDirectory,
+ className, templateString);
+ }
+
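As a concrete illustration of the expansion above, the {"FilterColumnBetweenDynamicValue", "long", ""} entry in the table at the top of this file yields the class name FilterLongColumnBetweenDynamicValue. A miniature of the replaceAll-based substitution on a one-line stand-in template (the template string here is invented for the example):

  public class TemplateExpandSketch {
    public static void main(String[] args) {
      String template =
          "public class <ClassName> extends Filter<TypeName>ColumnBetween {}";
      // Same substitutions GenVectorCode performs on the real template file.
      String expanded = template
          .replaceAll("<ClassName>", "FilterLongColumnBetweenDynamicValue")
          .replaceAll("<TypeName>", "Long");
      System.out.println(expanded);
    }
  }
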
private void generateColumnCompareColumn(String[] tdesc) throws Exception {
String operatorName = tdesc[1];
String operandType1 = tdesc[2];
@@ -3084,6 +3161,12 @@ public class GenVectorCode extends Task {
return "Timestamp";
} else if (type.equals("date")) {
return "Date";
+ } else if (type.equals("string")) {
+ return "String";
+ } else if (type.equals("char")) {
+ return "Char";
+ } else if (type.equals("varchar")) {
+ return "VarChar";
} else {
return type;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/62ebd1ab/itests/src/test/resources/testconfiguration.properties
----------------------------------------------------------------------
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index e966959..8b3f589 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -611,6 +611,7 @@ minillaplocal.query.files=acid_globallimit.q,\
vector_udf1.q,\
vectorization_short_regress.q,\
vectorized_dynamic_partition_pruning.q,\
+ vectorized_dynamic_semijoin_reduction.q,\
vectorized_ptf.q,\
windowing.q,\
windowing_gby.q,\
http://git-wip-us.apache.org/repos/asf/hive/blob/62ebd1ab/ql/src/gen/vectorization/ExpressionTemplates/FilterColumnBetweenDynamicValue.txt
----------------------------------------------------------------------
diff --git a/ql/src/gen/vectorization/ExpressionTemplates/FilterColumnBetweenDynamicValue.txt b/ql/src/gen/vectorization/ExpressionTemplates/FilterColumnBetweenDynamicValue.txt
new file mode 100644
index 0000000..97ab7aa
--- /dev/null
+++ b/ql/src/gen/vectorization/ExpressionTemplates/FilterColumnBetweenDynamicValue.txt
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.expressions.gen;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.gen.Filter<TypeName>ColumnBetween;
+import org.apache.hadoop.hive.ql.plan.DynamicValue;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.sql.Timestamp;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+
+public class <ClassName> extends Filter<TypeName>ColumnBetween {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final Logger LOG = LoggerFactory.getLogger(<ClassName>.class);
+
+ protected DynamicValue leftDynamicValue;
+ protected DynamicValue rightDynamicValue;
+ protected transient boolean initialized = false;
+ protected transient boolean isLeftOrRightNull = false;
+
+ public <ClassName>(int colNum, DynamicValue leftValue, DynamicValue rightValue) {
+ super(colNum, <DefaultValue>, <DefaultValue>);
+ this.leftDynamicValue = leftValue;
+ this.rightDynamicValue = rightValue;
+ }
+
+ public <ClassName>() {
+ }
+
+ public DynamicValue getLeftDynamicValue() {
+ return leftDynamicValue;
+ }
+
+ public void setLeftDynamicValue(DynamicValue leftValue) {
+ this.leftDynamicValue = leftValue;
+ }
+
+ public DynamicValue getRightDynamicValue() {
+ return rightDynamicValue;
+ }
+
+ public void setRightDynamicValue(DynamicValue rightValue) {
+ this.rightDynamicValue = rightValue;
+ }
+
+ @Override
+ public void init(Configuration conf) {
+ super.init(conf);
+ leftDynamicValue.setConf(conf);
+ rightDynamicValue.setConf(conf);
+ }
+
+ @Override
+ public void evaluate(VectorizedRowBatch batch) {
+ if (!initialized) {
+ Object lVal = leftDynamicValue.getValue();
+ Object rVal = rightDynamicValue.getValue();
+ if (lVal == null || rVal == null) {
+ isLeftOrRightNull = true;
+ } else {
+ <VectorType> min = PrimitiveObjectInspectorUtils.<GetPrimitiveMethod>(
+ lVal, leftDynamicValue.getObjectInspector())<GetValueMethod>;
+ setLeftValue(min);
+
+ <VectorType> max = PrimitiveObjectInspectorUtils.<GetPrimitiveMethod>(
+ rVal, rightDynamicValue.getObjectInspector())<GetValueMethod>;
+ setRightValue(max);
+ }
+ initialized = true;
+ }
+
+ // Special case for dynamic values - min/max can be null
+ if (isLeftOrRightNull) {
+ // Entire batch is filtered out
+ batch.size = 0;
+ }
+
+ super.evaluate(batch);
+ }
+}
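
After expansion with the "long" parameters from GenVectorCode earlier in this patch (TypeName=Long, VectorType=long, GetPrimitiveMethod=getLong, empty GetValueMethod), the min/max initialization in the generated evaluate() boils down to converting the objects returned by DynamicValue.getValue() through the object inspector. A runnable sketch of roughly that conversion (the literal values and the factory-supplied inspector stand in for what the DynamicValue would provide at run time):

  import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
  import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;

  public class ExpandedBetweenSketch {
    public static void main(String[] args) {
      Object lVal = Long.valueOf(10); // stands in for leftDynamicValue.getValue()
      Object rVal = Long.valueOf(20); // stands in for rightDynamicValue.getValue()
      long min = PrimitiveObjectInspectorUtils.getLong(
          lVal, PrimitiveObjectInspectorFactory.javaLongObjectInspector);
      long max = PrimitiveObjectInspectorUtils.getLong(
          rVal, PrimitiveObjectInspectorFactory.javaLongObjectInspector);
      System.out.println("BETWEEN bounds resolved to [" + min + ", " + max + "]");
    }
  }

Note the null handling in the template: if either bound never arrives (for example, the small-table side produced no rows and the registry filled the dynamic values with nulls, as DynamicValueRegistryTez does below), batch.size is set to 0 and every row is filtered out, which is the correct semantics for an inner join against an empty build side.
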
http://git-wip-us.apache.org/repos/asf/hive/blob/62ebd1ab/ql/src/gen/vectorization/ExpressionTemplates/FilterDecimalColumnBetween.txt
----------------------------------------------------------------------
diff --git a/ql/src/gen/vectorization/ExpressionTemplates/FilterDecimalColumnBetween.txt b/ql/src/gen/vectorization/ExpressionTemplates/FilterDecimalColumnBetween.txt
index d68edfa..62d2254 100644
--- a/ql/src/gen/vectorization/ExpressionTemplates/FilterDecimalColumnBetween.txt
+++ b/ql/src/gen/vectorization/ExpressionTemplates/FilterDecimalColumnBetween.txt
@@ -154,6 +154,22 @@ public class <ClassName> extends VectorExpression {
return "boolean";
}
+ public HiveDecimal getLeftValue() {
+ return leftValue;
+ }
+
+ public void setLeftValue(HiveDecimal value) {
+ this.leftValue = value;
+ }
+
+ public HiveDecimal getRightValue() {
+ return rightValue;
+ }
+
+ public void setRightValue(HiveDecimal value) {
+ this.rightValue = value;
+ }
+
@Override
public VectorExpressionDescriptor.Descriptor getDescriptor() {
return (new VectorExpressionDescriptor.Builder())
http://git-wip-us.apache.org/repos/asf/hive/blob/62ebd1ab/ql/src/gen/vectorization/ExpressionTemplates/FilterStringColumnBetween.txt
----------------------------------------------------------------------
diff --git a/ql/src/gen/vectorization/ExpressionTemplates/FilterStringColumnBetween.txt b/ql/src/gen/vectorization/ExpressionTemplates/FilterStringColumnBetween.txt
index e8049da..16d4aaf 100644
--- a/ql/src/gen/vectorization/ExpressionTemplates/FilterStringColumnBetween.txt
+++ b/ql/src/gen/vectorization/ExpressionTemplates/FilterStringColumnBetween.txt
@@ -161,19 +161,19 @@ public class <ClassName> extends VectorExpression {
this.colNum = colNum;
}
- public byte[] getLeft() {
+ public byte[] getLeftValue() {
return left;
}
- public void setLeft(byte[] value) {
+ public void setLeftValue(byte[] value) {
this.left = value;
}
- public byte[] getRight() {
+ public byte[] getRightValue() {
return right;
}
- public void setRight(byte[] value) {
+ public void setRightValue(byte[] value) {
this.right = value;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/62ebd1ab/ql/src/gen/vectorization/ExpressionTemplates/FilterTimestampColumnBetween.txt
----------------------------------------------------------------------
diff --git a/ql/src/gen/vectorization/ExpressionTemplates/FilterTimestampColumnBetween.txt b/ql/src/gen/vectorization/ExpressionTemplates/FilterTimestampColumnBetween.txt
index 4298d79..806148f 100644
--- a/ql/src/gen/vectorization/ExpressionTemplates/FilterTimestampColumnBetween.txt
+++ b/ql/src/gen/vectorization/ExpressionTemplates/FilterTimestampColumnBetween.txt
@@ -153,6 +153,22 @@ public class <ClassName> extends VectorExpression {
return "boolean";
}
+ public Timestamp getLeftValue() {
+ return leftValue;
+ }
+
+ public void setLeftValue(Timestamp value) {
+ this.leftValue = value;
+ }
+
+ public Timestamp getRightValue() {
+ return rightValue;
+ }
+
+ public void setRightValue(Timestamp value) {
+ this.rightValue = value;
+ }
+
@Override
public VectorExpressionDescriptor.Descriptor getDescriptor() {
return (new VectorExpressionDescriptor.Builder())
http://git-wip-us.apache.org/repos/asf/hive/blob/62ebd1ab/ql/src/gen/vectorization/ExpressionTemplates/FilterTruncStringColumnBetween.txt
----------------------------------------------------------------------
diff --git a/ql/src/gen/vectorization/ExpressionTemplates/FilterTruncStringColumnBetween.txt b/ql/src/gen/vectorization/ExpressionTemplates/FilterTruncStringColumnBetween.txt
index 94a174d..d350dcb 100644
--- a/ql/src/gen/vectorization/ExpressionTemplates/FilterTruncStringColumnBetween.txt
+++ b/ql/src/gen/vectorization/ExpressionTemplates/FilterTruncStringColumnBetween.txt
@@ -163,19 +163,19 @@ public class <ClassName> extends VectorExpression {
this.colNum = colNum;
}
- public byte[] getLeft() {
+ public byte[] getLeftValue() {
return left;
}
- public void setLeft(byte[] value) {
+ public void setLeftValue(byte[] value) {
this.left = value;
}
- public byte[] getRight() {
+ public byte[] getRightValue() {
return right;
}
- public void setRight(byte[] value) {
+ public void setRightValue(byte[] value) {
this.right = value;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/62ebd1ab/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicValueRegistryTez.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicValueRegistryTez.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicValueRegistryTez.java
index 7bbedf6..b7687c5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicValueRegistryTez.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicValueRegistryTez.java
@@ -122,9 +122,15 @@ public class DynamicValueRegistryTez implements DynamicValueRegistry {
setValue(runtimeValuesInfo.getDynamicValueIDs().get(colIdx), val);
}
}
- // For now, expecting a single row (min/max, aggregated bloom filter)
- if (rowCount != 1) {
- throw new IllegalStateException("Expected 1 row from " + inputSourceName + ", got " + rowCount);
+ // For now, expecting a single row (min/max, aggregated bloom filter), or no rows
+ if (rowCount == 0) {
+ LOG.debug("No input rows from " + inputSourceName + ", filling dynamic values with nulls");
+ for (int colIdx = 0; colIdx < colExprEvaluators.size(); ++colIdx) {
+ setValue(runtimeValuesInfo.getDynamicValueIDs().get(colIdx), null);
+ }
+ } else if (rowCount > 1) {
+ throw new IllegalStateException("Expected 0 or 1 rows from " + inputSourceName + ", got " + rowCount);
}
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/62ebd1ab/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExpressionDescriptor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExpressionDescriptor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExpressionDescriptor.java
index 217af3f..f4499d7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExpressionDescriptor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExpressionDescriptor.java
@@ -183,7 +183,8 @@ public class VectorExpressionDescriptor {
public enum InputExpressionType {
NONE(0),
COLUMN(1),
- SCALAR(2);
+ SCALAR(2),
+ DYNAMICVALUE(3);
private final int value;
http://git-wip-us.apache.org/repos/asf/hive/blob/62ebd1ab/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFilterOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFilterOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFilterOperator.java
index 261246b..2598445 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFilterOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFilterOperator.java
@@ -72,6 +72,8 @@ public class VectorFilterOperator extends FilterOperator {
try {
heartbeatInterval = HiveConf.getIntVar(hconf,
HiveConf.ConfVars.HIVESENDHEARTBEAT);
+
+ conditionEvaluator.init(hconf);
} catch (Throwable e) {
throw new HiveException(e);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/62ebd1ab/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java
index f7fec8f..bb382b1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java
@@ -113,6 +113,7 @@ public class VectorSelectOperator extends Operator<SelectDesc> implements
projectedColumns = new int [vExpressions.length];
for (int i = 0; i < projectedColumns.length; i++) {
+ vExpressions[i].init(hconf);
projectedColumns[i] = vExpressions[i].getOutputColumn();
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/62ebd1ab/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
index c887757..484f615 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
@@ -54,6 +54,8 @@ import org.apache.hadoop.hive.ql.exec.vector.expressions.*;
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorUDAFAvgDecimal;
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorUDAFAvgTimestamp;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorUDAFBloomFilter;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorUDAFBloomFilterMerge;
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorUDAFCount;
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorUDAFCountMerge;
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorUDAFCountStar;
@@ -97,6 +99,7 @@ import org.apache.hadoop.hive.ql.plan.AggregationDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDynamicValueDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
import org.apache.hadoop.hive.ql.plan.GroupByDesc;
import org.apache.hadoop.hive.ql.udf.SettableUDF;
@@ -585,6 +588,8 @@ public class VectorizationContext {
} else if (exprDesc instanceof ExprNodeConstantDesc) {
ve = getConstantVectorExpression(((ExprNodeConstantDesc) exprDesc).getValue(), exprDesc.getTypeInfo(),
mode);
+ } else if (exprDesc instanceof ExprNodeDynamicValueDesc) {
+ ve = getDynamicValueVectorExpression((ExprNodeDynamicValueDesc) exprDesc, mode);
}
if (ve == null) {
throw new HiveException(
@@ -1094,6 +1099,21 @@ public class VectorizationContext {
}
}
+ private VectorExpression getDynamicValueVectorExpression(ExprNodeDynamicValueDesc dynamicValueExpr,
+ VectorExpressionDescriptor.Mode mode) throws HiveException {
+ String typeName = dynamicValueExpr.getTypeInfo().getTypeName();
+ VectorExpressionDescriptor.ArgumentType vectorArgType = VectorExpressionDescriptor.ArgumentType.fromHiveTypeName(typeName);
+ if (vectorArgType == VectorExpressionDescriptor.ArgumentType.NONE) {
+ throw new HiveException("No vector argument type for type name " + typeName);
+ }
+ int outCol = -1;
+ if (mode == VectorExpressionDescriptor.Mode.PROJECTION) {
+ outCol = ocm.allocateOutputColumn(dynamicValueExpr.getTypeInfo());
+ }
+
+ return new DynamicValueVectorExpression(outCol, dynamicValueExpr.getTypeInfo(), dynamicValueExpr.getDynamicValue());
+ }
+
/**
* Used as a fast path for operations that don't modify their input, like unary +
* and casting boolean to long. IdentityExpression and its children are always
@@ -1181,6 +1201,8 @@ public class VectorizationContext {
builder.setInputExpressionType(i, InputExpressionType.COLUMN);
} else if (child instanceof ExprNodeConstantDesc) {
builder.setInputExpressionType(i, InputExpressionType.SCALAR);
+ } else if (child instanceof ExprNodeDynamicValueDesc) {
+ builder.setInputExpressionType(i, InputExpressionType.DYNAMICVALUE);
} else {
throw new HiveException("Cannot handle expression type: " + child.getClass().getSimpleName());
}
@@ -1225,6 +1247,8 @@ public class VectorizationContext {
} else if (child instanceof ExprNodeConstantDesc) {
Object scalarValue = getVectorTypeScalarValue((ExprNodeConstantDesc) child);
arguments[i] = (null == scalarValue) ? getConstantVectorExpression(null, child.getTypeInfo(), childrenMode) : scalarValue;
+ } else if (child instanceof ExprNodeDynamicValueDesc) {
+ arguments[i] = ((ExprNodeDynamicValueDesc) child).getDynamicValue();
} else {
throw new HiveException("Cannot handle expression type: " + child.getClass().getSimpleName());
}
@@ -2092,8 +2116,13 @@ public class VectorizationContext {
return null;
}
+ boolean hasDynamicValues = false;
+
// We don't currently support the BETWEEN ends being columns. They must be scalars.
- if (!(childExpr.get(2) instanceof ExprNodeConstantDesc) ||
+ if ((childExpr.get(2) instanceof ExprNodeDynamicValueDesc) &&
+ (childExpr.get(3) instanceof ExprNodeDynamicValueDesc)) {
+ hasDynamicValues = true;
+ } else if (!(childExpr.get(2) instanceof ExprNodeConstantDesc) ||
!(childExpr.get(3) instanceof ExprNodeConstantDesc)) {
return null;
}
@@ -2138,35 +2167,51 @@ public class VectorizationContext {
// determine class
Class<?> cl = null;
if (isIntFamily(colType) && !notKeywordPresent) {
- cl = FilterLongColumnBetween.class;
+ cl = (hasDynamicValues ?
+ FilterLongColumnBetweenDynamicValue.class :
+ FilterLongColumnBetween.class);
} else if (isIntFamily(colType) && notKeywordPresent) {
cl = FilterLongColumnNotBetween.class;
} else if (isFloatFamily(colType) && !notKeywordPresent) {
- cl = FilterDoubleColumnBetween.class;
+ cl = (hasDynamicValues ?
+ FilterDoubleColumnBetweenDynamicValue.class :
+ FilterDoubleColumnBetween.class);
} else if (isFloatFamily(colType) && notKeywordPresent) {
cl = FilterDoubleColumnNotBetween.class;
} else if (colType.equals("string") && !notKeywordPresent) {
- cl = FilterStringColumnBetween.class;
+ cl = (hasDynamicValues ?
+ FilterStringColumnBetweenDynamicValue.class :
+ FilterStringColumnBetween.class);
} else if (colType.equals("string") && notKeywordPresent) {
cl = FilterStringColumnNotBetween.class;
} else if (varcharTypePattern.matcher(colType).matches() && !notKeywordPresent) {
- cl = FilterVarCharColumnBetween.class;
+ cl = (hasDynamicValues ?
+ FilterVarCharColumnBetweenDynamicValue.class :
+ FilterVarCharColumnBetween.class);
} else if (varcharTypePattern.matcher(colType).matches() && notKeywordPresent) {
cl = FilterVarCharColumnNotBetween.class;
} else if (charTypePattern.matcher(colType).matches() && !notKeywordPresent) {
- cl = FilterCharColumnBetween.class;
+ cl = (hasDynamicValues ?
+ FilterCharColumnBetweenDynamicValue.class :
+ FilterCharColumnBetween.class);
} else if (charTypePattern.matcher(colType).matches() && notKeywordPresent) {
cl = FilterCharColumnNotBetween.class;
} else if (colType.equals("timestamp") && !notKeywordPresent) {
- cl = FilterTimestampColumnBetween.class;
+ cl = (hasDynamicValues ?
+ FilterTimestampColumnBetweenDynamicValue.class :
+ FilterTimestampColumnBetween.class);
} else if (colType.equals("timestamp") && notKeywordPresent) {
cl = FilterTimestampColumnNotBetween.class;
} else if (isDecimalFamily(colType) && !notKeywordPresent) {
- cl = FilterDecimalColumnBetween.class;
+ cl = (hasDynamicValues ?
+ FilterDecimalColumnBetweenDynamicValue.class :
+ FilterDecimalColumnBetween.class);
} else if (isDecimalFamily(colType) && notKeywordPresent) {
cl = FilterDecimalColumnNotBetween.class;
} else if (isDateFamily(colType) && !notKeywordPresent) {
- cl = FilterLongColumnBetween.class;
+ cl = (hasDynamicValues ?
+ FilterLongColumnBetweenDynamicValue.class :
+ FilterLongColumnBetween.class);
} else if (isDateFamily(colType) && notKeywordPresent) {
cl = FilterLongColumnNotBetween.class;
}
@@ -2224,6 +2269,12 @@ public class VectorizationContext {
} else if (child instanceof ExprNodeConstantDesc) {
// this is a constant (or null)
argDescs[i].setConstant((ExprNodeConstantDesc) child);
+ } else if (child instanceof ExprNodeDynamicValueDesc) {
+ VectorExpression e = getVectorExpression(child, VectorExpressionDescriptor.Mode.PROJECTION);
+ vectorExprs.add(e);
+ variableArgPositions.add(i);
+ exprResultColumnNums.add(e.getOutputColumn());
+ argDescs[i].setVariable(e.getOutputColumn());
} else {
throw new HiveException("Unable to vectorize custom UDF. Encountered unsupported expr desc : "
+ child);
@@ -2651,6 +2702,14 @@ public class VectorizationContext {
add(new AggregateDefinition("stddev_samp", ArgumentType.FLOAT_FAMILY, Mode.PARTIAL1, VectorUDAFStdSampDouble.class));
add(new AggregateDefinition("stddev_samp", ArgumentType.DECIMAL, Mode.PARTIAL1, VectorUDAFStdSampDecimal.class));
add(new AggregateDefinition("stddev_samp", ArgumentType.TIMESTAMP, Mode.PARTIAL1, VectorUDAFStdSampTimestamp.class));
+
+ // UDAFBloomFilter. Original data is one type, partial/final is another,
+ // so this requires 2 aggregation classes: (partial1/complete) and (partial2/final).
+ add(new AggregateDefinition("bloom_filter", ArgumentType.ALL_FAMILY, Mode.PARTIAL1, VectorUDAFBloomFilter.class));
+ add(new AggregateDefinition("bloom_filter", ArgumentType.ALL_FAMILY, Mode.COMPLETE, VectorUDAFBloomFilter.class));
+ add(new AggregateDefinition("bloom_filter", ArgumentType.BINARY, Mode.PARTIAL2, VectorUDAFBloomFilterMerge.class));
+ add(new AggregateDefinition("bloom_filter", ArgumentType.BINARY, Mode.FINAL, VectorUDAFBloomFilterMerge.class));
+
}};
public VectorAggregateExpression getAggregatorExpression(AggregationDesc desc)
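
A sketch of why bloom_filter needs two vectorized classes: in PARTIAL1/COMPLETE mode the input is the original column type and the output is a serialized filter, while in PARTIAL2/FINAL mode the input is already binary and the remaining work is a merge. The same split, expressed with the plain BloomFilter API (batch handling omitted; the values are illustrative):

  import java.io.ByteArrayOutputStream;
  import org.apache.hive.common.util.BloomFilter;

  public class BloomAggPhases {
    public static void main(String[] args) throws Exception {
      // PARTIAL1 on two tasks: each builds a filter over its own rows.
      BloomFilter task1 = new BloomFilter(100);
      task1.addLong(1L);
      task1.addLong(2L);
      BloomFilter task2 = new BloomFilter(100);
      task2.addLong(3L);

      // Each task emits its filter as the binary intermediate result.
      byte[] b1 = serialize(task1);
      byte[] b2 = serialize(task2);

      // FINAL: merge the intermediates in place (bitwise OR of the bitsets),
      // as implemented by BloomFilter.mergeBloomFilterBytes in this patch.
      BloomFilter.mergeBloomFilterBytes(b1, 0, b1.length, b2, 0, b2.length);
    }

    static byte[] serialize(BloomFilter bf) throws Exception {
      ByteArrayOutputStream out = new ByteArrayOutputStream();
      BloomFilter.serialize(out, bf);
      return out.toByteArray();
    }
  }
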
http://git-wip-us.apache.org/repos/asf/hive/blob/62ebd1ab/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/DynamicValueVectorExpression.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/DynamicValueVectorExpression.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/DynamicValueVectorExpression.java
new file mode 100644
index 0000000..1a34118
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/DynamicValueVectorExpression.java
@@ -0,0 +1,314 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.expressions;
+
+import java.sql.Timestamp;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.common.type.HiveIntervalDayTime;
+import org.apache.hadoop.hive.ql.exec.vector.*;
+import org.apache.hadoop.hive.ql.plan.DynamicValue;
+import org.apache.hadoop.hive.serde2.io.DateWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A dynamic value is represented as a vector with repeating values.
+ */
+public class DynamicValueVectorExpression extends VectorExpression {
+ private static final Logger LOG = LoggerFactory.getLogger(DynamicValueVectorExpression.class);
+
+ private static final long serialVersionUID = 1L;
+
+ DynamicValue dynamicValue;
+ TypeInfo typeInfo;
+ private transient boolean initialized = false;
+
+ private int outputColumn;
+ protected long longValue = 0;
+ private double doubleValue = 0;
+ private byte[] bytesValue = null;
+ private HiveDecimal decimalValue = null;
+ private Timestamp timestampValue = null;
+ private HiveIntervalDayTime intervalDayTimeValue = null;
+ private boolean isNullValue = false;
+
+ private ColumnVector.Type type;
+ private int bytesValueLength = 0;
+
+ public DynamicValueVectorExpression() {
+ super();
+ }
+
+ public DynamicValueVectorExpression(int outputColumn, TypeInfo typeInfo, DynamicValue dynamicValue) {
+ this();
+ this.outputColumn = outputColumn;
+ this.type = VectorizationContext.getColumnVectorTypeFromTypeInfo(typeInfo);
+ this.dynamicValue = dynamicValue;
+ this.typeInfo = typeInfo;
+ }
+
+ private void evaluateLong(VectorizedRowBatch vrg) {
+ LongColumnVector cv = (LongColumnVector) vrg.cols[outputColumn];
+ cv.isRepeating = true;
+ cv.noNulls = !isNullValue;
+ if (!isNullValue) {
+ cv.vector[0] = longValue;
+ cv.isNull[0] = false;
+ } else {
+ cv.isNull[0] = true;
+ }
+ }
+
+ private void evaluateDouble(VectorizedRowBatch vrg) {
+ DoubleColumnVector cv = (DoubleColumnVector) vrg.cols[outputColumn];
+ cv.isRepeating = true;
+ cv.noNulls = !isNullValue;
+ if (!isNullValue) {
+ cv.vector[0] = doubleValue;
+ cv.isNull[0] = false;
+ } else {
+ cv.isNull[0] = true;
+ }
+ }
+
+ private void evaluateBytes(VectorizedRowBatch vrg) {
+ BytesColumnVector cv = (BytesColumnVector) vrg.cols[outputColumn];
+ cv.isRepeating = true;
+ cv.noNulls = !isNullValue;
+ cv.initBuffer();
+ if (!isNullValue) {
+ cv.setVal(0, bytesValue, 0, bytesValueLength);
+ cv.isNull[0] = false;
+ } else {
+ cv.isNull[0] = true;
+ }
+ }
+
+ private void evaluateDecimal(VectorizedRowBatch vrg) {
+ DecimalColumnVector dcv = (DecimalColumnVector) vrg.cols[outputColumn];
+ dcv.isRepeating = true;
+ dcv.noNulls = !isNullValue;
+ if (!isNullValue) {
+ dcv.vector[0].set(decimalValue);
+ dcv.isNull[0] = false;
+ } else {
+ dcv.isNull[0] = true;
+ }
+ }
+
+ private void evaluateTimestamp(VectorizedRowBatch vrg) {
+ TimestampColumnVector dcv = (TimestampColumnVector) vrg.cols[outputColumn];
+ dcv.isRepeating = true;
+ dcv.noNulls = !isNullValue;
+ if (!isNullValue) {
+ dcv.set(0, timestampValue);
+ dcv.isNull[0] = false;
+ } else {
+ dcv.isNull[0] = true;
+ }
+ }
+
+ private void evaluateIntervalDayTime(VectorizedRowBatch vrg) {
+ IntervalDayTimeColumnVector dcv = (IntervalDayTimeColumnVector) vrg.cols[outputColumn];
+ dcv.isRepeating = true;
+ dcv.noNulls = !isNullValue;
+ if (!isNullValue) {
+ dcv.set(0, intervalDayTimeValue);
+ dcv.isNull[0] = false;
+ } else {
+ dcv.isNull[0] = true;
+ }
+ }
+
+ private void initValue() {
+ Object val = dynamicValue.getValue();
+
+ if (val == null) {
+ isNullValue = true;
+ } else {
+ PrimitiveObjectInspector poi = dynamicValue.getObjectInspector();
+ byte[] bytesVal;
+ switch (poi.getPrimitiveCategory()) {
+ case BOOLEAN:
+ case BYTE:
+ case SHORT:
+ case INT:
+ case LONG:
+ longValue = PrimitiveObjectInspectorUtils.getLong(val, poi);
+ break;
+ case FLOAT:
+ case DOUBLE:
+ doubleValue = PrimitiveObjectInspectorUtils.getDouble(val, poi);
+ break;
+ case STRING:
+ case CHAR:
+ case VARCHAR:
+ bytesVal = PrimitiveObjectInspectorUtils.getString(val, poi).getBytes();
+ setBytesValue(bytesVal);
+ break;
+ case BINARY:
+ bytesVal = PrimitiveObjectInspectorUtils.getBinary(val, poi).copyBytes();
+ setBytesValue(bytesVal);
+ break;
+ case DECIMAL:
+ decimalValue = PrimitiveObjectInspectorUtils.getHiveDecimal(val, poi);
+ break;
+ case DATE:
+ longValue = DateWritable.dateToDays(PrimitiveObjectInspectorUtils.getDate(val, poi));
+ break;
+ case TIMESTAMP:
+ timestampValue = PrimitiveObjectInspectorUtils.getTimestamp(val, poi);
+ break;
+ case INTERVAL_YEAR_MONTH:
+ longValue = PrimitiveObjectInspectorUtils.getHiveIntervalYearMonth(val, poi).getTotalMonths();
+ break;
+ case INTERVAL_DAY_TIME:
+ intervalDayTimeValue = PrimitiveObjectInspectorUtils.getHiveIntervalDayTime(val, poi);
+ break;
+ default:
+ throw new IllegalStateException("Unsupported type " + poi.getPrimitiveCategory());
+ }
+ }
+
+ initialized = true;
+ }
+
+ @Override
+ public void init(Configuration conf) {
+ super.init(conf);
+ dynamicValue.setConf(conf);
+ }
+
+ @Override
+ public void evaluate(VectorizedRowBatch vrg) {
+ if (!initialized) {
+ initValue();
+ }
+
+ switch (type) {
+ case LONG:
+ evaluateLong(vrg);
+ break;
+ case DOUBLE:
+ evaluateDouble(vrg);
+ break;
+ case BYTES:
+ evaluateBytes(vrg);
+ break;
+ case DECIMAL:
+ evaluateDecimal(vrg);
+ break;
+ case TIMESTAMP:
+ evaluateTimestamp(vrg);
+ break;
+ case INTERVAL_DAY_TIME:
+ evaluateIntervalDayTime(vrg);
+ break;
+ default:
+ throw new IllegalStateException("Unsupported type " + type);
+ }
+ }
+
+ @Override
+ public int getOutputColumn() {
+ return outputColumn;
+ }
+
+ public long getLongValue() {
+ return longValue;
+ }
+
+ public void setLongValue(long longValue) {
+ this.longValue = longValue;
+ }
+
+ public double getDoubleValue() {
+ return doubleValue;
+ }
+
+ public void setDoubleValue(double doubleValue) {
+ this.doubleValue = doubleValue;
+ }
+
+ public byte[] getBytesValue() {
+ return bytesValue;
+ }
+
+ public void setBytesValue(byte[] bytesValue) {
+ this.bytesValue = bytesValue.clone();
+ this.bytesValueLength = bytesValue.length;
+ }
+
+ public void setDecimalValue(HiveDecimal decimalValue) {
+ this.decimalValue = decimalValue;
+ }
+
+ public HiveDecimal getDecimalValue() {
+ return decimalValue;
+ }
+
+ public void setTimestampValue(Timestamp timestampValue) {
+ this.timestampValue = timestampValue;
+ }
+
+ public Timestamp getTimestampValue() {
+ return timestampValue;
+ }
+
+ public void setIntervalDayTimeValue(HiveIntervalDayTime intervalDayTimeValue) {
+ this.intervalDayTimeValue = intervalDayTimeValue;
+ }
+
+ public HiveIntervalDayTime getIntervalDayTimeValue() {
+ return intervalDayTimeValue;
+ }
+
+ public String getTypeString() {
+ return getOutputType();
+ }
+
+ public void setOutputColumn(int outputColumn) {
+ this.outputColumn = outputColumn;
+ }
+
+ @Override
+ public VectorExpressionDescriptor.Descriptor getDescriptor() {
+ return (new VectorExpressionDescriptor.Builder()).build();
+ }
+
+ public DynamicValue getDynamicValue() {
+ return dynamicValue;
+ }
+
+ public void setDynamicValue(DynamicValue dynamicValue) {
+ this.dynamicValue = dynamicValue;
+ }
+
+ public TypeInfo getTypeInfo() {
+ return typeInfo;
+ }
+
+ public void setTypeInfo(TypeInfo typeInfo) {
+ this.typeInfo = typeInfo;
+ }
+}
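
The expression above relies on the column-vector "repeating" convention: it writes one value (or one null) at index 0 and marks the vector repeating, so downstream operators read the same value for every row of the batch. A standalone sketch of that convention using LongColumnVector (the values are illustrative):

  import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;

  public class RepeatingVectorSketch {
    public static void main(String[] args) {
      LongColumnVector cv = new LongColumnVector(1024);
      cv.isRepeating = true; // one logical value for the whole batch
      cv.noNulls = true;
      cv.vector[0] = 42L;    // readers consult index 0 for every row
      for (int row : new int[] {0, 5, 1023}) {
        long v = cv.vector[cv.isRepeating ? 0 : row];
        System.out.println("row " + row + " -> " + v);
      }
    }
  }
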
http://git-wip-us.apache.org/repos/asf/hive/blob/62ebd1ab/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpression.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpression.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpression.java
index 8fca8a1..218f306 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpression.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpression.java
@@ -22,6 +22,8 @@ import java.io.Serializable;
import java.util.Map;
import com.google.common.collect.ImmutableMap;
+
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.exec.vector.VectorExpressionDescriptor;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
@@ -31,7 +33,7 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
public abstract class VectorExpression implements Serializable {
public enum Type {
STRING, CHAR, VARCHAR, TIMESTAMP, DATE, LONG, DOUBLE, DECIMAL,
- INTERVAL_YEAR_MONTH, INTERVAL_DAY_TIME, OTHER;
+ INTERVAL_YEAR_MONTH, INTERVAL_DAY_TIME, BINARY, OTHER;
private static Map<String, Type> types = ImmutableMap.<String, Type>builder()
.put("string", STRING)
.put("char", CHAR)
@@ -43,6 +45,7 @@ public abstract class VectorExpression implements Serializable {
.put("decimal", DECIMAL)
.put("interval_year_month", INTERVAL_YEAR_MONTH)
.put("interval_day_time", INTERVAL_DAY_TIME)
+ .put("binary", BINARY)
.build();
public static Type getValue(String name) {
@@ -76,6 +79,14 @@ public abstract class VectorExpression implements Serializable {
*/
public abstract void evaluate(VectorizedRowBatch batch);
+ public void init(Configuration conf) {
+ if (childExpressions != null) {
+ for (VectorExpression child : childExpressions) {
+ child.init(conf);
+ }
+ }
+ }
+
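The new init(Configuration) hook gives expressions that wrap a DynamicValue a chance to receive the job configuration before the first evaluate() call, and it propagates recursively through childExpressions. A sketch of the shape such a subclass takes (MyDynamicExpr and its field are hypothetical; the concrete classes in this patch follow the same pattern):

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hive.ql.exec.vector.VectorExpressionDescriptor;
  import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
  import org.apache.hadoop.hive.ql.plan.DynamicValue;

  public class MyDynamicExpr extends VectorExpression {
    private static final long serialVersionUID = 1L;
    private DynamicValue value; // hypothetical runtime-supplied value

    @Override
    public void init(Configuration conf) {
      super.init(conf);    // initializes child expressions, as added above
      value.setConf(conf); // the DynamicValue needs the conf to reach its registry
    }

    @Override
    public void evaluate(VectorizedRowBatch batch) {
      // a real implementation would lazily call value.getValue() here
    }

    @Override
    public int getOutputColumn() { return -1; }

    @Override
    public String getOutputType() { return "boolean"; }

    @Override
    public VectorExpressionDescriptor.Descriptor getDescriptor() {
      return (new VectorExpressionDescriptor.Builder()).build();
    }
  }
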
/**
* Returns the index of the output column in the array
* of column vectors. If not applicable, -1 is returned.
http://git-wip-us.apache.org/repos/asf/hive/blob/62ebd1ab/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorInBloomFilterColDynamicValue.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorInBloomFilterColDynamicValue.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorInBloomFilterColDynamicValue.java
new file mode 100644
index 0000000..188a87e
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorInBloomFilterColDynamicValue.java
@@ -0,0 +1,285 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.expressions;
+
+import java.io.ByteArrayInputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorExpressionDescriptor.Descriptor;
+import org.apache.hadoop.hive.ql.exec.vector.VectorExpressionDescriptor;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.plan.DynamicValue;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.BinaryObjectInspector;
+import org.apache.hive.common.util.BloomFilter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class VectorInBloomFilterColDynamicValue extends VectorExpression {
+ private static final long serialVersionUID = 1L;
+
+ private static final Logger LOG = LoggerFactory.getLogger(VectorInBloomFilterColDynamicValue.class);
+
+ protected int colNum;
+ protected DynamicValue bloomFilterDynamicValue;
+ protected transient boolean initialized = false;
+ protected transient BloomFilter bloomFilter;
+ protected transient BloomFilterCheck bfCheck;
+
+ public VectorInBloomFilterColDynamicValue(int colNum, DynamicValue bloomFilterDynamicValue) {
+ this.colNum = colNum;
+ this.bloomFilterDynamicValue = bloomFilterDynamicValue;
+ }
+
+ public VectorInBloomFilterColDynamicValue() {
+ }
+
+ @Override
+ public void init(Configuration conf) {
+ super.init(conf);
+ bloomFilterDynamicValue.setConf(conf);
+
+ // Instantiate BloomFilterCheck based on input column type
+ VectorExpression.Type colType = this.getInputTypes()[0];
+ switch (colType) {
+ case LONG:
+ case DATE:
+ bfCheck = new LongBloomFilterCheck();
+ break;
+ case DOUBLE:
+ bfCheck = new DoubleBloomFilterCheck();
+ break;
+ case DECIMAL:
+ bfCheck = new DecimalBloomFilterCheck();
+ break;
+ case STRING:
+ case CHAR:
+ case VARCHAR:
+ case BINARY:
+ bfCheck = new BytesBloomFilterCheck();
+ break;
+ case TIMESTAMP:
+ bfCheck = new TimestampBloomFilterCheck();
+ break;
+ default:
+ throw new IllegalStateException("Unsupported type " + colType);
+ }
+ }
+
+ private void initValue() {
+ try {
+ Object val = bloomFilterDynamicValue.getValue();
+ if (val != null) {
+ BinaryObjectInspector boi = (BinaryObjectInspector) bloomFilterDynamicValue.getObjectInspector();
+ byte[] bytes = boi.getPrimitiveJavaObject(val);
+ bloomFilter = BloomFilter.deserialize(new ByteArrayInputStream(bytes));
+ } else {
+ bloomFilter = null;
+ }
+ initialized = true;
+ } catch (Exception err) {
+ throw new RuntimeException(err);
+ }
+ }
+
+ @Override
+ public void evaluate(VectorizedRowBatch batch) {
+ if (childExpressions != null) {
+ super.evaluateChildren(batch);
+ }
+
+ if (!initialized) {
+ initValue();
+ }
+
+ ColumnVector inputColVector = batch.cols[colNum];
+ int[] sel = batch.selected;
+ boolean[] nullPos = inputColVector.isNull;
+ int n = batch.size;
+
+ // return immediately if batch is empty
+ if (n == 0) {
+ return;
+ }
+
+ // In case the dynamic value resolves to a null value: nothing can match,
+ // so clear the batch and return before any checkValue() call dereferences
+ // the null bloomFilter.
+ if (bloomFilter == null) {
+ batch.size = 0;
+ return;
+ }
+
+ if (inputColVector.noNulls) {
+ if (inputColVector.isRepeating) {
+
+ // All must be selected otherwise size would be zero. Repeating property will not change.
+ if (!(bfCheck.checkValue(inputColVector, 0))) {
+
+ // Entire batch is filtered out.
+ batch.size = 0;
+ }
+ } else if (batch.selectedInUse) {
+ int newSize = 0;
+ for(int j=0; j != n; j++) {
+ int i = sel[j];
+ if (bfCheck.checkValue(inputColVector, i)) {
+ sel[newSize++] = i;
+ }
+ }
+ batch.size = newSize;
+ } else {
+ int newSize = 0;
+ for(int i = 0; i != n; i++) {
+ if (bfCheck.checkValue(inputColVector, i)) {
+ sel[newSize++] = i;
+ }
+ }
+ if (newSize < n) {
+ batch.size = newSize;
+ batch.selectedInUse = true;
+ }
+ }
+ } else {
+ if (inputColVector.isRepeating) {
+
+ // All must be selected otherwise size would be zero. Repeating property will not change.
+ if (!nullPos[0]) {
+ if (!(bfCheck.checkValue(inputColVector, 0))) {
+
+ // Entire batch is filtered out.
+ batch.size = 0;
+ }
+ } else {
+ batch.size = 0;
+ }
+ } else if (batch.selectedInUse) {
+ int newSize = 0;
+ for(int j=0; j != n; j++) {
+ int i = sel[j];
+ if (!nullPos[i]) {
+ if (bfCheck.checkValue(inputColVector, i)) {
+ sel[newSize++] = i;
+ }
+ }
+ }
+
+ // Change the selected vector
+ batch.size = newSize;
+ } else {
+ int newSize = 0;
+ for(int i = 0; i != n; i++) {
+ if (!nullPos[i]) {
+ if (bfCheck.checkValue(inputColVector, i)) {
+ sel[newSize++] = i;
+ }
+ }
+ }
+ if (newSize < n) {
+ batch.size = newSize;
+ batch.selectedInUse = true;
+ }
+ }
+ }
+ }
+
+ @Override
+ public int getOutputColumn() {
+ return -1;
+ }
+
+ @Override
+ public String getOutputType() {
+ return "boolean";
+ }
+
+ public int getColNum() {
+ return colNum;
+ }
+
+ public void setColNum(int colNum) {
+ this.colNum = colNum;
+ }
+
+ @Override
+ public Descriptor getDescriptor() {
+ VectorExpressionDescriptor.Builder b = new VectorExpressionDescriptor.Builder();
+ b.setMode(VectorExpressionDescriptor.Mode.FILTER)
+ .setNumArguments(2)
+ .setArgumentTypes(
+ VectorExpressionDescriptor.ArgumentType.ALL_FAMILY,
+ VectorExpressionDescriptor.ArgumentType.BINARY)
+ .setInputExpressionTypes(
+ VectorExpressionDescriptor.InputExpressionType.COLUMN,
+ VectorExpressionDescriptor.InputExpressionType.DYNAMICVALUE);
+ return b.build();
+ }
+
+ // Type-specific handling
+ abstract class BloomFilterCheck {
+ abstract public boolean checkValue(ColumnVector columnVector, int idx);
+ }
+
+ class BytesBloomFilterCheck extends BloomFilterCheck {
+ @Override
+ public boolean checkValue(ColumnVector columnVector, int idx) {
+ BytesColumnVector col = (BytesColumnVector) columnVector;
+ return bloomFilter.testBytes(col.vector[idx], col.start[idx], col.length[idx]);
+ }
+ }
+
+ class LongBloomFilterCheck extends BloomFilterCheck {
+ @Override
+ public boolean checkValue(ColumnVector columnVector, int idx) {
+ LongColumnVector col = (LongColumnVector) columnVector;
+ return bloomFilter.testLong(col.vector[idx]);
+ }
+ }
+
+ class DoubleBloomFilterCheck extends BloomFilterCheck {
+ @Override
+ public boolean checkValue(ColumnVector columnVector, int idx) {
+ DoubleColumnVector col = (DoubleColumnVector) columnVector;
+ return bloomFilter.testDouble(col.vector[idx]);
+ }
+ }
+
+ class DecimalBloomFilterCheck extends BloomFilterCheck {
+ private byte[] scratchBuffer = new byte[HiveDecimal.SCRATCH_BUFFER_LEN_TO_BYTES];
+
+ @Override
+ public boolean checkValue(ColumnVector columnVector, int idx) {
+ DecimalColumnVector col = (DecimalColumnVector) columnVector;
+ int startIdx = col.vector[idx].toBytes(scratchBuffer);
+ return bloomFilter.testBytes(scratchBuffer, startIdx, scratchBuffer.length - startIdx);
+ }
+ }
+
+ class TimestampBloomFilterCheck extends BloomFilterCheck {
+ @Override
+ public boolean checkValue(ColumnVector columnVector, int idx) {
+ TimestampColumnVector col = (TimestampColumnVector) columnVector;
+ return bloomFilter.testLong(col.time[idx]);
+ }
+ }
+}
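All four branches of evaluate() above are variations of the standard
vectorized-filter idiom: surviving row indexes are compacted left into
batch.selected in place, and batch.size shrinks. Stripped of the null and
repeating handling, the core loop is equivalent to this sketch (the helper
class and the RowPredicate interface are invented for illustration;
RowPredicate stands in for bfCheck.checkValue):

  final class SelectionCompaction {
    interface RowPredicate {
      boolean test(int row);  // stands in for bfCheck.checkValue(colVector, row)
    }

    // Keeps only the rows for which pred holds. Survivors shift left in
    // sel with order preserved; the caller stores the return value into
    // batch.size and, if rows were dropped, sets batch.selectedInUse = true.
    static int compact(int[] sel, int size, RowPredicate pred) {
      int newSize = 0;
      for (int j = 0; j < size; j++) {
        int i = sel[j];
        if (pred.test(i)) {
          sel[newSize++] = i;
        }
      }
      return newSize;
    }
  }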
http://git-wip-us.apache.org/repos/asf/hive/blob/62ebd1ab/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFBloomFilter.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFBloomFilter.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFBloomFilter.java
new file mode 100644
index 0000000..3ecb82e
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFBloomFilter.java
@@ -0,0 +1,474 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationBufferRow;
+import org.apache.hadoop.hive.ql.exec.vector.VectorExpressionDescriptor;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression.AggregationBuffer;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.AggregationDesc;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFBloomFilter;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFBloomFilter.GenericUDAFBloomFilterEvaluator;
+import org.apache.hadoop.hive.ql.util.JavaDataModel;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hive.common.util.BloomFilter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class VectorUDAFBloomFilter extends VectorAggregateExpression {
+
+ private static final Logger LOG = LoggerFactory.getLogger(VectorUDAFBloomFilter.class);
+
+ private static final long serialVersionUID = 1L;
+
+ private VectorExpression inputExpression;
+ private long expectedEntries = -1;
+ private ValueProcessor valueProcessor;
+ transient private int bitSetSize = -1;
+ transient private BytesWritable bw = new BytesWritable();
+ transient private ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
+
+ /**
+ * class for storing the current aggregate value.
+ */
+ private static final class Aggregation implements AggregationBuffer {
+ private static final long serialVersionUID = 1L;
+
+ BloomFilter bf;
+
+ public Aggregation(long expectedEntries) {
+ bf = new BloomFilter(expectedEntries);
+ }
+
+ @Override
+ public int getVariableSize() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void reset() {
+ bf.reset();
+ }
+ }
+
+ public VectorUDAFBloomFilter(VectorExpression inputExpression) {
+ this();
+ this.inputExpression = inputExpression;
+
+ // Instantiate the ValueProcessor based on the input type
+ VectorExpressionDescriptor.ArgumentType inputType =
+ VectorExpressionDescriptor.ArgumentType.fromHiveTypeName(inputExpression.getOutputType());
+ switch (inputType) {
+ case INT_FAMILY:
+ case DATE:
+ valueProcessor = new ValueProcessorLong();
+ break;
+ case FLOAT_FAMILY:
+ valueProcessor = new ValueProcessorDouble();
+ break;
+ case DECIMAL:
+ valueProcessor = new ValueProcessorDecimal();
+ break;
+ case STRING:
+ case CHAR:
+ case VARCHAR:
+ case STRING_FAMILY:
+ case BINARY:
+ valueProcessor = new ValueProcessorBytes();
+ break;
+ case TIMESTAMP:
+ valueProcessor = new ValueProcessorTimestamp();
+ break;
+ default:
+ throw new IllegalStateException("Unsupported type " + inputType);
+ }
+ }
+
+ public VectorUDAFBloomFilter() {
+ super();
+ }
+
+ @Override
+ public AggregationBuffer getNewAggregationBuffer() throws HiveException {
+ if (expectedEntries < 0) {
+ throw new IllegalStateException("expectedEntries not initialized");
+ }
+ return new Aggregation(expectedEntries);
+ }
+
+ @Override
+ public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch batch)
+ throws HiveException {
+
+ inputExpression.evaluate(batch);
+
+ ColumnVector inputColumn = batch.cols[this.inputExpression.getOutputColumn()];
+
+ int batchSize = batch.size;
+
+ if (batchSize == 0) {
+ return;
+ }
+
+ Aggregation myagg = (Aggregation) agg;
+
+ if (inputColumn.isRepeating) {
+ if (inputColumn.noNulls) {
+ valueProcessor.processValue(myagg, inputColumn, 0);
+ }
+ return;
+ }
+
+ if (!batch.selectedInUse && inputColumn.noNulls) {
+ iterateNoSelectionNoNulls(myagg, inputColumn, batchSize);
+ }
+ else if (!batch.selectedInUse) {
+ iterateNoSelectionHasNulls(myagg, inputColumn, batchSize);
+ }
+ else if (inputColumn.noNulls){
+ iterateSelectionNoNulls(myagg, inputColumn, batchSize, batch.selected);
+ }
+ else {
+ iterateSelectionHasNulls(myagg, inputColumn, batchSize, batch.selected);
+ }
+ }
+
+ private void iterateNoSelectionNoNulls(
+ Aggregation myagg,
+ ColumnVector inputColumn,
+ int batchSize) {
+ for (int i=0; i< batchSize; ++i) {
+ valueProcessor.processValue(myagg, inputColumn, i);
+ }
+ }
+
+ private void iterateNoSelectionHasNulls(
+ Aggregation myagg,
+ ColumnVector inputColumn,
+ int batchSize) {
+
+ for (int i=0; i< batchSize; ++i) {
+ if (!inputColumn.isNull[i]) {
+ valueProcessor.processValue(myagg, inputColumn, i);
+ }
+ }
+ }
+
+ private void iterateSelectionNoNulls(
+ Aggregation myagg,
+ ColumnVector inputColumn,
+ int batchSize,
+ int[] selected) {
+
+ for (int j=0; j< batchSize; ++j) {
+ int i = selected[j];
+ valueProcessor.processValue(myagg, inputColumn, i);
+ }
+ }
+
+ private void iterateSelectionHasNulls(
+ Aggregation myagg,
+ ColumnVector inputColumn,
+ int batchSize,
+ int[] selected) {
+
+ for (int j=0; j< batchSize; ++j) {
+ int i = selected[j];
+ if (!inputColumn.isNull[i]) {
+ valueProcessor.processValue(myagg, inputColumn, i);
+ }
+ }
+ }
+
+ @Override
+ public void aggregateInputSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets, int aggregateIndex,
+ VectorizedRowBatch batch) throws HiveException {
+
+ int batchSize = batch.size;
+
+ if (batchSize == 0) {
+ return;
+ }
+
+ inputExpression.evaluate(batch);
+
+ ColumnVector inputColumn = batch.cols[this.inputExpression.getOutputColumn()];
+
+ if (inputColumn.noNulls) {
+ if (inputColumn.isRepeating) {
+ iterateNoNullsRepeatingWithAggregationSelection(
+ aggregationBufferSets, aggregateIndex,
+ inputColumn, batchSize);
+ } else {
+ if (batch.selectedInUse) {
+ iterateNoNullsSelectionWithAggregationSelection(
+ aggregationBufferSets, aggregateIndex,
+ inputColumn, batch.selected, batchSize);
+ } else {
+ iterateNoNullsWithAggregationSelection(
+ aggregationBufferSets, aggregateIndex,
+ inputColumn, batchSize);
+ }
+ }
+ } else {
+ if (inputColumn.isRepeating) {
+ // All nulls: nothing to add to the bloom filter.
+ } else {
+ if (batch.selectedInUse) {
+ iterateHasNullsSelectionWithAggregationSelection(
+ aggregationBufferSets, aggregateIndex,
+ inputColumn, batchSize, batch.selected);
+ } else {
+ iterateHasNullsWithAggregationSelection(
+ aggregationBufferSets, aggregateIndex,
+ inputColumn, batchSize);
+ }
+ }
+ }
+ }
+
+ private void iterateNoNullsRepeatingWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregrateIndex,
+ ColumnVector inputColumn,
+ int batchSize) {
+
+ for (int i=0; i < batchSize; ++i) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregrateIndex,
+ i);
+ valueProcessor.processValue(myagg, inputColumn, 0);
+ }
+ }
+
+ private void iterateNoNullsSelectionWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregrateIndex,
+ ColumnVector inputColumn,
+ int[] selection,
+ int batchSize) {
+
+ for (int i=0; i < batchSize; ++i) {
+ int row = selection[i];
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregrateIndex,
+ i);
+ valueProcessor.processValue(myagg, inputColumn, row);
+ }
+ }
+
+ private void iterateNoNullsWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregrateIndex,
+ ColumnVector inputColumn,
+ int batchSize) {
+ for (int i=0; i < batchSize; ++i) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregrateIndex,
+ i);
+ valueProcessor.processValue(myagg, inputColumn, i);
+ }
+ }
+
+ private void iterateHasNullsSelectionWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregrateIndex,
+ ColumnVector inputColumn,
+ int batchSize,
+ int[] selection) {
+
+ for (int i=0; i < batchSize; ++i) {
+ int row = selection[i];
+ if (!inputColumn.isNull[row]) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregrateIndex,
+ i);
+ valueProcessor.processValue(myagg, inputColumn, row);
+ }
+ }
+ }
+
+ private void iterateHasNullsWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregrateIndex,
+ ColumnVector inputColumn,
+ int batchSize) {
+
+ for (int i=0; i < batchSize; ++i) {
+ if (!inputColumn.isNull[i]) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregrateIndex,
+ i);
+ valueProcessor.processValue(myagg, inputColumn, i);
+ }
+ }
+ }
+
+ private Aggregation getCurrentAggregationBuffer(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregrateIndex,
+ int row) {
+ VectorAggregationBufferRow mySet = aggregationBufferSets[row];
+ Aggregation myagg = (Aggregation) mySet.getAggregationBuffer(aggregrateIndex);
+ return myagg;
+ }
+
+ @Override
+ public void reset(AggregationBuffer agg) throws HiveException {
+ agg.reset();
+ }
+
+ @Override
+ public Object evaluateOutput(AggregationBuffer agg) throws HiveException {
+ try {
+ Aggregation bfAgg = (Aggregation) agg;
+ byteStream.reset();
+ BloomFilter.serialize(byteStream, bfAgg.bf);
+ byte[] bytes = byteStream.toByteArray();
+ bw.set(bytes, 0, bytes.length);
+ return bw;
+ } catch (IOException err) {
+ throw new HiveException("Error encountered while serializing bloomfilter", err);
+ }
+ }
+
+ @Override
+ public ObjectInspector getOutputObjectInspector() {
+ return PrimitiveObjectInspectorFactory.writableBinaryObjectInspector;
+ }
+
+ @Override
+ public int getAggregationBufferFixedSize() {
+ if (bitSetSize < 0) {
+ // Not pretty, but we need a way to get the size
+ try {
+ Aggregation agg = (Aggregation) getNewAggregationBuffer();
+ bitSetSize = agg.bf.getBitSet().length;
+ } catch (Exception e) {
+ throw new RuntimeException("Unexpected error while creating AggregationBuffer", e);
+ }
+ }
+
+ // BloomFilter: object(BitSet: object(data: long[]), numBits: int, numHashFunctions: int)
+ JavaDataModel model = JavaDataModel.get();
+ int bloomFilterSize = JavaDataModel.alignUp(model.object() + model.lengthForLongArrayOfSize(bitSetSize),
+ model.memoryAlign());
+ return JavaDataModel.alignUp(
+ model.object() + bloomFilterSize + model.primitive1() + model.primitive1(),
+ model.memoryAlign());
+ }
+
+ @Override
+ public void init(AggregationDesc desc) throws HiveException {
+ GenericUDAFBloomFilterEvaluator udafBloomFilter =
+ (GenericUDAFBloomFilterEvaluator) desc.getGenericUDAFEvaluator();
+ expectedEntries = udafBloomFilter.getExpectedEntries();
+ }
+
+ public VectorExpression getInputExpression() {
+ return inputExpression;
+ }
+
+ public void setInputExpression(VectorExpression inputExpression) {
+ this.inputExpression = inputExpression;
+ }
+
+ public long getExpectedEntries() {
+ return expectedEntries;
+ }
+
+ public void setExpectedEntries(long expectedEntries) {
+ this.expectedEntries = expectedEntries;
+ }
+
+ // Type-specific handling done here
+ private static abstract class ValueProcessor {
+ abstract protected void processValue(Aggregation myagg, ColumnVector inputColumn, int index);
+ }
+
+ //
+ // Type-specific implementations
+ //
+
+ public static class ValueProcessorBytes extends ValueProcessor {
+ @Override
+ protected void processValue(Aggregation myagg, ColumnVector columnVector, int i) {
+ BytesColumnVector inputColumn = (BytesColumnVector) columnVector;
+ myagg.bf.addBytes(inputColumn.vector[i], inputColumn.start[i], inputColumn.length[i]);
+ }
+ }
+
+ public static class ValueProcessorLong extends ValueProcessor {
+ @Override
+ protected void processValue(Aggregation myagg, ColumnVector columnVector, int i) {
+ LongColumnVector inputColumn = (LongColumnVector) columnVector;
+ myagg.bf.addLong(inputColumn.vector[i]);
+ }
+ }
+
+ public static class ValueProcessorDouble extends ValueProcessor {
+ @Override
+ protected void processValue(Aggregation myagg, ColumnVector columnVector, int i) {
+ DoubleColumnVector inputColumn = (DoubleColumnVector) columnVector;
+ myagg.bf.addDouble(inputColumn.vector[i]);
+ }
+ }
+
+ public static class ValueProcessorDecimal extends ValueProcessor {
+ private byte[] scratchBuffer = new byte[HiveDecimal.SCRATCH_BUFFER_LEN_TO_BYTES];
+
+ @Override
+ protected void processValue(Aggregation myagg, ColumnVector columnVector, int i) {
+ DecimalColumnVector inputColumn = (DecimalColumnVector) columnVector;
+ int startIdx = inputColumn.vector[i].toBytes(scratchBuffer);
+ myagg.bf.addBytes(scratchBuffer, startIdx, scratchBuffer.length - startIdx);
+ }
+ }
+
+ public static class ValueProcessorTimestamp extends ValueProcessor {
+ @Override
+ protected void processValue(Aggregation myagg, ColumnVector columnVector, int i) {
+ TimestampColumnVector inputColumn = (TimestampColumnVector) columnVector;
+ myagg.bf.addLong(inputColumn.time[i]);
+ }
+ }
+}
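For a sense of the fixed size that getAggregationBufferFixedSize() reports, the
long[] bitset dominates. Assuming BloomFilter(expectedEntries) uses the usual
optimal-sizing formulas with a 5% default false-positive probability (an
assumption about BloomFilter's defaults, not something this hunk shows), a
back-of-envelope calculation looks like this:

  public class BloomSizing {
    public static void main(String[] args) {
      long n = 10000;    // expectedEntries
      double p = 0.05;   // assumed default false-positive probability
      // Optimal bit count: m = ceil(-n * ln(p) / (ln 2)^2)
      long m = (long) Math.ceil(-n * Math.log(p) / (Math.log(2) * Math.log(2)));
      long words = (m + 63) / 64;
      // Optimal hash-function count: k = round((m / n) * ln 2)
      long k = Math.max(1, Math.round((double) m / n * Math.log(2)));
      // m ~ 62,350 bits -> 975 longs (~7.8 KB of bitset) and k ~ 4
      System.out.println(m + " bits, " + words + " longs, " + k + " hashes");
    }
  }

The JavaDataModel arithmetic above then adds the object headers and the two
int fields (numBits, numHashFunctions) on top of that long[].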
http://git-wip-us.apache.org/repos/asf/hive/blob/62ebd1ab/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFBloomFilterMerge.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFBloomFilterMerge.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFBloomFilterMerge.java
new file mode 100644
index 0000000..ad190b7
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFBloomFilterMerge.java
@@ -0,0 +1,365 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates;
+
+import java.io.ByteArrayOutputStream;
+import java.util.Arrays;
+
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationBufferRow;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression.AggregationBuffer;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.AggregationDesc;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFBloomFilter.GenericUDAFBloomFilterEvaluator;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hive.common.util.BloomFilter;
+
+public class VectorUDAFBloomFilterMerge extends VectorAggregateExpression {
+
+ private static final long serialVersionUID = 1L;
+
+ private VectorExpression inputExpression;
+ private long expectedEntries = -1;
+ transient private int aggBufferSize = -1;
+ transient private BytesWritable bw = new BytesWritable();
+
+ /**
+ * class for storing the current aggregate value.
+ */
+ private static final class Aggregation implements AggregationBuffer {
+ private static final long serialVersionUID = 1L;
+
+ byte[] bfBytes;
+
+ public Aggregation(long expectedEntries) {
+ try {
+ BloomFilter bf = new BloomFilter(expectedEntries);
+ ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+ BloomFilter.serialize(bytesOut, bf);
+ bfBytes = bytesOut.toByteArray();
+ } catch (Exception err) {
+ throw new IllegalArgumentException("Error creating aggregation buffer", err);
+ }
+ }
+
+ @Override
+ public int getVariableSize() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void reset() {
+ // Do not change the initial bytes which contain NumHashFunctions/NumBits!
+ Arrays.fill(bfBytes, BloomFilter.START_OF_SERIALIZED_LONGS, bfBytes.length, (byte) 0);
+ }
+ }
+
+ public VectorUDAFBloomFilterMerge(VectorExpression inputExpression) {
+ this();
+ this.inputExpression = inputExpression;
+ }
+
+ public VectorUDAFBloomFilterMerge() {
+ super();
+ }
+
+ @Override
+ public AggregationBuffer getNewAggregationBuffer() throws HiveException {
+ if (expectedEntries < 0) {
+ throw new IllegalStateException("expectedEntries not initialized");
+ }
+ return new Aggregation(expectedEntries);
+ }
+
+ @Override
+ public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch batch)
+ throws HiveException {
+
+ inputExpression.evaluate(batch);
+
+ ColumnVector inputColumn = batch.cols[this.inputExpression.getOutputColumn()];
+
+ int batchSize = batch.size;
+
+ if (batchSize == 0) {
+ return;
+ }
+
+ Aggregation myagg = (Aggregation) agg;
+
+ if (inputColumn.isRepeating) {
+ if (inputColumn.noNulls) {
+ processValue(myagg, inputColumn, 0);
+ }
+ return;
+ }
+
+ if (!batch.selectedInUse && inputColumn.noNulls) {
+ iterateNoSelectionNoNulls(myagg, inputColumn, batchSize);
+ }
+ else if (!batch.selectedInUse) {
+ iterateNoSelectionHasNulls(myagg, inputColumn, batchSize);
+ }
+ else if (inputColumn.noNulls){
+ iterateSelectionNoNulls(myagg, inputColumn, batchSize, batch.selected);
+ }
+ else {
+ iterateSelectionHasNulls(myagg, inputColumn, batchSize, batch.selected);
+ }
+ }
+
+ private void iterateNoSelectionNoNulls(
+ Aggregation myagg,
+ ColumnVector inputColumn,
+ int batchSize) {
+ for (int i=0; i< batchSize; ++i) {
+ processValue(myagg, inputColumn, i);
+ }
+ }
+
+ private void iterateNoSelectionHasNulls(
+ Aggregation myagg,
+ ColumnVector inputColumn,
+ int batchSize) {
+
+ for (int i=0; i< batchSize; ++i) {
+ if (!inputColumn.isNull[i]) {
+ processValue(myagg, inputColumn, i);
+ }
+ }
+ }
+
+ private void iterateSelectionNoNulls(
+ Aggregation myagg,
+ ColumnVector inputColumn,
+ int batchSize,
+ int[] selected) {
+
+ for (int j=0; j< batchSize; ++j) {
+ int i = selected[j];
+ processValue(myagg, inputColumn, i);
+ }
+ }
+
+ private void iterateSelectionHasNulls(
+ Aggregation myagg,
+ ColumnVector inputColumn,
+ int batchSize,
+ int[] selected) {
+
+ for (int j=0; j< batchSize; ++j) {
+ int i = selected[j];
+ if (!inputColumn.isNull[i]) {
+ processValue(myagg, inputColumn, i);
+ }
+ }
+ }
+
+ @Override
+ public void aggregateInputSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets, int aggregateIndex,
+ VectorizedRowBatch batch) throws HiveException {
+
+ int batchSize = batch.size;
+
+ if (batchSize == 0) {
+ return;
+ }
+
+ inputExpression.evaluate(batch);
+
+ ColumnVector inputColumn = batch.cols[this.inputExpression.getOutputColumn()];
+
+ if (inputColumn.noNulls) {
+ if (inputColumn.isRepeating) {
+ iterateNoNullsRepeatingWithAggregationSelection(
+ aggregationBufferSets, aggregateIndex,
+ inputColumn, batchSize);
+ } else {
+ if (batch.selectedInUse) {
+ iterateNoNullsSelectionWithAggregationSelection(
+ aggregationBufferSets, aggregateIndex,
+ inputColumn, batch.selected, batchSize);
+ } else {
+ iterateNoNullsWithAggregationSelection(
+ aggregationBufferSets, aggregateIndex,
+ inputColumn, batchSize);
+ }
+ }
+ } else {
+ if (inputColumn.isRepeating) {
+ // All nulls: nothing to merge.
+ } else {
+ if (batch.selectedInUse) {
+ iterateHasNullsSelectionWithAggregationSelection(
+ aggregationBufferSets, aggregateIndex,
+ inputColumn, batchSize, batch.selected);
+ } else {
+ iterateHasNullsWithAggregationSelection(
+ aggregationBufferSets, aggregateIndex,
+ inputColumn, batchSize);
+ }
+ }
+ }
+ }
+
+ private void iterateNoNullsRepeatingWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregrateIndex,
+ ColumnVector inputColumn,
+ int batchSize) {
+
+ for (int i=0; i < batchSize; ++i) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregrateIndex,
+ i);
+ processValue(myagg, inputColumn, 0);
+ }
+ }
+
+ private void iterateNoNullsSelectionWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregrateIndex,
+ ColumnVector inputColumn,
+ int[] selection,
+ int batchSize) {
+
+ for (int i=0; i < batchSize; ++i) {
+ int row = selection[i];
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregrateIndex,
+ i);
+ processValue(myagg, inputColumn, row);
+ }
+ }
+
+ private void iterateNoNullsWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregrateIndex,
+ ColumnVector inputColumn,
+ int batchSize) {
+ for (int i=0; i < batchSize; ++i) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregrateIndex,
+ i);
+ processValue(myagg, inputColumn, i);
+ }
+ }
+
+ private void iterateHasNullsSelectionWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregrateIndex,
+ ColumnVector inputColumn,
+ int batchSize,
+ int[] selection) {
+
+ for (int i=0; i < batchSize; ++i) {
+ int row = selection[i];
+ if (!inputColumn.isNull[row]) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregrateIndex,
+ i);
+ processValue(myagg, inputColumn, row);
+ }
+ }
+ }
+
+ private void iterateHasNullsWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregrateIndex,
+ ColumnVector inputColumn,
+ int batchSize) {
+
+ for (int i=0; i < batchSize; ++i) {
+ if (!inputColumn.isNull[i]) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregrateIndex,
+ i);
+ processValue(myagg, inputColumn, i);
+ }
+ }
+ }
+
+ private Aggregation getCurrentAggregationBuffer(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregrateIndex,
+ int row) {
+ VectorAggregationBufferRow mySet = aggregationBufferSets[row];
+ Aggregation myagg = (Aggregation) mySet.getAggregationBuffer(aggregrateIndex);
+ return myagg;
+ }
+
+ @Override
+ public void reset(AggregationBuffer agg) throws HiveException {
+ agg.reset();
+ }
+
+ @Override
+ public Object evaluateOutput(AggregationBuffer agg) throws HiveException {
+ Aggregation bfAgg = (Aggregation) agg;
+ bw.set(bfAgg.bfBytes, 0, bfAgg.bfBytes.length);
+ return bw;
+ }
+
+ @Override
+ public ObjectInspector getOutputObjectInspector() {
+ return PrimitiveObjectInspectorFactory.writableBinaryObjectInspector;
+ }
+
+ @Override
+ public int getAggregationBufferFixedSize() {
+ if (aggBufferSize < 0) {
+ // Not pretty, but we need a way to get the size
+ try {
+ Aggregation agg = (Aggregation) getNewAggregationBuffer();
+ aggBufferSize = agg.bfBytes.length;
+ } catch (Exception e) {
+ throw new RuntimeException("Unexpected error while creating AggregationBuffer", e);
+ }
+ }
+
+ return aggBufferSize;
+ }
+
+ @Override
+ public void init(AggregationDesc desc) throws HiveException {
+ GenericUDAFBloomFilterEvaluator udafBloomFilter =
+ (GenericUDAFBloomFilterEvaluator) desc.getGenericUDAFEvaluator();
+ expectedEntries = udafBloomFilter.getExpectedEntries();
+ }
+
+ void processValue(Aggregation myagg, ColumnVector columnVector, int i) {
+ // columnVector entry is byte array representing serialized BloomFilter.
+ // BloomFilter.mergeBloomFilterBytes() does a simple byte ORing
+ // which should be faster than deserialize/merge.
+ BytesColumnVector inputColumn = (BytesColumnVector) columnVector;
+ BloomFilter.mergeBloomFilterBytes(myagg.bfBytes, 0, myagg.bfBytes.length,
+ inputColumn.vector[i], inputColumn.start[i], inputColumn.length[i]);
+ }
+}
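BloomFilter.mergeBloomFilterBytes() itself lives in storage-api. Conceptually
it checks that the two serialized filters are compatible and then ORs the
bitset bytes of src into dst in place, skipping the numHashFunctions/numBits
prefix that reset() above is also careful to preserve. A rough sketch of that
idea (the exact compatibility validation is an assumption; the real code is
the storage-api implementation):

  import org.apache.hive.common.util.BloomFilter;

  final class MergeSketch {
    // Conceptual equivalent of BloomFilter.mergeBloomFilterBytes():
    // OR the serialized bitset words of src into dst, in place.
    static void orSerializedBitsets(byte[] dst, int dstStart, int dstLen,
                                    byte[] src, int srcStart, int srcLen) {
      if (dstLen != srcLen) {
        throw new IllegalArgumentException("Bloom filters are not compatible");
      }
      // Skip the serialized header; OR the bitset bytes.
      for (int i = BloomFilter.START_OF_SERIALIZED_LONGS; i < dstLen; i++) {
        dst[dstStart + i] |= src[srcStart + i];
      }
    }
  }

Because the merge never deserializes, the merge-side aggregate stays
allocation-free per row, which is the point of the comment in processValue().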
http://git-wip-us.apache.org/repos/asf/hive/blob/62ebd1ab/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
index e3d9d7f..439950b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
@@ -335,6 +335,7 @@ public class Vectorizer implements PhysicalPlanResolver {
supportedGenericUDFs.add(GenericUDFNvl.class);
supportedGenericUDFs.add(GenericUDFElt.class);
supportedGenericUDFs.add(GenericUDFInitCap.class);
+ supportedGenericUDFs.add(GenericUDFInBloomFilter.class);
// For type casts
supportedGenericUDFs.add(UDFToLong.class);
@@ -368,6 +369,7 @@ public class Vectorizer implements PhysicalPlanResolver {
supportedAggregationUdfs.add("stddev");
supportedAggregationUdfs.add("stddev_pop");
supportedAggregationUdfs.add("stddev_samp");
+ supportedAggregationUdfs.add("bloom_filter");
}
private class VectorTaskColumnInfo {
http://git-wip-us.apache.org/repos/asf/hive/blob/62ebd1ab/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFBloomFilter.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFBloomFilter.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFBloomFilter.java
index fb9a140..deb0f76 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFBloomFilter.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFBloomFilter.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.serde2.io.DateWritable;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.*;
@@ -72,6 +73,8 @@ public class GenericUDAFBloomFilter implements GenericUDAFResolver2 {
// Bloom filter rest
private ByteArrayOutputStream result = new ByteArrayOutputStream();
+ private transient byte[] scratchBuffer = new byte[HiveDecimal.SCRATCH_BUFFER_LEN_TO_BYTES];
+
@Override
public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
super.init(m, parameters);
@@ -167,9 +170,10 @@ public class GenericUDAFBloomFilter implements GenericUDAFResolver2 {
bf.addDouble(vDouble);
break;
case DECIMAL:
- HiveDecimal vDecimal = ((HiveDecimalObjectInspector)inputOI).
- getPrimitiveJavaObject(parameters[0]);
- bf.addString(vDecimal.toString());
+ HiveDecimalWritable vDecimal = ((HiveDecimalObjectInspector)inputOI).
+ getPrimitiveWritableObject(parameters[0]);
+ int startIdx = vDecimal.toBytes(scratchBuffer);
+ bf.addBytes(scratchBuffer, startIdx, scratchBuffer.length - startIdx);
break;
case DATE:
DateWritable vDate = ((DateObjectInspector)inputOI).
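The DECIMAL change here (and the matching one in GenericUDFInBloomFilter below)
replaces a per-row HiveDecimal.toString() allocation with serialization into a
reusable scratch buffer: HiveDecimalWritable.toBytes(byte[]) fills the buffer
from the tail and returns the start offset of the written bytes. The idiom in
isolation (the wrapper class is illustrative):

  import org.apache.hadoop.hive.common.type.HiveDecimal;
  import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
  import org.apache.hive.common.util.BloomFilter;

  final class DecimalBloomAdd {
    // Reused across rows, so the hot loop allocates nothing.
    private final byte[] scratchBuffer =
        new byte[HiveDecimal.SCRATCH_BUFFER_LEN_TO_BYTES];

    void add(BloomFilter bf, HiveDecimalWritable dec) {
      int startIdx = dec.toBytes(scratchBuffer);  // start of written bytes
      bf.addBytes(scratchBuffer, startIdx, scratchBuffer.length - startIdx);
    }
  }

Note that this also changes the hashed representation from the decimal's
string form to its serialized bytes, so the build side (UDAF) and probe side
(UDF) must agree, which is why both are updated in the same patch.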
http://git-wip-us.apache.org/repos/asf/hive/blob/62ebd1ab/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFInBloomFilter.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFInBloomFilter.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFInBloomFilter.java
index 1b7de6c..3e6e069 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFInBloomFilter.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFInBloomFilter.java
@@ -22,8 +22,11 @@ import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedExpressions;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorInBloomFilterColDynamicValue;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.serde2.io.DateWritable;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.*;
@@ -41,6 +44,7 @@ import java.sql.Timestamp;
/**
* GenericUDF to lookup a value in BloomFilter
*/
+@VectorizedExpressions({VectorInBloomFilterColDynamicValue.class})
public class GenericUDFInBloomFilter extends GenericUDF {
private static final Logger LOG = LoggerFactory.getLogger(GenericUDFInBloomFilter.class);
@@ -48,6 +52,7 @@ public class GenericUDFInBloomFilter extends GenericUDF {
private transient ObjectInspector bloomFilterObjectInspector;
private transient BloomFilter bloomFilter;
private transient boolean initializedBloomFilter;
+ private transient byte[] scratchBuffer = new byte[HiveDecimal.SCRATCH_BUFFER_LEN_TO_BYTES];
@Override
public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
@@ -133,9 +138,10 @@ public class GenericUDFInBloomFilter extends GenericUDF {
get(arguments[0].get());
return bloomFilter.testDouble(vDouble);
case DECIMAL:
- HiveDecimal vDecimal = ((HiveDecimalObjectInspector) valObjectInspector).
- getPrimitiveJavaObject(arguments[0].get());
- return bloomFilter.testString(vDecimal.toString());
+ HiveDecimalWritable vDecimal = ((HiveDecimalObjectInspector) valObjectInspector).
+ getPrimitiveWritableObject(arguments[0].get());
+ int startIdx = vDecimal.toBytes(scratchBuffer);
+ return bloomFilter.testBytes(scratchBuffer, startIdx, scratchBuffer.length - startIdx);
case DATE:
DateWritable vDate = ((DateObjectInspector) valObjectInspector).
getPrimitiveWritableObject(arguments[0].get());