You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by go...@apache.org on 2015/09/23 08:25:01 UTC

[1/3] hive git commit: HIVE-11468: Vectorize Struct IN() clauses (Matt McCline, via Gopal V)

Repository: hive
Updated Branches:
  refs/heads/master 2e8324e43 -> 7cfe3743f


http://git-wip-us.apache.org/repos/asf/hive/blob/7cfe3743/ql/src/test/results/clientpositive/vector_struct_in.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/vector_struct_in.q.out b/ql/src/test/results/clientpositive/vector_struct_in.q.out
new file mode 100644
index 0000000..c78b428
--- /dev/null
+++ b/ql/src/test/results/clientpositive/vector_struct_in.q.out
@@ -0,0 +1,825 @@
+PREHOOK: query: -- SORT_QUERY_RESULTS
+
+-- 2 Strings
+create table test_1 (`id` string, `lineid` string) stored as orc
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@test_1
+POSTHOOK: query: -- SORT_QUERY_RESULTS
+
+-- 2 Strings
+create table test_1 (`id` string, `lineid` string) stored as orc
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@test_1
+PREHOOK: query: insert into table test_1 values ('one','1'), ('seven','1')
+PREHOOK: type: QUERY
+PREHOOK: Input: default@values__tmp__table__1
+PREHOOK: Output: default@test_1
+POSTHOOK: query: insert into table test_1 values ('one','1'), ('seven','1')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@values__tmp__table__1
+POSTHOOK: Output: default@test_1
+POSTHOOK: Lineage: test_1.id SIMPLE [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col1, type:string, comment:), ]
+POSTHOOK: Lineage: test_1.lineid SIMPLE [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col2, type:string, comment:), ]
+PREHOOK: query: explain
+select * from test_1 where struct(`id`, `lineid`)
+IN (
+struct('two','3'),
+struct('three','1'),
+struct('one','1'),
+struct('five','2'),
+struct('six','1'),
+struct('eight','1'),
+struct('seven','1'),
+struct('nine','1'),
+struct('ten','1')
+)
+PREHOOK: type: QUERY
+POSTHOOK: query: explain
+select * from test_1 where struct(`id`, `lineid`)
+IN (
+struct('two','3'),
+struct('three','1'),
+struct('one','1'),
+struct('five','2'),
+struct('six','1'),
+struct('eight','1'),
+struct('seven','1'),
+struct('nine','1'),
+struct('ten','1')
+)
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Map Reduce
+      Map Operator Tree:
+          TableScan
+            alias: test_1
+            Statistics: Num rows: 2 Data size: 346 Basic stats: COMPLETE Column stats: NONE
+            Filter Operator
+              predicate: (struct(id,lineid)) IN (const struct('two','3'), const struct('three','1'), const struct('one','1'), const struct('five','2'), const struct('six','1'), const struct('eight','1'), const struct('seven','1'), const struct('nine','1'), const struct('ten','1')) (type: boolean)
+              Statistics: Num rows: 1 Data size: 173 Basic stats: COMPLETE Column stats: NONE
+              Select Operator
+                expressions: id (type: string), lineid (type: string)
+                outputColumnNames: _col0, _col1
+                Statistics: Num rows: 1 Data size: 173 Basic stats: COMPLETE Column stats: NONE
+                File Output Operator
+                  compressed: false
+                  Statistics: Num rows: 1 Data size: 173 Basic stats: COMPLETE Column stats: NONE
+                  table:
+                      input format: org.apache.hadoop.mapred.TextInputFormat
+                      output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+      Execution mode: vectorized
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: select * from test_1 where struct(`id`, `lineid`)
+IN (
+struct('two','3'),
+struct('three','1'),
+struct('one','1'),
+struct('five','2'),
+struct('six','1'),
+struct('eight','1'),
+struct('seven','1'),
+struct('nine','1'),
+struct('ten','1')
+)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@test_1
+#### A masked pattern was here ####
+POSTHOOK: query: select * from test_1 where struct(`id`, `lineid`)
+IN (
+struct('two','3'),
+struct('three','1'),
+struct('one','1'),
+struct('five','2'),
+struct('six','1'),
+struct('eight','1'),
+struct('seven','1'),
+struct('nine','1'),
+struct('ten','1')
+)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@test_1
+#### A masked pattern was here ####
+one	1
+seven	1
+PREHOOK: query: explain
+select `id`, `lineid`, struct(`id`, `lineid`)
+IN (
+struct('two','3'),
+struct('three','1'),
+struct('one','1'),
+struct('five','2'),
+struct('six','1'),
+struct('eight','1'),
+struct('seven','1'),
+struct('nine','1'),
+struct('ten','1')
+) as b from test_1
+PREHOOK: type: QUERY
+POSTHOOK: query: explain
+select `id`, `lineid`, struct(`id`, `lineid`)
+IN (
+struct('two','3'),
+struct('three','1'),
+struct('one','1'),
+struct('five','2'),
+struct('six','1'),
+struct('eight','1'),
+struct('seven','1'),
+struct('nine','1'),
+struct('ten','1')
+) as b from test_1
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Map Reduce
+      Map Operator Tree:
+          TableScan
+            alias: test_1
+            Statistics: Num rows: 2 Data size: 346 Basic stats: COMPLETE Column stats: NONE
+            Select Operator
+              expressions: id (type: string), lineid (type: string), (struct(id,lineid)) IN (const struct('two','3'), const struct('three','1'), const struct('one','1'), const struct('five','2'), const struct('six','1'), const struct('eight','1'), const struct('seven','1'), const struct('nine','1'), const struct('ten','1')) (type: boolean)
+              outputColumnNames: _col0, _col1, _col2
+              Statistics: Num rows: 2 Data size: 346 Basic stats: COMPLETE Column stats: NONE
+              File Output Operator
+                compressed: false
+                Statistics: Num rows: 2 Data size: 346 Basic stats: COMPLETE Column stats: NONE
+                table:
+                    input format: org.apache.hadoop.mapred.TextInputFormat
+                    output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                    serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+      Execution mode: vectorized
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: select `id`, `lineid`, struct(`id`, `lineid`)
+IN (
+struct('two','3'),
+struct('three','1'),
+struct('one','1'),
+struct('five','2'),
+struct('six','1'),
+struct('eight','1'),
+struct('seven','1'),
+struct('nine','1'),
+struct('ten','1')
+) as b from test_1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@test_1
+#### A masked pattern was here ####
+POSTHOOK: query: select `id`, `lineid`, struct(`id`, `lineid`)
+IN (
+struct('two','3'),
+struct('three','1'),
+struct('one','1'),
+struct('five','2'),
+struct('six','1'),
+struct('eight','1'),
+struct('seven','1'),
+struct('nine','1'),
+struct('ten','1')
+) as b from test_1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@test_1
+#### A masked pattern was here ####
+one	1	true
+seven	1	true
+PREHOOK: query: -- 2 Integers
+create table test_2 (`id` int, `lineid` int) stored as orc
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@test_2
+POSTHOOK: query: -- 2 Integers
+create table test_2 (`id` int, `lineid` int) stored as orc
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@test_2
+PREHOOK: query: insert into table test_2 values (1,1), (7,1)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@values__tmp__table__2
+PREHOOK: Output: default@test_2
+POSTHOOK: query: insert into table test_2 values (1,1), (7,1)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@values__tmp__table__2
+POSTHOOK: Output: default@test_2
+POSTHOOK: Lineage: test_2.id EXPRESSION [(values__tmp__table__2)values__tmp__table__2.FieldSchema(name:tmp_values_col1, type:string, comment:), ]
+POSTHOOK: Lineage: test_2.lineid EXPRESSION [(values__tmp__table__2)values__tmp__table__2.FieldSchema(name:tmp_values_col2, type:string, comment:), ]
+PREHOOK: query: explain
+select * from test_2 where struct(`id`, `lineid`)
+IN (
+struct(2,3),
+struct(3,1),
+struct(1,1),
+struct(5,2),
+struct(6,1),
+struct(8,1),
+struct(7,1),
+struct(9,1),
+struct(10,1)
+)
+PREHOOK: type: QUERY
+POSTHOOK: query: explain
+select * from test_2 where struct(`id`, `lineid`)
+IN (
+struct(2,3),
+struct(3,1),
+struct(1,1),
+struct(5,2),
+struct(6,1),
+struct(8,1),
+struct(7,1),
+struct(9,1),
+struct(10,1)
+)
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Map Reduce
+      Map Operator Tree:
+          TableScan
+            alias: test_2
+            Statistics: Num rows: 2 Data size: 16 Basic stats: COMPLETE Column stats: NONE
+            Filter Operator
+              predicate: (struct(id,lineid)) IN (const struct(2,3), const struct(3,1), const struct(1,1), const struct(5,2), const struct(6,1), const struct(8,1), const struct(7,1), const struct(9,1), const struct(10,1)) (type: boolean)
+              Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+              Select Operator
+                expressions: id (type: int), lineid (type: int)
+                outputColumnNames: _col0, _col1
+                Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                File Output Operator
+                  compressed: false
+                  Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                  table:
+                      input format: org.apache.hadoop.mapred.TextInputFormat
+                      output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+      Execution mode: vectorized
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: select * from test_2 where struct(`id`, `lineid`)
+IN (
+struct(2,3),
+struct(3,1),
+struct(1,1),
+struct(5,2),
+struct(6,1),
+struct(8,1),
+struct(7,1),
+struct(9,1),
+struct(10,1)
+)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@test_2
+#### A masked pattern was here ####
+POSTHOOK: query: select * from test_2 where struct(`id`, `lineid`)
+IN (
+struct(2,3),
+struct(3,1),
+struct(1,1),
+struct(5,2),
+struct(6,1),
+struct(8,1),
+struct(7,1),
+struct(9,1),
+struct(10,1)
+)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@test_2
+#### A masked pattern was here ####
+1	1
+7	1
+PREHOOK: query: explain
+select `id`, `lineid`, struct(`id`, `lineid`)
+IN (
+struct(2,3),
+struct(3,1),
+struct(1,1),
+struct(5,2),
+struct(6,1),
+struct(8,1),
+struct(7,1),
+struct(9,1),
+struct(10,1)
+) as b from test_2
+PREHOOK: type: QUERY
+POSTHOOK: query: explain
+select `id`, `lineid`, struct(`id`, `lineid`)
+IN (
+struct(2,3),
+struct(3,1),
+struct(1,1),
+struct(5,2),
+struct(6,1),
+struct(8,1),
+struct(7,1),
+struct(9,1),
+struct(10,1)
+) as b from test_2
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Map Reduce
+      Map Operator Tree:
+          TableScan
+            alias: test_2
+            Statistics: Num rows: 2 Data size: 16 Basic stats: COMPLETE Column stats: NONE
+            Select Operator
+              expressions: id (type: int), lineid (type: int), (struct(id,lineid)) IN (const struct(2,3), const struct(3,1), const struct(1,1), const struct(5,2), const struct(6,1), const struct(8,1), const struct(7,1), const struct(9,1), const struct(10,1)) (type: boolean)
+              outputColumnNames: _col0, _col1, _col2
+              Statistics: Num rows: 2 Data size: 16 Basic stats: COMPLETE Column stats: NONE
+              File Output Operator
+                compressed: false
+                Statistics: Num rows: 2 Data size: 16 Basic stats: COMPLETE Column stats: NONE
+                table:
+                    input format: org.apache.hadoop.mapred.TextInputFormat
+                    output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                    serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+      Execution mode: vectorized
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: select `id`, `lineid`, struct(`id`, `lineid`)
+IN (
+struct(2,3),
+struct(3,1),
+struct(1,1),
+struct(5,2),
+struct(6,1),
+struct(8,1),
+struct(7,1),
+struct(9,1),
+struct(10,1)
+) as b from test_2
+PREHOOK: type: QUERY
+PREHOOK: Input: default@test_2
+#### A masked pattern was here ####
+POSTHOOK: query: select `id`, `lineid`, struct(`id`, `lineid`)
+IN (
+struct(2,3),
+struct(3,1),
+struct(1,1),
+struct(5,2),
+struct(6,1),
+struct(8,1),
+struct(7,1),
+struct(9,1),
+struct(10,1)
+) as b from test_2
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@test_2
+#### A masked pattern was here ####
+1	1	true
+7	1	true
+PREHOOK: query: -- 1 String and 1 Integer
+create table test_3 (`id` string, `lineid` int) stored as orc
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@test_3
+POSTHOOK: query: -- 1 String and 1 Integer
+create table test_3 (`id` string, `lineid` int) stored as orc
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@test_3
+PREHOOK: query: insert into table test_3 values ('one',1), ('seven',1)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@values__tmp__table__3
+PREHOOK: Output: default@test_3
+POSTHOOK: query: insert into table test_3 values ('one',1), ('seven',1)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@values__tmp__table__3
+POSTHOOK: Output: default@test_3
+POSTHOOK: Lineage: test_3.id SIMPLE [(values__tmp__table__3)values__tmp__table__3.FieldSchema(name:tmp_values_col1, type:string, comment:), ]
+POSTHOOK: Lineage: test_3.lineid EXPRESSION [(values__tmp__table__3)values__tmp__table__3.FieldSchema(name:tmp_values_col2, type:string, comment:), ]
+PREHOOK: query: explain
+select * from test_3 where struct(`id`, `lineid`)
+IN (
+struct('two',3),
+struct('three',1),
+struct('one',1),
+struct('five',2),
+struct('six',1),
+struct('eight',1),
+struct('seven',1),
+struct('nine',1),
+struct('ten',1)
+)
+PREHOOK: type: QUERY
+POSTHOOK: query: explain
+select * from test_3 where struct(`id`, `lineid`)
+IN (
+struct('two',3),
+struct('three',1),
+struct('one',1),
+struct('five',2),
+struct('six',1),
+struct('eight',1),
+struct('seven',1),
+struct('nine',1),
+struct('ten',1)
+)
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Map Reduce
+      Map Operator Tree:
+          TableScan
+            alias: test_3
+            Statistics: Num rows: 2 Data size: 184 Basic stats: COMPLETE Column stats: NONE
+            Filter Operator
+              predicate: (struct(id,lineid)) IN (const struct('two',3), const struct('three',1), const struct('one',1), const struct('five',2), const struct('six',1), const struct('eight',1), const struct('seven',1), const struct('nine',1), const struct('ten',1)) (type: boolean)
+              Statistics: Num rows: 1 Data size: 92 Basic stats: COMPLETE Column stats: NONE
+              Select Operator
+                expressions: id (type: string), lineid (type: int)
+                outputColumnNames: _col0, _col1
+                Statistics: Num rows: 1 Data size: 92 Basic stats: COMPLETE Column stats: NONE
+                File Output Operator
+                  compressed: false
+                  Statistics: Num rows: 1 Data size: 92 Basic stats: COMPLETE Column stats: NONE
+                  table:
+                      input format: org.apache.hadoop.mapred.TextInputFormat
+                      output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+      Execution mode: vectorized
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: select * from test_3 where struct(`id`, `lineid`)
+IN (
+struct('two',3),
+struct('three',1),
+struct('one',1),
+struct('five',2),
+struct('six',1),
+struct('eight',1),
+struct('seven',1),
+struct('nine',1),
+struct('ten',1)
+)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@test_3
+#### A masked pattern was here ####
+POSTHOOK: query: select * from test_3 where struct(`id`, `lineid`)
+IN (
+struct('two',3),
+struct('three',1),
+struct('one',1),
+struct('five',2),
+struct('six',1),
+struct('eight',1),
+struct('seven',1),
+struct('nine',1),
+struct('ten',1)
+)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@test_3
+#### A masked pattern was here ####
+one	1
+seven	1
+PREHOOK: query: explain
+select `id`, `lineid`, struct(`id`, `lineid`)
+IN (
+struct('two',3),
+struct('three',1),
+struct('one',1),
+struct('five',2),
+struct('six',1),
+struct('eight',1),
+struct('seven',1),
+struct('nine',1),
+struct('ten',1)
+) as b from test_3
+PREHOOK: type: QUERY
+POSTHOOK: query: explain
+select `id`, `lineid`, struct(`id`, `lineid`)
+IN (
+struct('two',3),
+struct('three',1),
+struct('one',1),
+struct('five',2),
+struct('six',1),
+struct('eight',1),
+struct('seven',1),
+struct('nine',1),
+struct('ten',1)
+) as b from test_3
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Map Reduce
+      Map Operator Tree:
+          TableScan
+            alias: test_3
+            Statistics: Num rows: 2 Data size: 184 Basic stats: COMPLETE Column stats: NONE
+            Select Operator
+              expressions: id (type: string), lineid (type: int), (struct(id,lineid)) IN (const struct('two',3), const struct('three',1), const struct('one',1), const struct('five',2), const struct('six',1), const struct('eight',1), const struct('seven',1), const struct('nine',1), const struct('ten',1)) (type: boolean)
+              outputColumnNames: _col0, _col1, _col2
+              Statistics: Num rows: 2 Data size: 184 Basic stats: COMPLETE Column stats: NONE
+              File Output Operator
+                compressed: false
+                Statistics: Num rows: 2 Data size: 184 Basic stats: COMPLETE Column stats: NONE
+                table:
+                    input format: org.apache.hadoop.mapred.TextInputFormat
+                    output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                    serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+      Execution mode: vectorized
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: select `id`, `lineid`, struct(`id`, `lineid`)
+IN (
+struct('two',3),
+struct('three',1),
+struct('one',1),
+struct('five',2),
+struct('six',1),
+struct('eight',1),
+struct('seven',1),
+struct('nine',1),
+struct('ten',1)
+) as b from test_3
+PREHOOK: type: QUERY
+PREHOOK: Input: default@test_3
+#### A masked pattern was here ####
+POSTHOOK: query: select `id`, `lineid`, struct(`id`, `lineid`)
+IN (
+struct('two',3),
+struct('three',1),
+struct('one',1),
+struct('five',2),
+struct('six',1),
+struct('eight',1),
+struct('seven',1),
+struct('nine',1),
+struct('ten',1)
+) as b from test_3
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@test_3
+#### A masked pattern was here ####
+one	1	true
+seven	1	true
+PREHOOK: query: -- 1 Integer and 1 String and 1 Double
+create table test_4 (`my_bigint` bigint, `my_string` string, `my_double` double) stored as orc
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@test_4
+POSTHOOK: query: -- 1 Integer and 1 String and 1 Double
+create table test_4 (`my_bigint` bigint, `my_string` string, `my_double` double) stored as orc
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@test_4
+PREHOOK: query: insert into table test_4 values (1, "b", 1.5), (1, "a", 0.5), (2, "b", 1.5)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@values__tmp__table__4
+PREHOOK: Output: default@test_4
+POSTHOOK: query: insert into table test_4 values (1, "b", 1.5), (1, "a", 0.5), (2, "b", 1.5)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@values__tmp__table__4
+POSTHOOK: Output: default@test_4
+POSTHOOK: Lineage: test_4.my_bigint EXPRESSION [(values__tmp__table__4)values__tmp__table__4.FieldSchema(name:tmp_values_col1, type:string, comment:), ]
+POSTHOOK: Lineage: test_4.my_double EXPRESSION [(values__tmp__table__4)values__tmp__table__4.FieldSchema(name:tmp_values_col3, type:string, comment:), ]
+POSTHOOK: Lineage: test_4.my_string SIMPLE [(values__tmp__table__4)values__tmp__table__4.FieldSchema(name:tmp_values_col2, type:string, comment:), ]
+PREHOOK: query: explain
+select * from test_4 where struct(`my_bigint`, `my_string`, `my_double`)
+IN (
+struct(1L, "a", 1.5),
+struct(1L, "b", -0.5),
+struct(3L, "b", 1.5),
+struct(1L, "d", 1.5),
+struct(1L, "c", 1.5),
+struct(1L, "b", 2.5),
+struct(1L, "b", 0.5),
+struct(5L, "b", 1.5),
+struct(1L, "a", 0.5),
+struct(3L, "b", 1.5)
+)
+PREHOOK: type: QUERY
+POSTHOOK: query: explain
+select * from test_4 where struct(`my_bigint`, `my_string`, `my_double`)
+IN (
+struct(1L, "a", 1.5),
+struct(1L, "b", -0.5),
+struct(3L, "b", 1.5),
+struct(1L, "d", 1.5),
+struct(1L, "c", 1.5),
+struct(1L, "b", 2.5),
+struct(1L, "b", 0.5),
+struct(5L, "b", 1.5),
+struct(1L, "a", 0.5),
+struct(3L, "b", 1.5)
+)
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Map Reduce
+      Map Operator Tree:
+          TableScan
+            alias: test_4
+            Statistics: Num rows: 3 Data size: 303 Basic stats: COMPLETE Column stats: NONE
+            Filter Operator
+              predicate: (struct(my_bigint,my_string,my_double)) IN (const struct(1,'a',1.5), const struct(1,'b',-0.5), const struct(3,'b',1.5), const struct(1,'d',1.5), const struct(1,'c',1.5), const struct(1,'b',2.5), const struct(1,'b',0.5), const struct(5,'b',1.5), const struct(1,'a',0.5), const struct(3,'b',1.5)) (type: boolean)
+              Statistics: Num rows: 1 Data size: 101 Basic stats: COMPLETE Column stats: NONE
+              Select Operator
+                expressions: my_bigint (type: bigint), my_string (type: string), my_double (type: double)
+                outputColumnNames: _col0, _col1, _col2
+                Statistics: Num rows: 1 Data size: 101 Basic stats: COMPLETE Column stats: NONE
+                File Output Operator
+                  compressed: false
+                  Statistics: Num rows: 1 Data size: 101 Basic stats: COMPLETE Column stats: NONE
+                  table:
+                      input format: org.apache.hadoop.mapred.TextInputFormat
+                      output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+      Execution mode: vectorized
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: select * from test_4 where struct(`my_bigint`, `my_string`, `my_double`)
+IN (
+struct(1L, "a", 1.5),
+struct(1L, "b", -0.5),
+struct(3L, "b", 1.5),
+struct(1L, "d", 1.5),
+struct(1L, "c", 1.5),
+struct(1L, "b", 2.5),
+struct(1L, "b", 0.5),
+struct(5L, "b", 1.5),
+struct(1L, "a", 0.5),
+struct(3L, "b", 1.5)
+)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@test_4
+#### A masked pattern was here ####
+POSTHOOK: query: select * from test_4 where struct(`my_bigint`, `my_string`, `my_double`)
+IN (
+struct(1L, "a", 1.5),
+struct(1L, "b", -0.5),
+struct(3L, "b", 1.5),
+struct(1L, "d", 1.5),
+struct(1L, "c", 1.5),
+struct(1L, "b", 2.5),
+struct(1L, "b", 0.5),
+struct(5L, "b", 1.5),
+struct(1L, "a", 0.5),
+struct(3L, "b", 1.5)
+)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@test_4
+#### A masked pattern was here ####
+1	a	0.5
+PREHOOK: query: explain
+select `my_bigint`, `my_string`, `my_double`, struct(`my_bigint`, `my_string`, `my_double`)
+IN (
+struct(1L, "a", 1.5),
+struct(1L, "b", -0.5),
+struct(3L, "b", 1.5),
+struct(1L, "d", 1.5),
+struct(1L, "c", 1.5),
+struct(1L, "b", 2.5),
+struct(1L, "b", 0.5),
+struct(5L, "b", 1.5),
+struct(1L, "a", 0.5),
+struct(3L, "b", 1.5)
+) as b from test_4
+PREHOOK: type: QUERY
+POSTHOOK: query: explain
+select `my_bigint`, `my_string`, `my_double`, struct(`my_bigint`, `my_string`, `my_double`)
+IN (
+struct(1L, "a", 1.5),
+struct(1L, "b", -0.5),
+struct(3L, "b", 1.5),
+struct(1L, "d", 1.5),
+struct(1L, "c", 1.5),
+struct(1L, "b", 2.5),
+struct(1L, "b", 0.5),
+struct(5L, "b", 1.5),
+struct(1L, "a", 0.5),
+struct(3L, "b", 1.5)
+) as b from test_4
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Map Reduce
+      Map Operator Tree:
+          TableScan
+            alias: test_4
+            Statistics: Num rows: 3 Data size: 303 Basic stats: COMPLETE Column stats: NONE
+            Select Operator
+              expressions: my_bigint (type: bigint), my_string (type: string), my_double (type: double), (struct(my_bigint,my_string,my_double)) IN (const struct(1,'a',1.5), const struct(1,'b',-0.5), const struct(3,'b',1.5), const struct(1,'d',1.5), const struct(1,'c',1.5), const struct(1,'b',2.5), const struct(1,'b',0.5), const struct(5,'b',1.5), const struct(1,'a',0.5), const struct(3,'b',1.5)) (type: boolean)
+              outputColumnNames: _col0, _col1, _col2, _col3
+              Statistics: Num rows: 3 Data size: 303 Basic stats: COMPLETE Column stats: NONE
+              File Output Operator
+                compressed: false
+                Statistics: Num rows: 3 Data size: 303 Basic stats: COMPLETE Column stats: NONE
+                table:
+                    input format: org.apache.hadoop.mapred.TextInputFormat
+                    output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                    serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+      Execution mode: vectorized
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: select `my_bigint`, `my_string`, `my_double`, struct(`my_bigint`, `my_string`, `my_double`)
+IN (
+struct(1L, "a", 1.5),
+struct(1L, "b", -0.5),
+struct(3L, "b", 1.5),
+struct(1L, "d", 1.5),
+struct(1L, "c", 1.5),
+struct(1L, "b", 2.5),
+struct(1L, "b", 0.5),
+struct(5L, "b", 1.5),
+struct(1L, "a", 0.5),
+struct(3L, "b", 1.5)
+) as b from test_4
+PREHOOK: type: QUERY
+PREHOOK: Input: default@test_4
+#### A masked pattern was here ####
+POSTHOOK: query: select `my_bigint`, `my_string`, `my_double`, struct(`my_bigint`, `my_string`, `my_double`)
+IN (
+struct(1L, "a", 1.5),
+struct(1L, "b", -0.5),
+struct(3L, "b", 1.5),
+struct(1L, "d", 1.5),
+struct(1L, "c", 1.5),
+struct(1L, "b", 2.5),
+struct(1L, "b", 0.5),
+struct(5L, "b", 1.5),
+struct(1L, "a", 0.5),
+struct(3L, "b", 1.5)
+) as b from test_4
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@test_4
+#### A masked pattern was here ####
+1	a	0.5	true
+1	b	1.5	false
+2	b	1.5	false


[3/3] hive git commit: HIVE-11468: Vectorize Struct IN() clauses (Matt McCline, via Gopal V)

Posted by go...@apache.org.
HIVE-11468: Vectorize Struct IN() clauses (Matt McCline, via Gopal V)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/7cfe3743
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/7cfe3743
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/7cfe3743

Branch: refs/heads/master
Commit: 7cfe3743ff583386653bdd32c79f2c44ffe734ba
Parents: 2e8324e
Author: Gopal V <go...@apache.org>
Authored: Tue Sep 22 19:39:49 2015 -0700
Committer: Gopal V <go...@apache.org>
Committed: Tue Sep 22 23:24:14 2015 -0700

----------------------------------------------------------------------
 .../ql/exec/vector/VectorizationContext.java    |  203 +-
 .../expressions/FilterStringColumnInList.java   |   13 +-
 .../expressions/FilterStructColumnInList.java   |  178 ++
 .../exec/vector/expressions/IStructInExpr.java  |   36 +
 .../vector/expressions/StringColumnInList.java  |    4 +
 .../vector/expressions/StructColumnInList.java  |  174 ++
 .../hive/ql/optimizer/physical/Vectorizer.java  |   71 +-
 .../ql/optimizer/physical/Vectorizer.java.orig  | 1744 ++++++++++++++++++
 .../ql/optimizer/physical/Vectorizer.java.rej   |   86 +
 .../queries/clientpositive/vector_struct_in.q   |  247 +++
 .../clientpositive/vector_struct_in.q.out       |  825 +++++++++
 11 files changed, 3566 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/7cfe3743/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
index 2483196..46c2a78 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
@@ -104,20 +104,30 @@ import org.apache.hadoop.hive.ql.udf.UDFToLong;
 import org.apache.hadoop.hive.ql.udf.UDFToShort;
 import org.apache.hadoop.hive.ql.udf.UDFToString;
 import org.apache.hadoop.hive.ql.udf.generic.*;
+import org.apache.hadoop.hive.serde2.ByteStream.Output;
+import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableSerializeWrite;
 import org.apache.hadoop.hive.serde2.io.DateWritable;
+import org.apache.hadoop.hive.serde2.io.DoubleWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.ConstantObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
 import org.apache.hadoop.hive.serde2.typeinfo.BaseCharTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.HiveDecimalUtils;
 import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hive.common.util.DateUtils;
 
+
 /**
  * Context class for vectorization execution.
  * Main role is to map column names to column indices and serves as a
@@ -1273,17 +1283,208 @@ public class VectorizationContext {
     }
   }
 
+  public enum InConstantType {
+    INT_FAMILY,
+    TIMESTAMP,
+    DATE,
+    FLOAT_FAMILY,
+    STRING_FAMILY,
+    DECIMAL
+  }
+
+  public static InConstantType getInConstantTypeFromPrimitiveCategory(PrimitiveCategory primitiveCategory) {
+    // Maps a primitive category to the constant family used for IN (..) list constants.
+    switch (primitiveCategory) {
+    case BOOLEAN:
+    case BYTE:
+    case SHORT:
+    case INT:
+    case LONG:
+      return InConstantType.INT_FAMILY;
+  
+    case DATE:
+      return InConstantType.DATE;
+
+    case TIMESTAMP:
+      return InConstantType.TIMESTAMP;
+
+    case FLOAT:
+    case DOUBLE:
+      return InConstantType.FLOAT_FAMILY;
+  
+    case STRING:
+    case CHAR:
+    case VARCHAR:
+    case BINARY:
+      return InConstantType.STRING_FAMILY;
+  
+    case DECIMAL:
+      return InConstantType.DECIMAL;
+  
+
+    case INTERVAL_YEAR_MONTH:
+    case INTERVAL_DAY_TIME:
+      // UNDONE: Fall through for these... they don't appear to be supported yet.
+    default:
+      throw new RuntimeException("Unexpected primitive type category " + primitiveCategory);
+    }
+  }
+
+  private VectorExpression getStructInExpression(List<ExprNodeDesc> childExpr, ExprNodeDesc colExpr,
+      TypeInfo colTypeInfo, List<ExprNodeDesc> inChildren, Mode mode, TypeInfo returnType)
+          throws HiveException {
+
+    VectorExpression expr = null;
+
+    StructTypeInfo structTypeInfo = (StructTypeInfo) colTypeInfo;
+
+    ArrayList<TypeInfo> fieldTypeInfos = structTypeInfo.getAllStructFieldTypeInfos();
+    final int fieldCount = fieldTypeInfos.size();
+    ColumnVector.Type[] fieldVectorColumnTypes = new ColumnVector.Type[fieldCount];
+    InConstantType[] fieldInConstantTypes = new InConstantType[fieldCount];
+    for (int f = 0; f < fieldCount; f++) {
+      TypeInfo fieldTypeInfo = fieldTypeInfos.get(f);
+      // Only primitive fields are supported for now.
+      if (fieldTypeInfo.getCategory() != Category.PRIMITIVE) {
+        return null;
+      }
+
+      // We are going to serialize using the 4 basic types.
+      ColumnVector.Type fieldVectorColumnType = getColumnVectorTypeFromTypeInfo(fieldTypeInfo);
+      fieldVectorColumnTypes[f] = fieldVectorColumnType;
+
+      // We currently evaluate the IN (..) constants in special ways.
+      PrimitiveCategory fieldPrimitiveCategory =
+          ((PrimitiveTypeInfo) fieldTypeInfo).getPrimitiveCategory();
+      InConstantType inConstantType = getInConstantTypeFromPrimitiveCategory(fieldPrimitiveCategory);
+      fieldInConstantTypes[f] = inConstantType;
+    }
+
+    Output buffer = new Output();
+    BinarySortableSerializeWrite binarySortableSerializeWrite =
+        new BinarySortableSerializeWrite(fieldCount);
+
+    final int inChildrenCount = inChildren.size();
+    byte[][] serializedInChildren = new byte[inChildrenCount][];
+    try {
+      for (int i = 0; i < inChildrenCount; i++) {
+        final ExprNodeDesc node = inChildren.get(i);
+        final Object[] constants;
+
+        if (node instanceof ExprNodeConstantDesc) {
+          ExprNodeConstantDesc constNode = (ExprNodeConstantDesc) node;
+          ConstantObjectInspector output = constNode.getWritableObjectInspector();
+          constants = ((List<?>) output.getWritableConstantValue()).toArray();
+        } else {
+          ExprNodeGenericFuncDesc exprNode = (ExprNodeGenericFuncDesc) node;
+          ExprNodeEvaluator<?> evaluator = ExprNodeEvaluatorFactory
+              .get(exprNode);
+          ObjectInspector output = evaluator.initialize(exprNode
+              .getWritableObjectInspector());
+          constants = (Object[]) evaluator.evaluate(null);
+        }
+
+        binarySortableSerializeWrite.set(buffer);
+        for (int f = 0; f < fieldCount; f++) {
+          Object constant = constants[f];
+          if (constant == null) {
+            binarySortableSerializeWrite.writeNull();
+          } else {
+            InConstantType inConstantType = fieldInConstantTypes[f];
+            switch (inConstantType) {
+            case STRING_FAMILY:
+              {
+                byte[] bytes;
+                if (constant instanceof Text) {
+                  Text text = (Text) constant;
+                  bytes = text.getBytes();
+                  binarySortableSerializeWrite.writeString(bytes, 0, text.getLength());
+                } else {
+                  throw new HiveException("Unexpected constant String type " +
+                      constant.getClass().getSimpleName());
+                }
+              }
+              break;
+            case INT_FAMILY:
+              {
+                long value;
+                if (constant instanceof IntWritable) {
+                  value = ((IntWritable) constant).get();
+                } else if (constant instanceof LongWritable) {
+                  value = ((LongWritable) constant).get();
+                } else {
+                  throw new HiveException("Unexpected constant Long type " +
+                      constant.getClass().getSimpleName());
+                }
+                binarySortableSerializeWrite.writeLong(value);
+              }
+              break;
+
+            case FLOAT_FAMILY:
+              {
+                double value;
+                if (constant instanceof DoubleWritable) {
+                  value = ((DoubleWritable) constant).get();
+                } else {
+                  throw new HiveException("Unexpected constant Double type " +
+                      constant.getClass().getSimpleName());
+                }
+                binarySortableSerializeWrite.writeDouble(value);
+              }
+              break;
+
+            // UNDONE...
+            case DATE:
+            case TIMESTAMP:
+            case DECIMAL:
+            default:
+              throw new RuntimeException("Unexpected IN constant type " + inConstantType.name());
+            }
+          }
+        }
+        serializedInChildren[i] = Arrays.copyOfRange(buffer.getData(), 0, buffer.getLength());
+      }
+    } catch (Exception e) {
+      throw new HiveException(e);
+    }
+
+    // Create a single child representing the scratch column where we will
+    // generate the serialized keys of the batch.
+    int scratchBytesCol = ocm.allocateOutputColumn("string");
+
+    Class<?> cl = (mode == Mode.FILTER ? FilterStructColumnInList.class : StructColumnInList.class);
+
+    expr = createVectorExpression(cl, null, Mode.PROJECTION, returnType);
+
+    ((IStringInExpr) expr).setInListValues(serializedInChildren);
+
+    ((IStructInExpr) expr).setScratchBytesColumn(scratchBytesCol);
+    ((IStructInExpr) expr).setStructColumnExprs(this, colExpr.getChildren(),
+        fieldVectorColumnTypes);
+
+    return expr;
+  }
+
   /**
    * Create a filter or boolean-valued expression for column IN ( <list-of-constants> )
    */
   private VectorExpression getInExpression(List<ExprNodeDesc> childExpr, Mode mode, TypeInfo returnType)
       throws HiveException {
     ExprNodeDesc colExpr = childExpr.get(0);
+    List<ExprNodeDesc> inChildren = childExpr.subList(1, childExpr.size());
 
     String colType = colExpr.getTypeString();
+    colType = VectorizationContext.mapTypeNameSynonyms(colType);
+    TypeInfo colTypeInfo = TypeInfoUtils.getTypeInfoFromTypeString(colType);
+    Category category = colTypeInfo.getCategory();
+    if (category == Category.STRUCT){
+      return getStructInExpression(childExpr, colExpr, colTypeInfo, inChildren, mode, returnType);
+    } else if (category != Category.PRIMITIVE) {
+      return null;
+    }
 
     // prepare arguments for createVectorExpression
-    List<ExprNodeDesc> childrenForInList =  evaluateCastOnConstants(childExpr.subList(1, childExpr.size()));
+    List<ExprNodeDesc> childrenForInList =  evaluateCastOnConstants(inChildren);
 
     /* This method assumes that the IN list has no NULL entries. That is enforced elsewhere,
      * in the Vectorizer class. If NULL is passed in as a list entry, behavior is not defined.

http://git-wip-us.apache.org/repos/asf/hive/blob/7cfe3743/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/FilterStringColumnInList.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/FilterStringColumnInList.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/FilterStringColumnInList.java
index 2434e90..e34ec75 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/FilterStringColumnInList.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/FilterStringColumnInList.java
@@ -20,16 +20,7 @@ package org.apache.hadoop.hive.ql.exec.vector.expressions;
 
 import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.VectorExpressionDescriptor.Descriptor;
-import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.udf.UDFLike;
-import org.apache.hadoop.io.Text;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 
 /**
  * Evaluate an IN filter on a batch for a vector of strings.
@@ -165,6 +156,10 @@ public class FilterStringColumnInList extends VectorExpression implements IStrin
     return "boolean";
   }
 
+  public void setInputColumn(int inputCol) {
+    this.inputCol = inputCol;
+  }
+
   @Override
   public int getOutputColumn() {
     return -1;

http://git-wip-us.apache.org/repos/asf/hive/blob/7cfe3743/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/FilterStructColumnInList.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/FilterStructColumnInList.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/FilterStructColumnInList.java
new file mode 100644
index 0000000..00f22bb
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/FilterStructColumnInList.java
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.expressions;
+
+import java.util.List;
+
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorExpressionDescriptor.Descriptor;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.serde2.ByteStream.Output;
+import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableSerializeWrite;
+
+/**
+ * Evaluate an IN filter on a batch for a vector of structs.
+ * This is optimized so that no objects have to be created in
+ * the inner loop, and there is a hash table implemented
+ * with Cuckoo hashing that has fast lookup to do the IN test.
+ */
+public class FilterStructColumnInList extends FilterStringColumnInList implements IStructInExpr {
+  private static final long serialVersionUID = 1L;
+  private VectorExpression[] structExpressions;
+  private ColumnVector.Type[] fieldVectorColumnTypes;
+  private int[] structColumnMap;
+  private int scratchBytesColumn;
+
+  private transient Output buffer;
+  private transient BinarySortableSerializeWrite binarySortableSerializeWrite;
+
+  /**
+   * After construction you must call setInListValues() to add the values to the IN set
+   * (on the IStringInExpr interface).
+   *
+   * And, call setScratchBytesColumn() and setStructColumnExprs() on the IStructInExpr interface.
+   */
+  public FilterStructColumnInList() {
+    super(-1);
+  }
+
+  @Override
+  public void evaluate(VectorizedRowBatch batch) {
+
+    final int logicalSize = batch.size;
+    if (logicalSize == 0) {
+      return;
+    }
+
+    if (buffer == null) {
+      buffer = new Output();
+      binarySortableSerializeWrite = new BinarySortableSerializeWrite(structColumnMap.length);
+    }
+
+    for (VectorExpression ve : structExpressions) {
+      ve.evaluate(batch);
+    }
+
+    BytesColumnVector scratchBytesColumnVector = (BytesColumnVector) batch.cols[scratchBytesColumn];
+
+    try {
+    boolean selectedInUse = batch.selectedInUse;
+    int[] selected = batch.selected;
+    for (int logical = 0; logical < logicalSize; logical++) {
+      int batchIndex = (selectedInUse ? selected[logical] : logical);
+
+      binarySortableSerializeWrite.set(buffer);
+      for (int f = 0; f < structColumnMap.length; f++) {
+        int fieldColumn = structColumnMap[f];
+        ColumnVector colVec = batch.cols[fieldColumn];
+        int adjustedIndex = (colVec.isRepeating ? 0 : batchIndex);
+        if (colVec.noNulls || !colVec.isNull[adjustedIndex]) {
+          switch (fieldVectorColumnTypes[f]) {
+          case BYTES:
+            {
+              BytesColumnVector bytesColVec = (BytesColumnVector) colVec;
+              byte[] bytes = bytesColVec.vector[adjustedIndex];
+              int start = bytesColVec.start[adjustedIndex];
+              int length = bytesColVec.length[adjustedIndex];
+              binarySortableSerializeWrite.writeString(bytes, start, length);
+            }
+            break;
+
+          case LONG:
+            binarySortableSerializeWrite.writeLong(((LongColumnVector) colVec).vector[adjustedIndex]);
+            break;
+
+          case DOUBLE:
+            binarySortableSerializeWrite.writeDouble(((DoubleColumnVector) colVec).vector[adjustedIndex]);
+            break;
+
+          case DECIMAL:
+            binarySortableSerializeWrite.writeHiveDecimal(
+                ((DecimalColumnVector) colVec).vector[adjustedIndex].getHiveDecimal());
+            break;
+
+          default:
+            throw new RuntimeException("Unexpected vector column type " +
+                fieldVectorColumnTypes[f].name());
+          } 
+        } else {
+          binarySortableSerializeWrite.writeNull();
+        }
+      }
+      scratchBytesColumnVector.setVal(batchIndex, buffer.getData(), 0, buffer.getLength());
+    }
+
+    // Now, take the serialized keys we just wrote into our scratch column and look them
+    // up in the IN list.
+    super.evaluate(batch);
+
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+ 
+  }
+
+
+  @Override
+  public String getOutputType() {
+    return "boolean";
+  }
+
+  @Override
+  public int getOutputColumn() {
+    return -1;
+  }
+
+  @Override
+  public Descriptor getDescriptor() {
+
+    // This VectorExpression (IN) is a special case, so don't return a descriptor.
+    return null;
+  }
+
+  @Override
+  public void setScratchBytesColumn(int scratchBytesColumn) {
+
+    // Tell our super class FilterStringColumnInList it will be evaluating our scratch
+    // BytesColumnVector.
+    super.setInputColumn(scratchBytesColumn);
+    this.scratchBytesColumn = scratchBytesColumn;
+  }
+
+  @Override
+  public void setStructColumnExprs(VectorizationContext vContext,
+      List<ExprNodeDesc> structColumnExprs, ColumnVector.Type[] fieldVectorColumnTypes)
+          throws HiveException {
+
+    structExpressions = vContext.getVectorExpressions(structColumnExprs);
+    structColumnMap = new int[structExpressions.length];
+    for (int i = 0; i < structColumnMap.length; i++) {
+      VectorExpression ve = structExpressions[i];
+      structColumnMap[i] = ve.getOutputColumn();
+    }
+    this.fieldVectorColumnTypes = fieldVectorColumnTypes;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/7cfe3743/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/IStructInExpr.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/IStructInExpr.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/IStructInExpr.java
new file mode 100644
index 0000000..3b25255
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/IStructInExpr.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.expressions;
+
+import java.util.List;
+
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+
+/**
+ * Interface used for both filter and non-filter versions of IN to simplify
+ * VectorizationContext code.
+ */
+public interface IStructInExpr {
+  void setScratchBytesColumn(int scratchBytesColumn);
+  void setStructColumnExprs(VectorizationContext vContext, List<ExprNodeDesc> structColumnExprs,
+      ColumnVector.Type[] fieldVectorColumnTypes) throws HiveException;
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/7cfe3743/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/StringColumnInList.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/StringColumnInList.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/StringColumnInList.java
index 03833a2..b90e3c0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/StringColumnInList.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/StringColumnInList.java
@@ -140,6 +140,10 @@ public class StringColumnInList extends VectorExpression implements IStringInExp
     return "boolean";
   }
 
+  public void setInputColumn(int inputCol) {
+    this.inputCol = inputCol;
+  }
+
   @Override
   public int getOutputColumn() {
     return this.outputColumn;

http://git-wip-us.apache.org/repos/asf/hive/blob/7cfe3743/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/StructColumnInList.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/StructColumnInList.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/StructColumnInList.java
new file mode 100644
index 0000000..724497a
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/StructColumnInList.java
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.expressions;
+
+import java.util.List;
+
+import org.apache.hadoop.hive.ql.exec.vector.VectorExpressionDescriptor.Descriptor;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.serde2.ByteStream.Output;
+import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableSerializeWrite;
+
+/**
+ * Evaluate an IN boolean expression (not a filter) on a batch for a vector of structs.
+ * This is optimized so that no objects have to be created in
+ * the inner loop, and there is a hash table implemented
+ * with Cuckoo hashing that has fast lookup to do the IN test.
+ */
+public class StructColumnInList extends StringColumnInList implements IStructInExpr {
+  private static final long serialVersionUID = 1L;
+  private VectorExpression[] structExpressions;
+  private ColumnVector.Type[] fieldVectorColumnTypes;
+  private int[] structColumnMap;
+  private int scratchBytesColumn;
+
+  private transient Output buffer;
+  private transient BinarySortableSerializeWrite binarySortableSerializeWrite;
+
+  public StructColumnInList() {
+    super();
+  }
+
+  /**
+   * After construction you must call setInListValues() to add the values to the IN set.
+   */
+  public StructColumnInList(int outputColumn) {
+    super(-1, outputColumn);
+  }
+
+  @Override
+  public void evaluate(VectorizedRowBatch batch) {
+
+    final int logicalSize = batch.size;
+    if (logicalSize == 0) {
+      return;
+    }
+
+    if (buffer == null) {
+      buffer = new Output();
+      binarySortableSerializeWrite = new BinarySortableSerializeWrite(structColumnMap.length);
+    }
+
+    for (VectorExpression ve : structExpressions) {
+      ve.evaluate(batch);
+    }
+
+    BytesColumnVector scratchBytesColumnVector = (BytesColumnVector) batch.cols[scratchBytesColumn];
+
+    try {
+    boolean selectedInUse = batch.selectedInUse;
+    int[] selected = batch.selected;
+    for (int logical = 0; logical < logicalSize; logical++) {
+      int batchIndex = (selectedInUse ? selected[logical] : logical);
+
+      binarySortableSerializeWrite.set(buffer);
+      for (int f = 0; f < structColumnMap.length; f++) {
+        int fieldColumn = structColumnMap[f];
+        ColumnVector colVec = batch.cols[fieldColumn];
+        int adjustedIndex = (colVec.isRepeating ? 0 : batchIndex);
+        if (colVec.noNulls || !colVec.isNull[adjustedIndex]) {
+          switch (fieldVectorColumnTypes[f]) {
+          case BYTES:
+            {
+              BytesColumnVector bytesColVec = (BytesColumnVector) colVec;
+              byte[] bytes = bytesColVec.vector[adjustedIndex];
+              int start = bytesColVec.start[adjustedIndex];
+              int length = bytesColVec.length[adjustedIndex];
+              binarySortableSerializeWrite.writeString(bytes, start, length);
+            }
+            break;
+
+          case LONG:
+            binarySortableSerializeWrite.writeLong(((LongColumnVector) colVec).vector[adjustedIndex]);
+            break;
+
+          case DOUBLE:
+            binarySortableSerializeWrite.writeDouble(((DoubleColumnVector) colVec).vector[adjustedIndex]);
+            break;
+
+          case DECIMAL:
+            binarySortableSerializeWrite.writeHiveDecimal(
+                ((DecimalColumnVector) colVec).vector[adjustedIndex].getHiveDecimal());
+            break;
+
+          default:
+            throw new RuntimeException("Unexpected vector column type " +
+                fieldVectorColumnTypes[f].name());
+          } 
+        } else {
+          binarySortableSerializeWrite.writeNull();
+        }
+      }
+      scratchBytesColumnVector.setVal(batchIndex, buffer.getData(), 0, buffer.getLength());
+    }
+
+    // Now, take the serialized keys we just wrote into our scratch column and look them
+    // up in the IN list.
+    super.evaluate(batch);
+
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+   }
+
+
+  @Override
+  public String getOutputType() {
+    return "boolean";
+  }
+
+  @Override
+  public Descriptor getDescriptor() {
+
+    // This VectorExpression (IN) is a special case, so don't return a descriptor.
+    return null;
+  }
+
+
+  @Override
+  public void setScratchBytesColumn(int scratchBytesColumn) {
+
+    // Tell our super class StringColumnInList it will be evaluating our scratch
+    // BytesColumnVector.
+    super.setInputColumn(scratchBytesColumn);
+    this.scratchBytesColumn = scratchBytesColumn;
+  }
+
+  @Override
+  public void setStructColumnExprs(VectorizationContext vContext,
+      List<ExprNodeDesc> structColumnExprs, ColumnVector.Type[] fieldVectorColumnTypes)
+          throws HiveException {
+
+    structExpressions = vContext.getVectorExpressions(structColumnExprs);
+    structColumnMap = new int[structExpressions.length];
+    for (int i = 0; i < structColumnMap.length; i++) {
+      VectorExpression ve = structExpressions[i];
+      structColumnMap[i] = ve.getOutputColumn();
+    }
+    this.fieldVectorColumnTypes = fieldVectorColumnTypes;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/7cfe3743/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
index 0d4c1d8..da1d9eb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
@@ -53,10 +53,12 @@ import org.apache.hadoop.hive.ql.exec.vector.mapjoin.VectorMapJoinLeftSemiString
 import org.apache.hadoop.hive.ql.exec.vector.mapjoin.VectorMapJoinOuterLongOperator;
 import org.apache.hadoop.hive.ql.exec.vector.mapjoin.VectorMapJoinOuterMultiKeyOperator;
 import org.apache.hadoop.hive.ql.exec.vector.mapjoin.VectorMapJoinOuterStringOperator;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.VectorMapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.vector.VectorMapJoinOuterFilteredOperator;
 import org.apache.hadoop.hive.ql.exec.vector.VectorSMBMapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext.InConstantType;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizationContextRegion;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
@@ -139,8 +141,11 @@ import org.apache.hadoop.hive.ql.udf.UDFYear;
 import org.apache.hadoop.hive.ql.udf.generic.*;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 
@@ -575,7 +580,12 @@ public class Vectorizer implements PhysicalPlanResolver {
         if (nonVectorizableChildOfGroupBy(op)) {
           return new Boolean(true);
         }
-        boolean ret = validateMapWorkOperator(op, mapWork, isTez);
+        boolean ret;
+        try {
+          ret = validateMapWorkOperator(op, mapWork, isTez);
+        } catch (Exception e) {
+          throw new SemanticException(e);
+        }
         if (!ret) {
           LOG.info("MapWork Operator: " + op.getName() + " could not be vectorized.");
           return new Boolean(false);
@@ -1260,6 +1270,7 @@ public class Vectorizer implements PhysicalPlanResolver {
       LOG.info("Cannot vectorize " + desc.toString() + " of type " + typeName);
       return false;
     }
+    boolean isInExpression = false;
     if (desc instanceof ExprNodeGenericFuncDesc) {
       ExprNodeGenericFuncDesc d = (ExprNodeGenericFuncDesc) desc;
       boolean r = validateGenericUdf(d);
@@ -1267,12 +1278,62 @@ public class Vectorizer implements PhysicalPlanResolver {
         LOG.info("Cannot vectorize UDF " + d);
         return false;
       }
+      GenericUDF genericUDF = d.getGenericUDF();
+      isInExpression = (genericUDF instanceof GenericUDFIn);
     }
     if (desc.getChildren() != null) {
-      for (ExprNodeDesc d: desc.getChildren()) {
-        // Don't restrict child expressions for projection.  Always use looser FILTER mode.
-        boolean r = validateExprNodeDescRecursive(d, VectorExpressionDescriptor.Mode.FILTER);
-        if (!r) {
+      if (isInExpression
+          && desc.getChildren().get(0).getTypeInfo().getCategory() == Category.STRUCT) {
+        // Don't restrict child expressions for projection. 
+        // Always use loose FILTER mode.
+        if (!validateStructInExpression(desc, VectorExpressionDescriptor.Mode.FILTER)) {
+          return false;
+        }
+      } else {
+        for (ExprNodeDesc d : desc.getChildren()) {
+          // Don't restrict child expressions for projection. 
+          // Always use loose FILTER mode.
+          if (!validateExprNodeDescRecursive(d, VectorExpressionDescriptor.Mode.FILTER)) {
+            return false;
+          }
+        }
+      }
+    }
+    return true;
+  }
+
+  private boolean validateStructInExpression(ExprNodeDesc desc,
+      VectorExpressionDescriptor.Mode mode) {
+    for (ExprNodeDesc d : desc.getChildren()) {
+      TypeInfo typeInfo = d.getTypeInfo();
+      if (typeInfo.getCategory() != Category.STRUCT) {
+        return false;
+      }
+      StructTypeInfo structTypeInfo = (StructTypeInfo) typeInfo;
+
+      ArrayList<TypeInfo> fieldTypeInfos = structTypeInfo
+          .getAllStructFieldTypeInfos();
+      ArrayList<String> fieldNames = structTypeInfo.getAllStructFieldNames();
+      final int fieldCount = fieldTypeInfos.size();
+      for (int f = 0; f < fieldCount; f++) {
+        TypeInfo fieldTypeInfo = fieldTypeInfos.get(f);
+        Category category = fieldTypeInfo.getCategory();
+        if (category != Category.PRIMITIVE) {
+          LOG.info("Cannot vectorize struct field " + fieldNames.get(f)
+              + " of type " + fieldTypeInfo.getTypeName());
+          return false;
+        }
+        PrimitiveTypeInfo fieldPrimitiveTypeInfo = (PrimitiveTypeInfo) fieldTypeInfo;
+        InConstantType inConstantType = VectorizationContext
+            .getInConstantTypeFromPrimitiveCategory(fieldPrimitiveTypeInfo
+                .getPrimitiveCategory());
+
+        // For now, limit the data types we support for Vectorized Struct IN().
+        if (inConstantType != InConstantType.INT_FAMILY
+            && inConstantType != InConstantType.FLOAT_FAMILY
+            && inConstantType != InConstantType.STRING_FAMILY) {
+          LOG.info("Cannot vectorize struct field " + fieldNames.get(f)
+              + " of type " + fieldTypeInfo.getTypeName());
           return false;
         }
       }


[2/3] hive git commit: HIVE-11468: Vectorize Struct IN() clauses (Matt McCline, via Gopal V)

Posted by go...@apache.org.
http://git-wip-us.apache.org/repos/asf/hive/blob/7cfe3743/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java.orig
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java.orig b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java.orig
new file mode 100644
index 0000000..0d4c1d8
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java.orig
@@ -0,0 +1,1744 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer.physical;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.Stack;
+import java.util.regex.Pattern;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.ql.exec.*;
+import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKey;
+import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
+import org.apache.hadoop.hive.ql.exec.tez.TezTask;
+import org.apache.hadoop.hive.ql.exec.vector.VectorExpressionDescriptor;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.VectorMapJoinInnerBigOnlyLongOperator;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.VectorMapJoinInnerBigOnlyMultiKeyOperator;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.VectorMapJoinInnerBigOnlyStringOperator;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.VectorMapJoinInnerLongOperator;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.VectorMapJoinInnerMultiKeyOperator;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.VectorMapJoinInnerStringOperator;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.VectorMapJoinLeftSemiLongOperator;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.VectorMapJoinLeftSemiMultiKeyOperator;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.VectorMapJoinLeftSemiStringOperator;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.VectorMapJoinOuterLongOperator;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.VectorMapJoinOuterMultiKeyOperator;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.VectorMapJoinOuterStringOperator;
+import org.apache.hadoop.hive.ql.exec.vector.VectorMapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.vector.VectorMapJoinOuterFilteredOperator;
+import org.apache.hadoop.hive.ql.exec.vector.VectorSMBMapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizationContextRegion;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
+import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
+import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
+import org.apache.hadoop.hive.ql.lib.Dispatcher;
+import org.apache.hadoop.hive.ql.lib.GraphWalker;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.lib.PreOrderOnceWalker;
+import org.apache.hadoop.hive.ql.lib.PreOrderWalker;
+import org.apache.hadoop.hive.ql.lib.Rule;
+import org.apache.hadoop.hive.ql.lib.RuleRegExp;
+import org.apache.hadoop.hive.ql.lib.TaskGraphWalker;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.AbstractOperatorDesc;
+import org.apache.hadoop.hive.ql.plan.AggregationDesc;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.plan.GroupByDesc;
+import org.apache.hadoop.hive.ql.plan.JoinDesc;
+import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
+import org.apache.hadoop.hive.ql.plan.SMBJoinDesc;
+import org.apache.hadoop.hive.ql.plan.SparkHashTableSinkDesc;
+import org.apache.hadoop.hive.ql.plan.SparkWork;
+import org.apache.hadoop.hive.ql.plan.TableScanDesc;
+import org.apache.hadoop.hive.ql.plan.TezWork;
+import org.apache.hadoop.hive.ql.plan.VectorGroupByDesc;
+import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc;
+import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.HashTableImplementationType;
+import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.HashTableKeyType;
+import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.HashTableKind;
+import org.apache.hadoop.hive.ql.plan.api.OperatorType;
+import org.apache.hadoop.hive.ql.udf.UDFAcos;
+import org.apache.hadoop.hive.ql.udf.UDFAsin;
+import org.apache.hadoop.hive.ql.udf.UDFAtan;
+import org.apache.hadoop.hive.ql.udf.UDFBin;
+import org.apache.hadoop.hive.ql.udf.UDFConv;
+import org.apache.hadoop.hive.ql.udf.UDFCos;
+import org.apache.hadoop.hive.ql.udf.UDFDayOfMonth;
+import org.apache.hadoop.hive.ql.udf.UDFDegrees;
+import org.apache.hadoop.hive.ql.udf.UDFExp;
+import org.apache.hadoop.hive.ql.udf.UDFHex;
+import org.apache.hadoop.hive.ql.udf.UDFHour;
+import org.apache.hadoop.hive.ql.udf.UDFLength;
+import org.apache.hadoop.hive.ql.udf.UDFLike;
+import org.apache.hadoop.hive.ql.udf.UDFLn;
+import org.apache.hadoop.hive.ql.udf.UDFLog;
+import org.apache.hadoop.hive.ql.udf.UDFLog10;
+import org.apache.hadoop.hive.ql.udf.UDFLog2;
+import org.apache.hadoop.hive.ql.udf.UDFMinute;
+import org.apache.hadoop.hive.ql.udf.UDFMonth;
+import org.apache.hadoop.hive.ql.udf.UDFRadians;
+import org.apache.hadoop.hive.ql.udf.UDFRand;
+import org.apache.hadoop.hive.ql.udf.UDFSecond;
+import org.apache.hadoop.hive.ql.udf.UDFSign;
+import org.apache.hadoop.hive.ql.udf.UDFSin;
+import org.apache.hadoop.hive.ql.udf.UDFSqrt;
+import org.apache.hadoop.hive.ql.udf.UDFSubstr;
+import org.apache.hadoop.hive.ql.udf.UDFTan;
+import org.apache.hadoop.hive.ql.udf.UDFToBoolean;
+import org.apache.hadoop.hive.ql.udf.UDFToByte;
+import org.apache.hadoop.hive.ql.udf.UDFToDouble;
+import org.apache.hadoop.hive.ql.udf.UDFToFloat;
+import org.apache.hadoop.hive.ql.udf.UDFToInteger;
+import org.apache.hadoop.hive.ql.udf.UDFToLong;
+import org.apache.hadoop.hive.ql.udf.UDFToShort;
+import org.apache.hadoop.hive.ql.udf.UDFToString;
+import org.apache.hadoop.hive.ql.udf.UDFWeekOfYear;
+import org.apache.hadoop.hive.ql.udf.UDFYear;
+import org.apache.hadoop.hive.ql.udf.generic.*;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+
+public class Vectorizer implements PhysicalPlanResolver {
+
+  protected static transient final Log LOG = LogFactory.getLog(Vectorizer.class);
+  // Alternation of type-name regexes; assembled and compiled in the constructor.
+  Pattern supportedDataTypesPattern;
+  List<Task<? extends Serializable>> vectorizableTasks =
+      new ArrayList<Task<? extends Serializable>>();
+  Set<Class<?>> supportedGenericUDFs = new HashSet<Class<?>>();
+  // Aggregation function names registered by the constructor ("min", "max", ...).
+  Set<String> supportedAggregationUdfs = new HashSet<String>();
+  // Read by VectorizationDispatcher.dispatch; its assignment is not in this part of the file.
+  private HiveConf hiveConf;
+
+  public Vectorizer() {
+    // Assemble supportedDataTypesPattern as one '|'-separated alternation of type names.
+    StringBuilder patternBuilder = new StringBuilder();
+    patternBuilder.append("int");
+    patternBuilder.append("|smallint");
+    patternBuilder.append("|tinyint");
+    patternBuilder.append("|bigint");
+    patternBuilder.append("|integer");
+    patternBuilder.append("|long");
+    patternBuilder.append("|short");
+    patternBuilder.append("|timestamp");
+    patternBuilder.append("|" + serdeConstants.INTERVAL_YEAR_MONTH_TYPE_NAME);
+    patternBuilder.append("|" + serdeConstants.INTERVAL_DAY_TIME_TYPE_NAME);
+    patternBuilder.append("|boolean");
+    patternBuilder.append("|binary");
+    patternBuilder.append("|string");
+    patternBuilder.append("|byte");
+    patternBuilder.append("|float");
+    patternBuilder.append("|double");
+    patternBuilder.append("|date");
+    patternBuilder.append("|void");
+
+    // Decimal types can be specified with different precision and scales e.g. decimal(10,5),
+    // as opposed to other data types which can be represented by constant strings.
+    // The regex therefore pins only the "decimal" prefix; the ".*" accepts whatever follows it.
+    patternBuilder.append("|decimal.*");
+
+    // CHAR and VARCHAR types can be specified with maximum length, hence the same ".*" suffix.
+    patternBuilder.append("|char.*");
+    patternBuilder.append("|varchar.*");
+
+    supportedDataTypesPattern = Pattern.compile(patternBuilder.toString());
+    // Arithmetic operator UDFs.
+    supportedGenericUDFs.add(GenericUDFOPPlus.class);
+    supportedGenericUDFs.add(GenericUDFOPMinus.class);
+    supportedGenericUDFs.add(GenericUDFOPMultiply.class);
+    supportedGenericUDFs.add(GenericUDFOPDivide.class);
+    supportedGenericUDFs.add(GenericUDFOPMod.class);
+    supportedGenericUDFs.add(GenericUDFOPNegative.class);
+    supportedGenericUDFs.add(GenericUDFOPPositive.class);
+    // Comparison, logical and null-test operator UDFs (UDFLength is registered with this group).
+    supportedGenericUDFs.add(GenericUDFOPEqualOrLessThan.class);
+    supportedGenericUDFs.add(GenericUDFOPEqualOrGreaterThan.class);
+    supportedGenericUDFs.add(GenericUDFOPGreaterThan.class);
+    supportedGenericUDFs.add(GenericUDFOPLessThan.class);
+    supportedGenericUDFs.add(GenericUDFOPNot.class);
+    supportedGenericUDFs.add(GenericUDFOPNotEqual.class);
+    supportedGenericUDFs.add(GenericUDFOPNotNull.class);
+    supportedGenericUDFs.add(GenericUDFOPNull.class);
+    supportedGenericUDFs.add(GenericUDFOPOr.class);
+    supportedGenericUDFs.add(GenericUDFOPAnd.class);
+    supportedGenericUDFs.add(GenericUDFOPEqual.class);
+    supportedGenericUDFs.add(UDFLength.class);
+    // Date/time field UDFs and unix-timestamp conversion.
+    supportedGenericUDFs.add(UDFYear.class);
+    supportedGenericUDFs.add(UDFMonth.class);
+    supportedGenericUDFs.add(UDFDayOfMonth.class);
+    supportedGenericUDFs.add(UDFHour.class);
+    supportedGenericUDFs.add(UDFMinute.class);
+    supportedGenericUDFs.add(UDFSecond.class);
+    supportedGenericUDFs.add(UDFWeekOfYear.class);
+    supportedGenericUDFs.add(GenericUDFToUnixTimeStamp.class);
+    // Date arithmetic UDFs.
+    supportedGenericUDFs.add(GenericUDFDateAdd.class);
+    supportedGenericUDFs.add(GenericUDFDateSub.class);
+    supportedGenericUDFs.add(GenericUDFDate.class);
+    supportedGenericUDFs.add(GenericUDFDateDiff.class);
+    // String matching, substring and trimming UDFs.
+    supportedGenericUDFs.add(UDFLike.class);
+    supportedGenericUDFs.add(GenericUDFRegExp.class);
+    supportedGenericUDFs.add(UDFSubstr.class);
+    supportedGenericUDFs.add(GenericUDFLTrim.class);
+    supportedGenericUDFs.add(GenericUDFRTrim.class);
+    supportedGenericUDFs.add(GenericUDFTrim.class);
+    // Math UDFs (the bin/hex/conv base-conversion UDFs are registered with this group).
+    supportedGenericUDFs.add(UDFSin.class);
+    supportedGenericUDFs.add(UDFCos.class);
+    supportedGenericUDFs.add(UDFTan.class);
+    supportedGenericUDFs.add(UDFAsin.class);
+    supportedGenericUDFs.add(UDFAcos.class);
+    supportedGenericUDFs.add(UDFAtan.class);
+    supportedGenericUDFs.add(UDFDegrees.class);
+    supportedGenericUDFs.add(UDFRadians.class);
+    supportedGenericUDFs.add(GenericUDFFloor.class);
+    supportedGenericUDFs.add(GenericUDFCeil.class);
+    supportedGenericUDFs.add(UDFExp.class);
+    supportedGenericUDFs.add(UDFLn.class);
+    supportedGenericUDFs.add(UDFLog2.class);
+    supportedGenericUDFs.add(UDFLog10.class);
+    supportedGenericUDFs.add(UDFLog.class);
+    supportedGenericUDFs.add(GenericUDFPower.class);
+    supportedGenericUDFs.add(GenericUDFRound.class);
+    supportedGenericUDFs.add(GenericUDFBRound.class);
+    supportedGenericUDFs.add(GenericUDFPosMod.class);
+    supportedGenericUDFs.add(UDFSqrt.class);
+    supportedGenericUDFs.add(UDFSign.class);
+    supportedGenericUDFs.add(UDFRand.class);
+    supportedGenericUDFs.add(UDFBin.class);
+    supportedGenericUDFs.add(UDFHex.class);
+    supportedGenericUDFs.add(UDFConv.class);
+    // Miscellaneous UDFs: case conversion, concat, abs, BETWEEN/IN, CASE/WHEN, coalesce, elt.
+    supportedGenericUDFs.add(GenericUDFLower.class);
+    supportedGenericUDFs.add(GenericUDFUpper.class);
+    supportedGenericUDFs.add(GenericUDFConcat.class);
+    supportedGenericUDFs.add(GenericUDFAbs.class);
+    supportedGenericUDFs.add(GenericUDFBetween.class);
+    supportedGenericUDFs.add(GenericUDFIn.class);
+    supportedGenericUDFs.add(GenericUDFCase.class);
+    supportedGenericUDFs.add(GenericUDFWhen.class);
+    supportedGenericUDFs.add(GenericUDFCoalesce.class);
+    supportedGenericUDFs.add(GenericUDFElt.class);
+    supportedGenericUDFs.add(GenericUDFInitCap.class);
+
+    // For type casts
+    supportedGenericUDFs.add(UDFToLong.class);
+    supportedGenericUDFs.add(UDFToInteger.class);
+    supportedGenericUDFs.add(UDFToShort.class);
+    supportedGenericUDFs.add(UDFToByte.class);
+    supportedGenericUDFs.add(UDFToBoolean.class);
+    supportedGenericUDFs.add(UDFToFloat.class);
+    supportedGenericUDFs.add(UDFToDouble.class);
+    supportedGenericUDFs.add(UDFToString.class);
+    supportedGenericUDFs.add(GenericUDFTimestamp.class);
+    supportedGenericUDFs.add(GenericUDFToDecimal.class);
+    supportedGenericUDFs.add(GenericUDFToDate.class);
+    supportedGenericUDFs.add(GenericUDFToChar.class);
+    supportedGenericUDFs.add(GenericUDFToVarchar.class);
+    supportedGenericUDFs.add(GenericUDFToIntervalYearMonth.class);
+    supportedGenericUDFs.add(GenericUDFToIntervalDayTime.class);
+
+    // For conditional expressions
+    supportedGenericUDFs.add(GenericUDFIf.class);
+    // Aggregation functions are registered by name rather than by class.
+    supportedAggregationUdfs.add("min");
+    supportedAggregationUdfs.add("max");
+    supportedAggregationUdfs.add("count");
+    supportedAggregationUdfs.add("sum");
+    supportedAggregationUdfs.add("avg");
+    supportedAggregationUdfs.add("variance");
+    supportedAggregationUdfs.add("var_pop");
+    supportedAggregationUdfs.add("var_samp");
+    supportedAggregationUdfs.add("std");
+    supportedAggregationUdfs.add("stddev");
+    supportedAggregationUdfs.add("stddev_pop");
+    supportedAggregationUdfs.add("stddev_samp");
+  }
+
+  class VectorizationDispatcher implements Dispatcher {
+
+    private List<String> reduceColumnNames;
+    private List<TypeInfo> reduceTypeInfos;
+    // physicalContext is unused; both lists stay null until getOnlyStructObjectInspectors() runs.
+    public VectorizationDispatcher(PhysicalContext physicalContext) {
+      reduceColumnNames = null;
+      reduceTypeInfos = null;
+    }
+
+    @Override
+    public Object dispatch(Node nd, Stack<Node> stack, Object... nodeOutputs)
+        throws SemanticException {
+      Task<? extends Serializable> currTask = (Task<? extends Serializable>) nd;
+      if (currTask instanceof MapRedTask) {
+        convertMapWork(((MapRedTask) currTask).getWork().getMapWork(), false); // map side only
+      } else if (currTask instanceof TezTask) {
+        TezWork work = ((TezTask) currTask).getWork();
+        for (BaseWork w: work.getAllWork()) {
+          if (w instanceof MapWork) {
+            convertMapWork((MapWork) w, true);
+          } else if (w instanceof ReduceWork) {
+            // Reduce work is converted only when HIVE_VECTORIZATION_REDUCE_ENABLED is set.
+            if (HiveConf.getBoolVar(hiveConf,
+                        HiveConf.ConfVars.HIVE_VECTORIZATION_REDUCE_ENABLED)) {
+              convertReduceWork((ReduceWork) w, true);
+            }
+          }
+        }
+      } else if (currTask instanceof SparkTask) {
+        SparkWork sparkWork = (SparkWork) currTask.getWork();
+        for (BaseWork baseWork : sparkWork.getAllWork()) {
+          if (baseWork instanceof MapWork) {
+            convertMapWork((MapWork) baseWork, false);
+          } else if (baseWork instanceof ReduceWork
+              && HiveConf.getBoolVar(hiveConf,
+                  HiveConf.ConfVars.HIVE_VECTORIZATION_REDUCE_ENABLED)) {
+            convertReduceWork((ReduceWork) baseWork, false); // same opt-in gate as the Tez branch
+          }
+        }
+      }
+      return null; // any other task type is left untouched; no result is reported
+    }
+
+    private void convertMapWork(MapWork mapWork, boolean isTez) throws SemanticException {
+      // The map work is rewritten only after the whole of it has passed validation.
+      if (validateMapWork(mapWork, isTez)) {
+        vectorizeMapWork(mapWork, isTez);
+      }
+    }
+
+    private void addMapWorkRules(Map<Rule, NodeProcessor> opRules, NodeProcessor np) {
+      opRules.put(new RuleRegExp("R1", TableScanOperator.getOperatorName() + ".*"
+          + FileSinkOperator.getOperatorName()), np);
+      opRules.put(new RuleRegExp("R2", TableScanOperator.getOperatorName() + ".*"
+          + ReduceSinkOperator.getOperatorName()), np);
+    }
+
+    // Decides whether a MapWork may be vectorized: it needs a non-empty alias map with at most
+    // one TableScanOperator, input formats that directly implement VectorizedInputFormatInterface,
+    // and no operator for which the validation walk returned false.
+    private boolean validateMapWork(MapWork mapWork, boolean isTez) throws SemanticException {
+      LOG.info("Validating MapWork...");
+
+      // Eliminate MR plans with more than one TableScanOperator.
+      LinkedHashMap<String, Operator<? extends OperatorDesc>> aliasToWork = mapWork.getAliasToWork();
+      if ((aliasToWork == null) || aliasToWork.isEmpty()) {
+        return false;
+      }
+      int tableScanCount = 0;
+      for (Operator<?> op : aliasToWork.values()) {
+        if (op == null) {
+          LOG.warn("Map work has invalid aliases to work with. Fail validation!");
+          return false;
+        }
+        if (op instanceof TableScanOperator) {
+          tableScanCount++;
+        }
+      }
+      if (tableScanCount > 1) {
+        LOG.warn("Map work has more than 1 TableScanOperator aliases to work with. Fail validation!");
+        return false;
+      }
+
+      // Validate the input format of every partition; only the descriptors matter, not the paths.
+      for (PartitionDesc pd : mapWork.getPathToPartitionInfo().values()) {
+        List<Class<?>> interfaceList =
+            Arrays.asList(pd.getInputFileFormatClass().getInterfaces());
+        if (!interfaceList.contains(VectorizedInputFormatInterface.class)) {
+          LOG.info("Input format: " + pd.getInputFileFormatClassName()
+              + ", doesn't provide vectorized input");
+          return false;
+        }
+      }
+      Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
+      MapWorkValidationNodeProcessor vnp = new MapWorkValidationNodeProcessor(mapWork, isTez);
+      addMapWorkRules(opRules, vnp);
+      Dispatcher disp = new DefaultRuleDispatcher(vnp, opRules, null);
+      GraphWalker ogw = new DefaultGraphWalker(disp);
+
+      // Walk the mapper operator tree; a FALSE recorded for any node vetoes vectorization.
+      ArrayList<Node> topNodes = new ArrayList<Node>();
+      topNodes.addAll(mapWork.getAliasToWork().values());
+      HashMap<Node, Object> nodeOutput = new HashMap<Node, Object>();
+      ogw.startWalking(topNodes, nodeOutput);
+      for (Object result : nodeOutput.values()) {
+        if (result != null && !((Boolean) result).booleanValue()) {
+          return false;
+        }
+      }
+      return true;
+    }
+
+    private void vectorizeMapWork(MapWork mapWork, boolean isTez) throws SemanticException {
+      LOG.info("Vectorizing MapWork...");
+      mapWork.setVectorMode(true); // convertMapWork() only gets here after validation passed
+      Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
+      MapWorkVectorizationNodeProcessor vnp = new MapWorkVectorizationNodeProcessor(mapWork, isTez);
+      addMapWorkRules(opRules, vnp);
+      Dispatcher disp = new DefaultRuleDispatcher(vnp, opRules, null);
+      GraphWalker ogw = new PreOrderOnceWalker(disp);
+      // Walk the mapper operator tree, starting from every operator in the alias map.
+      ArrayList<Node> topNodes = new ArrayList<Node>();
+      topNodes.addAll(mapWork.getAliasToWork().values());
+      HashMap<Node, Object> nodeOutput = new HashMap<Node, Object>();
+      ogw.startWalking(topNodes, nodeOutput);
+      // Copy the column maps held by the node processor onto the work.
+      mapWork.setVectorColumnNameMap(vnp.getVectorColumnNameMap());
+      mapWork.setVectorColumnTypeMap(vnp.getVectorColumnTypeMap());
+      mapWork.setVectorScratchColumnTypeMap(vnp.getVectorScratchColumnTypeMap());
+
+      if (LOG.isDebugEnabled()) {
+        debugDisplayAllMaps(mapWork);
+      }
+
+      return;
+    }
+
+    private void convertReduceWork(ReduceWork reduceWork, boolean isTez) throws SemanticException {
+      // The reduce work is rewritten only after it has passed validation.
+      if (validateReduceWork(reduceWork)) {
+        vectorizeReduceWork(reduceWork, isTez);
+      }
+    }
+
+    private boolean getOnlyStructObjectInspectors(ReduceWork reduceWork) throws SemanticException {
+      try {
+        // Check key ObjectInspector: it must be present and be a struct inspector.
+        ObjectInspector keyObjectInspector = reduceWork.getKeyObjectInspector();
+        if (keyObjectInspector == null || !(keyObjectInspector instanceof StructObjectInspector)) {
+          return false;
+        }
+        StructObjectInspector keyStructObjectInspector = (StructObjectInspector)keyObjectInspector;
+        List<? extends StructField> keyFields = keyStructObjectInspector.getAllStructFieldRefs();
+
+        // Tez doesn't use tagging...  Work that needs tagging is rejected.
+        if (reduceWork.getNeedsTagging()) {
+          return false;
+        }
+
+        // Check value ObjectInspector: same requirement as for the key.
+        ObjectInspector valueObjectInspector = reduceWork.getValueObjectInspector();
+        if (valueObjectInspector == null ||
+                !(valueObjectInspector instanceof StructObjectInspector)) {
+          return false;
+        }
+        StructObjectInspector valueStructObjectInspector = (StructObjectInspector)valueObjectInspector;
+        List<? extends StructField> valueFields = valueStructObjectInspector.getAllStructFieldRefs();
+        // Side effect: (re)build the dispatcher's column lists, key fields first, then value fields.
+        reduceColumnNames = new ArrayList<String>();
+        reduceTypeInfos = new ArrayList<TypeInfo>();
+
+        for (StructField field: keyFields) {
+          reduceColumnNames.add(Utilities.ReduceField.KEY.toString() + "." + field.getFieldName());
+          reduceTypeInfos.add(TypeInfoUtils.getTypeInfoFromTypeString(field.getFieldObjectInspector().getTypeName()));
+        }
+        for (StructField field: valueFields) {
+          reduceColumnNames.add(Utilities.ReduceField.VALUE.toString() + "." + field.getFieldName());
+          reduceTypeInfos.add(TypeInfoUtils.getTypeInfoFromTypeString(field.getFieldObjectInspector().getTypeName()));
+        }
+      } catch (Exception e) {
+        throw new SemanticException(e); // any failure above is rethrown wrapped
+      }
+      return true;
+    }
+
+    private void addReduceWorkRules(Map<Rule, NodeProcessor> opRules, NodeProcessor np) {
+      opRules.put(new RuleRegExp("R1", GroupByOperator.getOperatorName() + ".*"), np); // group-by name, then anything
+      opRules.put(new RuleRegExp("R2", SelectOperator.getOperatorName() + ".*"), np); // select name, then anything
+    }
+
+    // Decides whether a ReduceWork may be vectorized: its key and value inspectors must pass
+    // getOnlyStructObjectInspectors() and no operator in the validation walk may return false.
+    private boolean validateReduceWork(ReduceWork reduceWork) throws SemanticException {
+      LOG.info("Validating ReduceWork...");
+
+      // Validate input to ReduceWork.
+      if (!getOnlyStructObjectInspectors(reduceWork)) {
+        return false;
+      }
+      // Now check the reduce operator tree.
+      Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
+      ReduceWorkValidationNodeProcessor vnp = new ReduceWorkValidationNodeProcessor();
+      addReduceWorkRules(opRules, vnp);
+      Dispatcher disp = new DefaultRuleDispatcher(vnp, opRules, null);
+      GraphWalker ogw = new DefaultGraphWalker(disp);
+      // Walk the reduce operator tree from the reducer; a FALSE for any node vetoes vectorization.
+      ArrayList<Node> topNodes = new ArrayList<Node>();
+      topNodes.add(reduceWork.getReducer());
+      HashMap<Node, Object> nodeOutput = new HashMap<Node, Object>();
+      ogw.startWalking(topNodes, nodeOutput);
+      for (Object result : nodeOutput.values()) {
+        if (result != null && !((Boolean) result).booleanValue()) {
+          return false;
+        }
+      }
+      return true;
+    }
+
+    private void vectorizeReduceWork(ReduceWork reduceWork, boolean isTez) throws SemanticException {
+      LOG.info("Vectorizing ReduceWork...");
+      reduceWork.setVectorMode(true);
+
+      // For some reason, the DefaultGraphWalker does not descend down from the reducer Operator as
+      // expected.  We need to descend down, otherwise it breaks our algorithm that determines
+      // VectorizationContext...  So we use PreOrderWalker instead of DefaultGraphWalker.
+      Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
+      ReduceWorkVectorizationNodeProcessor vnp =
+              new ReduceWorkVectorizationNodeProcessor(reduceColumnNames, reduceTypeInfos, isTez);
+      addReduceWorkRules(opRules, vnp);
+      Dispatcher disp = new DefaultRuleDispatcher(vnp, opRules, null);
+      GraphWalker ogw = new PreOrderWalker(disp);
+      // Walk the reduce operator tree, starting from the reducer operator.
+      ArrayList<Node> topNodes = new ArrayList<Node>();
+      topNodes.add(reduceWork.getReducer());
+      LOG.info("vectorizeReduceWork reducer Operator: " +
+              reduceWork.getReducer().getName() + "...");
+      HashMap<Node, Object> nodeOutput = new HashMap<Node, Object>();
+      ogw.startWalking(topNodes, nodeOutput);
+
+      // Necessary since we are vectorizing the root operator in reduce.
+      reduceWork.setReducer(vnp.getRootVectorOp());
+
+      reduceWork.setVectorColumnNameMap(vnp.getVectorColumnNameMap());
+      reduceWork.setVectorColumnTypeMap(vnp.getVectorColumnTypeMap());
+      reduceWork.setVectorScratchColumnTypeMap(vnp.getVectorScratchColumnTypeMap());
+
+      if (LOG.isDebugEnabled()) {
+        debugDisplayAllMaps(reduceWork);
+      }
+    }
+  }
+
+  class MapWorkValidationNodeProcessor implements NodeProcessor {
+    // Checks every operator on the walker's stack with validateMapWorkOperator().
+    private final MapWork mapWork;
+    private final boolean isTez;
+
+    public MapWorkValidationNodeProcessor(MapWork mapWork, boolean isTez) {
+      this.mapWork = mapWork;
+      this.isTez = isTez;
+    }
+    // TRUE as soon as nonVectorizableChildOfGroupBy(op) holds; FALSE at the first invalid operator.
+    @Override
+    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+        Object... nodeOutputs) throws SemanticException {
+      for (Node n : stack) {
+        Operator<? extends OperatorDesc> op = (Operator<? extends OperatorDesc>) n;
+        if (nonVectorizableChildOfGroupBy(op)) {
+          return Boolean.TRUE;
+        }
+        boolean ret = validateMapWorkOperator(op, mapWork, isTez);
+        if (!ret) {
+          LOG.info("MapWork Operator: " + op.getName() + " could not be vectorized.");
+          return Boolean.FALSE;
+        }
+      }
+      return Boolean.TRUE; // shared constants: no Boolean allocation per visited node
+    }
+  }
+
+  class ReduceWorkValidationNodeProcessor implements NodeProcessor {
+    // TRUE as soon as nonVectorizableChildOfGroupBy(op) holds; FALSE at the first invalid operator.
+    @Override
+    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+        Object... nodeOutputs) throws SemanticException {
+      for (Node n : stack) {
+        Operator<? extends OperatorDesc> op = (Operator<? extends OperatorDesc>) n;
+        if (nonVectorizableChildOfGroupBy(op)) {
+          return Boolean.TRUE;
+        }
+        boolean ret = validateReduceWorkOperator(op);
+        if (!ret) {
+          LOG.info("ReduceWork Operator: " + op.getName() + " could not be vectorized.");
+          return Boolean.FALSE;
+        }
+      }
+      return Boolean.TRUE; // shared constants: no Boolean allocation per visited node
+    }
+  }
+
+  // This class has common code used by both MapWorkVectorizationNodeProcessor and
+  // ReduceWorkVectorizationNodeProcessor.
+  class VectorizationNodeProcessor implements NodeProcessor {
+
+    // The vectorization context for the Map or Reduce task; not assigned by this base class.
+    protected VectorizationContext taskVectorizationContext;
+
+    // The input projection column type name map for the Map or Reduce task.
+    protected Map<Integer, String> taskColumnTypeNameMap;
+
+    VectorizationNodeProcessor() {
+      taskColumnTypeNameMap = new HashMap<Integer, String>();
+    }
+    // The two context-backed getters dereference taskVectorizationContext, so it must be set first.
+    public Map<String, Integer> getVectorColumnNameMap() {
+      return taskVectorizationContext.getProjectionColumnMap();
+    }
+
+    public Map<Integer, String> getVectorColumnTypeMap() {
+      return taskColumnTypeNameMap;
+    }
+
+    public Map<Integer, String> getVectorScratchColumnTypeMap() {
+      return taskVectorizationContext.getScratchColumnTypeMap();
+    }
+    // Operators already passed through doVectorize(): originals and their replacements.
+    protected final Set<Operator<? extends OperatorDesc>> opsDone =
+        new HashSet<Operator<? extends OperatorDesc>>();
+    // Original operator -> its vectorized replacement; consulted when walking the stack.
+    protected final Map<Operator<? extends OperatorDesc>, Operator<? extends OperatorDesc>> opToVectorOpMap =
+        new HashMap<Operator<? extends OperatorDesc>, Operator<? extends OperatorDesc>>();
+
+    public VectorizationContext walkStackToFindVectorizationContext(Stack<Node> stack,
+            Operator<? extends OperatorDesc> op) throws SemanticException {
+      VectorizationContext vContext = null;
+      if (stack.size() <= 1) {
+        throw new SemanticException(
+            String.format("Expected operator stack for operator %s to have at least 2 operators",
+                  op.getName()));
+      }
+      // Start at the entry just below the top of the stack and walk towards index 0 until a
+      // vectorized VectorizationContextRegion supplies a context; null is returned if none does.
+      int i= stack.size()-2;
+      while (vContext == null) {
+        if (i < 0) {
+          return null;
+        }
+        Operator<? extends OperatorDesc> opParent = (Operator<? extends OperatorDesc>) stack.get(i);
+        Operator<? extends OperatorDesc> vectorOpParent = opToVectorOpMap.get(opParent);
+        if (vectorOpParent != null) {
+          if (vectorOpParent instanceof VectorizationContextRegion) {
+            VectorizationContextRegion vcRegion = (VectorizationContextRegion) vectorOpParent;
+            vContext = vcRegion.getOuputVectorizationContext();
+            LOG.info("walkStackToFindVectorizationContext " + vectorOpParent.getName() + " has new vectorization context " + vContext.toString());
+          } else {
+            LOG.info("walkStackToFindVectorizationContext " + vectorOpParent.getName() + " does not have new vectorization context");
+          }
+        } else {
+          LOG.info("walkStackToFindVectorizationContext " + opParent.getName() + " is not vectorized");
+        }
+        --i;
+      }
+      return vContext;
+    }
+    // Vectorizes op at most once and records the replacement in opToVectorOpMap when it differs.
+    public Operator<? extends OperatorDesc> doVectorize(Operator<? extends OperatorDesc> op,
+            VectorizationContext vContext, boolean isTez) throws SemanticException {
+      Operator<? extends OperatorDesc> vectorOp = op; // returned as-is when op is already in opsDone
+      try {
+        if (!opsDone.contains(op)) {
+          vectorOp = vectorizeOperator(op, vContext, isTez);
+          opsDone.add(op);
+          if (vectorOp != op) {
+            opToVectorOpMap.put(op, vectorOp);
+            opsDone.add(vectorOp);
+          }
+        }
+      } catch (HiveException e) {
+        throw new SemanticException(e);
+      }
+      return vectorOp;
+    }
+    // Placeholder: always throws, so each subclass has to provide its own process().
+    @Override
+    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+        Object... nodeOutputs) throws SemanticException {
+      throw new SemanticException("Must be overridden");
+    }
+  }
+
+  class MapWorkVectorizationNodeProcessor extends VectorizationNodeProcessor {
+    // Vectorizes the operator handed to each process() call through doVectorize().
+    private final boolean isTez;
+
+    public MapWorkVectorizationNodeProcessor(MapWork mWork, boolean isTez) {
+      super(); // mWork is kept in the signature for callers but is not used here
+      this.isTez = isTez;
+    }
+    // Picks the vectorization context for nd, then vectorizes it; the return value is always null.
+    @Override
+    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+        Object... nodeOutputs) throws SemanticException {
+
+      Operator<? extends OperatorDesc> op = (Operator<? extends OperatorDesc>) nd;
+
+      VectorizationContext vContext = null;
+
+      if (op instanceof TableScanOperator) {
+        if (taskVectorizationContext == null) {
+          taskVectorizationContext = getVectorizationContext(op.getSchema(), op.getName(),
+                  taskColumnTypeNameMap);
+        }
+        vContext = taskVectorizationContext;
+      } else {
+        LOG.info("MapWorkVectorizationNodeProcessor process going to walk the operator stack to get vectorization context for " + op.getName());
+        vContext = walkStackToFindVectorizationContext(stack, op);
+        if (vContext == null) {
+          // No operator has "pushed" a new context -- so use the task vectorization context.
+          vContext = taskVectorizationContext;
+        }
+      }
+
+      if (vContext == null) { // checked explicitly: an assert is skipped when assertions are off
+        throw new SemanticException("No vectorization context available for operator " + op.getName());
+      }
+      LOG.info("MapWorkVectorizationNodeProcessor process operator " + op.getName() + " using vectorization context " + vContext.toString());
+
+      // When Vectorized GROUPBY outputs rows instead of vectorized row batches, we don't
+      // vectorize the operators below it.
+      if (nonVectorizableChildOfGroupBy(op)) {
+        // No need to vectorize; Set.add() is already a no-op for an operator seen before.
+        opsDone.add(op);
+        return null;
+      }
+
+      Operator<? extends OperatorDesc> vectorOp = doVectorize(op, vContext, isTez);
+
+      if (LOG.isDebugEnabled()) {
+        if (vectorOp instanceof VectorizationContextRegion) {
+          VectorizationContextRegion vcRegion = (VectorizationContextRegion) vectorOp;
+          VectorizationContext vNewContext = vcRegion.getOuputVectorizationContext();
+          LOG.debug("Vectorized MapWork operator " + vectorOp.getName() + " added vectorization context " + vNewContext.toString());
+        }
+      }
+
+      return null;
+    }
+  }
+
+  class ReduceWorkVectorizationNodeProcessor extends VectorizationNodeProcessor {
+
+    private final List<String> reduceColumnNames;
+    private final List<TypeInfo> reduceTypeInfos;
+
+    private final boolean isTez;
+
+    private Operator<? extends OperatorDesc> rootVectorOp;
+
+    public Operator<? extends OperatorDesc> getRootVectorOp() {
+      return rootVectorOp;
+    }
+
+    public ReduceWorkVectorizationNodeProcessor(List<String> reduceColumnNames,
+            List<TypeInfo> reduceTypeInfos, boolean isTez) {
+      super();
+      this.reduceColumnNames =  reduceColumnNames;
+      this.reduceTypeInfos = reduceTypeInfos;
+      rootVectorOp = null;
+      this.isTez = isTez;
+    }
+
+    @Override
+    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+        Object... nodeOutputs) throws SemanticException {
+
+      Operator<? extends OperatorDesc> op = (Operator<? extends OperatorDesc>) nd;
+
+      VectorizationContext vContext = null;
+
+      boolean saveRootVectorOp = false;
+
+      if (op.getParentOperators().size() == 0) {
+        LOG.info("ReduceWorkVectorizationNodeProcessor process reduceColumnNames " + reduceColumnNames.toString());
+
+        vContext = new VectorizationContext("__Reduce_Shuffle__", reduceColumnNames);
+        taskVectorizationContext = vContext;
+        int i = 0;
+        for (TypeInfo typeInfo : reduceTypeInfos) {
+          taskColumnTypeNameMap.put(i, typeInfo.getTypeName());
+          i++;
+        }
+        saveRootVectorOp = true;
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Vectorized ReduceWork reduce shuffle vectorization context " + vContext.toString());
+        }
+      } else {
+        LOG.info("ReduceWorkVectorizationNodeProcessor process going to walk the operator stack to get vectorization context for " + op.getName());
+        vContext = walkStackToFindVectorizationContext(stack, op);
+        if (vContext == null) {
+          // If we didn't find a context among the operators, assume the top -- reduce shuffle's
+          // vectorization context.
+          vContext = taskVectorizationContext;
+        }
+      }
+
+      assert vContext != null;
+      LOG.info("ReduceWorkVectorizationNodeProcessor process operator " + op.getName() + " using vectorization context" + vContext.toString());
+
+      // When Vectorized GROUPBY outputs rows instead of vectorized row batchs, we don't
+      // vectorize the operators below it.
+      if (nonVectorizableChildOfGroupBy(op)) {
+        // No need to vectorize
+        if (!opsDone.contains(op)) {
+          opsDone.add(op);
+        }
+        return null;
+      }
+
+      Operator<? extends OperatorDesc> vectorOp = doVectorize(op, vContext, isTez);
+
+      if (LOG.isDebugEnabled()) {
+        if (vectorOp instanceof VectorizationContextRegion) {
+          VectorizationContextRegion vcRegion = (VectorizationContextRegion) vectorOp;
+          VectorizationContext vNewContext = vcRegion.getOuputVectorizationContext();
+          LOG.debug("Vectorized ReduceWork operator " + vectorOp.getName() + " added vectorization context " + vNewContext.toString());
+        }
+      }
+      if (saveRootVectorOp && op != vectorOp) {
+        rootVectorOp = vectorOp;
+      }
+
+      return null;
+    }
+  }
+
+  /**
+   * Vectorization context named "No Name" in which every input column lookup, whether by
+   * name or by column expression, resolves to index 0.
+   */
+  private static class ValidatorVectorizationContext extends VectorizationContext {
+
+    private ValidatorVectorizationContext() {
+      super("No Name");
+    }
+
+    /** Ignores {@code name}; the answer is always 0. */
+    @Override
+    protected int getInputColumnIndex(String name) {
+      return 0;
+    }
+
+    /** Ignores {@code colExpr}; the answer is always 0. */
+    @Override
+    protected int getInputColumnIndex(ExprNodeColumnDesc colExpr) {
+      return 0;
+    }
+  }
+
+  /**
+   * Entry point of this resolver.  Stores the plan's configuration in {@code hiveConf} and,
+   * when HIVE_VECTORIZATION_ENABLED is set, walks the task graph from the root tasks with a
+   * VectorizationDispatcher.
+   *
+   * @param physicalContext the physical plan to process
+   * @return the same {@code physicalContext} instance
+   * @throws SemanticException propagated from the task graph walk
+   */
+  @Override
+  public PhysicalContext resolve(PhysicalContext physicalContext) throws SemanticException {
+    hiveConf = physicalContext.getConf();
+
+    if (!HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED)) {
+      LOG.info("Vectorization is disabled");
+      return physicalContext;
+    }
+
+    // Dispatcher plus the walker that drives it over the task graph.
+    Dispatcher dispatcher = new VectorizationDispatcher(physicalContext);
+    TaskGraphWalker taskWalker = new TaskGraphWalker(dispatcher);
+
+    // The walk starts from every root task of the plan.
+    ArrayList<Node> rootNodes = new ArrayList<Node>(physicalContext.getRootTasks());
+
+    taskWalker.startWalking(rootNodes, null);
+    return physicalContext;
+  }
+
+  /**
+   * Decides whether a map-side operator passes validation, dispatching on its operator type.
+   *
+   * @param op the operator to check
+   * @param mWork the map work, passed to the table scan check
+   * @param isTez passed to the group-by check
+   * @return the outcome of the per-type check; {@code true} for FILESINK, LIMIT, EVENT and
+   *         SPARKPRUNINGSINK; {@code false} for any type not listed
+   */
+  boolean validateMapWorkOperator(Operator<? extends OperatorDesc> op, MapWork mWork, boolean isTez) {
+    switch (op.getType()) {
+      case MAPJOIN:
+        if (op instanceof MapJoinOperator) {
+          return validateMapJoinOperator((MapJoinOperator) op);
+        }
+        if (op instanceof SMBMapJoinOperator) {
+          return validateSMBMapJoinOperator((SMBMapJoinOperator) op);
+        }
+        return false;
+      case GROUPBY:
+        return validateGroupByOperator((GroupByOperator) op, false, isTez);
+      case FILTER:
+        return validateFilterOperator((FilterOperator) op);
+      case SELECT:
+        return validateSelectOperator((SelectOperator) op);
+      case REDUCESINK:
+        return validateReduceSinkOperator((ReduceSinkOperator) op);
+      case TABLESCAN:
+        return validateTableScanOperator((TableScanOperator) op, mWork);
+      case FILESINK:
+      case LIMIT:
+      case EVENT:
+      case SPARKPRUNINGSINK:
+        return true;
+      case HASHTABLESINK:
+        return op instanceof SparkHashTableSinkOperator
+            && validateSparkHashTableSinkOperator((SparkHashTableSinkOperator) op);
+      default:
+        return false;
+    }
+  }
+
+  /**
+   * Decides whether a reduce-side operator passes validation, dispatching on its operator
+   * type.  GROUPBY is rejected outright unless HIVE_VECTORIZATION_REDUCE_GROUPBY_ENABLED is
+   * set.
+   *
+   * @param op the operator to check
+   * @return the outcome of the per-type check; {@code true} for LIMIT, EVENT and
+   *         SPARKPRUNINGSINK; {@code false} for any type not listed
+   */
+  boolean validateReduceWorkOperator(Operator<? extends OperatorDesc> op) {
+    switch (op.getType()) {
+      case MAPJOIN:
+        // Does MAPJOIN actually get planned in Reduce?
+        if (op instanceof MapJoinOperator) {
+          return validateMapJoinOperator((MapJoinOperator) op);
+        }
+        if (op instanceof SMBMapJoinOperator) {
+          return validateSMBMapJoinOperator((SMBMapJoinOperator) op);
+        }
+        return false;
+      case GROUPBY:
+        if (!HiveConf.getBoolVar(hiveConf,
+                HiveConf.ConfVars.HIVE_VECTORIZATION_REDUCE_GROUPBY_ENABLED)) {
+          return false;
+        }
+        return validateGroupByOperator((GroupByOperator) op, true, true);
+      case FILTER:
+        return validateFilterOperator((FilterOperator) op);
+      case SELECT:
+        return validateSelectOperator((SelectOperator) op);
+      case REDUCESINK:
+        return validateReduceSinkOperator((ReduceSinkOperator) op);
+      case FILESINK:
+        return validateFileSinkOperator((FileSinkOperator) op);
+      case LIMIT:
+      case EVENT:
+      case SPARKPRUNINGSINK:
+        return true;
+      case HASHTABLESINK:
+        return op instanceof SparkHashTableSinkOperator
+            && validateSparkHashTableSinkOperator((SparkHashTableSinkOperator) op);
+      default:
+        return false;
+    }
+  }
+
+  /**
+   * Walks up from {@code op} through the first parent at each level, looking for the nearest
+   * GROUPBY ancestor, and reports whether that GROUP BY does not produce vectorized output.
+   *
+   * A null parent list is treated the same as an empty one, matching the null check that
+   * fixupParentChildOperators in this file applies to getParentOperators().
+   *
+   * @param op the operator whose ancestors are inspected ({@code op} itself is not tested)
+   * @return {@code true} when the nearest GROUPBY ancestor's vector descriptor reports no
+   *         vector output; {@code false} when it does, or when no GROUPBY ancestor is found
+   */
+  public Boolean nonVectorizableChildOfGroupBy(Operator<? extends OperatorDesc> op) {
+    List<Operator<? extends OperatorDesc>> parents = op.getParentOperators();
+    while (parents != null && !parents.isEmpty()) {
+      // Only the first parent is followed at each level.
+      Operator<? extends OperatorDesc> ancestor = parents.get(0);
+      if (ancestor.getType().equals(OperatorType.GROUPBY)) {
+        GroupByDesc desc = (GroupByDesc) ancestor.getConf();
+        // The search stops at the first GROUP BY, whichever way it answers.
+        return !desc.getVectorDesc().isVectorOutput();
+      }
+      parents = ancestor.getParentOperators();
+    }
+    return false;
+  }
+
+  /**
+   * Validates an SMB map join by applying the map join descriptor check to its descriptor.
+   *
+   * @param op the SMB map join operator
+   * @return the result of {@code validateMapJoinDesc} on the operator's descriptor
+   */
+  private boolean validateSMBMapJoinOperator(SMBMapJoinOperator op) {
+    // Validation is the same as for map join, since the 'small' tables are not vectorized
+    SMBJoinDesc smbJoinDesc = op.getConf();
+    boolean valid = validateMapJoinDesc(smbJoinDesc);
+    return valid;
+  }
+
+  private boolean validateTableScanOperator(TableScanOperator op, MapWork mWork) {
+    TableScanDesc desc = op.getConf();
+    if (desc.isGatherStats()) {
+      return false;
+    }
+
+    String columns = "";
+    String types = "";
+    String partitionColumns = "";
+    String partitionTypes = "";
+    boolean haveInfo = false;
+
+    // This over-reaches slightly, since we can have > 1 table-scan  per map-work.
+    // It needs path to partition, path to alias, then check the alias == the same table-scan, to be accurate.
+    // That said, that is a TODO item to be fixed when we support >1 TableScans per vectorized pipeline later.
+    LinkedHashMap<String, PartitionDesc> partitionDescs = mWork.getPathToPartitionInfo();
+
+    // For vectorization, compare each partition information for against the others.
+    // We assume the table information will be from one of the partitions, so it will
+    // work to focus on the partition information and not compare against the TableScanOperator
+    // columns (in the VectorizationContext)....
+    for (Map.Entry<String, PartitionDesc> entry : partitionDescs.entrySet()) {
+      PartitionDesc partDesc = entry.getValue();
+      if (partDesc.getPartSpec() == null || partDesc.getPartSpec().isEmpty()) {
+        // No partition information -- we match because we would default to using the table description.
+        continue;
+      }
+      Properties partProps = partDesc.getProperties();
+      if (!haveInfo) {
+        columns = partProps.getProperty(hive_metastoreConstants.META_TABLE_COLUMNS);
+        types = partProps.getProperty(hive_metastoreConstants.META_TABLE_COLUMN_TYPES);
+        partitionColumns = partProps.getProperty(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS);
+        partitionTypes = partProps.getProperty(hive_metastoreConstants.META_TABLE_PARTITION_COLUMN_TYPES);
+        haveInfo = true;
+      } else {
+        String nextColumns = partProps.getProperty(hive_metastoreConstants.META_TABLE_COLUMNS);
+        String nextTypes = partProps.getProperty(hive_metastoreConstants.META_TABLE_COLUMN_TYPES);
+        String nextPartitionColumns = partProps.getProperty(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS);
+        String nextPartitionTypes = partProps.getProperty(hive_metastoreConstants.META_TABLE_PARTITION_COLUMN_TYPES);
+        if (!columns.equalsIgnoreCase(nextColumns)) {
+          LOG.info(
+                  String.format("Could not vectorize partition %s.  Its column names %s do not match the other column names %s",
+                            entry.getKey(), nextColumns, columns));
+          return false;
+        }
+        if (!types.equalsIgnoreCase(nextTypes)) {
+          LOG.info(
+                  String.format("Could not vectorize partition %s.  Its column types %s do not match the other column types %s",
+                              entry.getKey(), nextTypes, types));
+          return false;
+        }
+        if (!partitionColumns.equalsIgnoreCase(nextPartitionColumns)) {
+          LOG.info(
+                 String.format("Could not vectorize partition %s.  Its partition column names %s do not match the other partition column names %s",
+                              entry.getKey(), nextPartitionColumns, partitionColumns));
+          return false;
+        }
+        if (!partitionTypes.equalsIgnoreCase(nextPartitionTypes)) {
+          LOG.info(
+                 String.format("Could not vectorize partition %s.  Its partition column types %s do not match the other partition column types %s",
+                                entry.getKey(), nextPartitionTypes, partitionTypes));
+          return false;
+        }
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Validates a map join operator through its descriptor.
+   *
+   * @param op the map join operator
+   * @return the result of {@code validateMapJoinDesc} on the operator's descriptor
+   */
+  private boolean validateMapJoinOperator(MapJoinOperator op) {
+    return validateMapJoinDesc(op.getConf());
+  }
+
+  private boolean validateMapJoinDesc(MapJoinDesc desc) {
+    byte posBigTable = (byte) desc.getPosBigTable();
+    List<ExprNodeDesc> filterExprs = desc.getFilters().get(posBigTable);
+    if (!validateExprNodeDesc(filterExprs, VectorExpressionDescriptor.Mode.FILTER)) {
+      LOG.info("Cannot vectorize map work filter expression");
+      return false;
+    }
+    List<ExprNodeDesc> keyExprs = desc.getKeys().get(posBigTable);
+    if (!validateExprNodeDesc(keyExprs)) {
+      LOG.info("Cannot vectorize map work key expression");
+      return false;
+    }
+    List<ExprNodeDesc> valueExprs = desc.getExprs().get(posBigTable);
+    if (!validateExprNodeDesc(valueExprs)) {
+      LOG.info("Cannot vectorize map work value expression");
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Validates the filter, key and value expressions that a Spark hash table sink holds for
+   * its own tag.
+   *
+   * @param op the Spark hash table sink operator
+   * @return {@code true} only when the filters (FILTER mode), keys and values all validate
+   */
+  private boolean validateSparkHashTableSinkOperator(SparkHashTableSinkOperator op) {
+    SparkHashTableSinkDesc sinkDesc = op.getConf();
+    byte tag = sinkDesc.getTag();
+
+    // it's essentially a MapJoinDesc
+    List<ExprNodeDesc> filters = sinkDesc.getFilters().get(tag);
+    List<ExprNodeDesc> keys = sinkDesc.getKeys().get(tag);
+    List<ExprNodeDesc> values = sinkDesc.getExprs().get(tag);
+
+    if (!validateExprNodeDesc(filters, VectorExpressionDescriptor.Mode.FILTER)) {
+      return false;
+    }
+    if (!validateExprNodeDesc(keys)) {
+      return false;
+    }
+    return validateExprNodeDesc(values);
+  }
+
+  /**
+   * Validates the key, partition and value column expressions of a reduce sink.
+   *
+   * @param op the reduce sink operator
+   * @return {@code true} only when all three expression lists validate
+   */
+  private boolean validateReduceSinkOperator(ReduceSinkOperator op) {
+    List<ExprNodeDesc> keyCols = op.getConf().getKeyCols();
+    List<ExprNodeDesc> partitionCols = op.getConf().getPartitionCols();
+    List<ExprNodeDesc> valueCols = op.getConf().getValueCols();
+
+    if (!validateExprNodeDesc(keyCols)) {
+      return false;
+    }
+    if (!validateExprNodeDesc(partitionCols)) {
+      return false;
+    }
+    return validateExprNodeDesc(valueCols);
+  }
+
+  /**
+   * Validates every expression in a select operator's column list, stopping at the first
+   * one that fails.
+   *
+   * @param op the select operator
+   * @return {@code false} as soon as one expression fails validation, otherwise {@code true}
+   */
+  private boolean validateSelectOperator(SelectOperator op) {
+    for (ExprNodeDesc colExpr : op.getConf().getColList()) {
+      if (validateExprNodeDesc(colExpr)) {
+        continue;
+      }
+      LOG.info("Cannot vectorize select expression: " + colExpr.toString());
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Validates a filter operator's predicate in FILTER mode.
+   *
+   * @param op the filter operator
+   * @return whether the predicate validates
+   */
+  private boolean validateFilterOperator(FilterOperator op) {
+    return validateExprNodeDesc(
+        op.getConf().getPredicate(), VectorExpressionDescriptor.Mode.FILTER);
+  }
+
+  private boolean validateGroupByOperator(GroupByOperator op, boolean isReduce, boolean isTez) {
+    GroupByDesc desc = op.getConf();
+    VectorGroupByDesc vectorDesc = desc.getVectorDesc();
+
+    if (desc.isGroupingSetsPresent()) {
+      LOG.info("Grouping sets not supported in vector mode");
+      return false;
+    }
+    if (desc.pruneGroupingSetId()) {
+      LOG.info("Pruning grouping set id not supported in vector mode");
+      return false;
+    }
+    boolean ret = validateExprNodeDesc(desc.getKeys());
+    if (!ret) {
+      LOG.info("Cannot vectorize groupby key expression");
+      return false;
+    }
+
+    if (!isReduce) {
+
+      // MapWork
+
+      ret = validateHashAggregationDesc(desc.getAggregators());
+      if (!ret) {
+        return false;
+      }
+    } else {
+
+      // ReduceWork
+
+      boolean isComplete = desc.getMode() == GroupByDesc.Mode.COMPLETE;
+      if (desc.getMode() != GroupByDesc.Mode.HASH) {
+
+        // Reduce Merge-Partial GROUP BY.
+
+        // A merge-partial GROUP BY is fed by grouping by keys from reduce-shuffle.  It is the
+        // first (or root) operator for its reduce task.
+        // TODO: Technically, we should also handle FINAL, PARTIAL1, PARTIAL2 and PARTIALS
+        //       that are not hash or complete, but aren't merge-partial, somehow.
+
+        if (desc.isDistinct()) {
+          LOG.info("Vectorized Reduce MergePartial GROUP BY does not support DISTINCT");
+          return false;
+        }
+
+        boolean hasKeys = (desc.getKeys().size() > 0);
+
+        // Do we support merge-partial aggregation AND the output is primitive?
+        ret = validateReduceMergePartialAggregationDesc(desc.getAggregators(), hasKeys);
+        if (!ret) {
+          return false;
+        }
+
+        if (hasKeys) {
+          if (op.getParentOperators().size() > 0 && !isComplete) {
+            LOG.info("Vectorized Reduce MergePartial GROUP BY keys can only handle a key group when it is fed by reduce-shuffle");
+            return false;
+          }
+
+          LOG.info("Vectorized Reduce MergePartial GROUP BY will process key groups");
+
+          // Primitive output validation above means we can output VectorizedRowBatch to the
+          // children operators.
+          vectorDesc.setVectorOutput(true);
+        } else {
+          LOG.info("Vectorized Reduce MergePartial GROUP BY will do global aggregation");
+        }
+        if (!isComplete) {
+          vectorDesc.setIsReduceMergePartial(true);
+        } else {
+          vectorDesc.setIsReduceStreaming(true);
+        }
+      } else {
+
+        // Reduce Hash GROUP BY or global aggregation.
+
+        ret = validateHashAggregationDesc(desc.getAggregators());
+        if (!ret) {
+          return false;
+        }
+      }
+    }
+
+    return true;
+  }
+
+  private boolean validateFileSinkOperator(FileSinkOperator op) {
+   return true;
+  }
+
+  private boolean validateExprNodeDesc(List<ExprNodeDesc> descs) {
+    return validateExprNodeDesc(descs, VectorExpressionDescriptor.Mode.PROJECTION);
+  }
+
+  /**
+   * Validates each expression of a list in the given mode, stopping at the first failure.
+   *
+   * @param descs the expressions to check
+   * @param mode the mode applied to every expression
+   * @return {@code false} as soon as one expression fails, otherwise {@code true}
+   */
+  private boolean validateExprNodeDesc(List<ExprNodeDesc> descs,
+          VectorExpressionDescriptor.Mode mode) {
+    for (ExprNodeDesc expr : descs) {
+      if (!validateExprNodeDesc(expr, mode)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+
+  private boolean validateHashAggregationDesc(List<AggregationDesc> descs) {
+    return validateAggregationDesc(descs, /* isReduceMergePartial */ false, false);
+  }
+
+  private boolean validateReduceMergePartialAggregationDesc(List<AggregationDesc> descs, boolean hasKeys) {
+    return validateAggregationDesc(descs, /* isReduceMergePartial */ true, hasKeys);
+  }
+
+  /**
+   * Validates each aggregation descriptor of a list, stopping at the first failure.
+   *
+   * @param descs the aggregation descriptors to check
+   * @param isReduceMergePartial forwarded to the per-descriptor check
+   * @param hasKeys forwarded to the per-descriptor check
+   * @return {@code false} as soon as one descriptor fails, otherwise {@code true}
+   */
+  private boolean validateAggregationDesc(List<AggregationDesc> descs, boolean isReduceMergePartial, boolean hasKeys) {
+    for (AggregationDesc aggregation : descs) {
+      if (!validateAggregationDesc(aggregation, isReduceMergePartial, hasKeys)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Recursively checks an expression tree.  A node fails when it is a virtual column, when
+   * its type name is rejected by {@code validateDataType}, or when it is a generic function
+   * rejected by {@code validateGenericUdf}.  Children are always checked in FILTER mode,
+   * whatever mode the parent was checked in.
+   *
+   * @param desc the expression to check
+   * @param mode the mode used for the data type check of {@code desc} itself
+   * @return whether {@code desc} and all of its descendants pass
+   */
+  private boolean validateExprNodeDescRecursive(ExprNodeDesc desc, VectorExpressionDescriptor.Mode mode) {
+    if (desc instanceof ExprNodeColumnDesc) {
+      ExprNodeColumnDesc columnDesc = (ExprNodeColumnDesc) desc;
+      // Currently, we do not support vectorized virtual columns (see HIVE-5570).
+      if (VirtualColumn.VIRTUAL_COLUMN_NAMES.contains(columnDesc.getColumn())) {
+        LOG.info("Cannot vectorize virtual column " + columnDesc.getColumn());
+        return false;
+      }
+    }
+
+    String typeName = desc.getTypeInfo().getTypeName();
+    if (!validateDataType(typeName, mode)) {
+      LOG.info("Cannot vectorize " + desc.toString() + " of type " + typeName);
+      return false;
+    }
+
+    if (desc instanceof ExprNodeGenericFuncDesc) {
+      ExprNodeGenericFuncDesc funcDesc = (ExprNodeGenericFuncDesc) desc;
+      if (!validateGenericUdf(funcDesc)) {
+        LOG.info("Cannot vectorize UDF " + funcDesc);
+        return false;
+      }
+    }
+
+    if (desc.getChildren() == null) {
+      return true;
+    }
+    for (ExprNodeDesc child : desc.getChildren()) {
+      // Don't restrict child expressions for projection.  Always use looser FILTER mode.
+      if (!validateExprNodeDescRecursive(child, VectorExpressionDescriptor.Mode.FILTER)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  private boolean validateExprNodeDesc(ExprNodeDesc desc) {
+    return validateExprNodeDesc(desc, VectorExpressionDescriptor.Mode.PROJECTION);
+  }
+
+  /**
+   * Validates a single expression in the given mode.  The expression must pass the recursive
+   * structural check, and a throwaway ValidatorVectorizationContext must then be able to
+   * build a vector expression for it.
+   *
+   * @param desc the expression to check
+   * @param mode the mode to validate and build the expression in
+   * @return {@code false} when the recursive check fails, when no vector expression is
+   *         produced, or when building it throws; {@code true} otherwise
+   */
+  boolean validateExprNodeDesc(ExprNodeDesc desc, VectorExpressionDescriptor.Mode mode) {
+    if (!validateExprNodeDescRecursive(desc, mode)) {
+      return false;
+    }
+    try {
+      VectorizationContext validator = new ValidatorVectorizationContext();
+      if (validator.getVectorExpression(desc, mode) != null) {
+        return true;
+      }
+      // TODO: this cannot happen - VectorizationContext throws in such cases.
+      LOG.info("getVectorExpression returned null");
+      return false;
+    } catch (Exception e) {
+      LOG.info("Failed to vectorize", e);
+      return false;
+    }
+  }
+
+  /**
+   * Decides whether a generic function expression is supported.  Custom UDFs are always
+   * accepted.  Otherwise the function's class must be in {@code supportedGenericUDFs}; for
+   * a GenericUDFBridge the class looked up is the bridged UDF class.
+   *
+   * @param genericUDFExpr the function expression to check
+   * @return whether the function is supported
+   */
+  private boolean validateGenericUdf(ExprNodeGenericFuncDesc genericUDFExpr) {
+    if (VectorizationContext.isCustomUDF(genericUDFExpr)) {
+      return true;
+    }
+
+    GenericUDF genericUDF = genericUDFExpr.getGenericUDF();
+
+    // Pick the class to look up: the wrapped UDF class for a bridge, else the UDF's own class.
+    Class<?> lookupClass;
+    if (genericUDF instanceof GenericUDFBridge) {
+      lookupClass = ((GenericUDFBridge) genericUDF).getUdfClass();
+    } else {
+      lookupClass = genericUDF.getClass();
+    }
+    return supportedGenericUDFs.contains(lookupClass);
+  }
+
+  /**
+   * Reports whether a vectorized aggregate's output object inspector is of the PRIMITIVE
+   * category.
+   *
+   * @param vectorAggrExpr the vectorized aggregate expression
+   * @return {@code true} when the output category is PRIMITIVE
+   */
+  private boolean validateAggregationIsPrimitive(VectorAggregateExpression vectorAggrExpr) {
+    ObjectInspector.Category outputCategory =
+        vectorAggrExpr.getOutputObjectInspector().getCategory();
+    return outputCategory == ObjectInspector.Category.PRIMITIVE;
+  }
+
+  /**
+   * Validates a single aggregation descriptor.  The lower-cased UDAF name must be in
+   * {@code supportedAggregationUdfs}, any parameters must validate as expressions, and a
+   * throwaway ValidatorVectorizationContext must be able to build the aggregate expression.
+   * For reduce merge-partial with keys, the aggregate output must also be primitive.
+   *
+   * The name is lower-cased with Locale.ROOT so the lookup does not depend on the JVM's
+   * default locale (a Turkish default locale maps 'I' to a dotless 'i').
+   *
+   * @param aggDesc the aggregation descriptor to check
+   * @param isReduceMergePartial whether the reduce merge-partial path is being validated
+   * @param hasKeys whether the GROUP BY has keys
+   * @return whether the aggregation can be vectorized
+   */
+  private boolean validateAggregationDesc(AggregationDesc aggDesc, boolean isReduceMergePartial,
+          boolean hasKeys) {
+
+    String udfName = aggDesc.getGenericUDAFName().toLowerCase(java.util.Locale.ROOT);
+    if (!supportedAggregationUdfs.contains(udfName)) {
+      LOG.info("Cannot vectorize groupby aggregate expression: UDF " + udfName + " not supported");
+      return false;
+    }
+    if (aggDesc.getParameters() != null && !validateExprNodeDesc(aggDesc.getParameters())) {
+      LOG.info("Cannot vectorize groupby aggregate expression: UDF parameters not supported");
+      return false;
+    }
+
+    // See if we can vectorize the aggregation.
+    VectorizationContext vc = new ValidatorVectorizationContext();
+    VectorAggregateExpression vectorAggrExpr;
+    try {
+      vectorAggrExpr = vc.getAggregatorExpression(aggDesc, isReduceMergePartial);
+    } catch (Exception e) {
+      // The name and parameters were accepted above, so a failure here is unexpected;
+      // report it and treat the aggregation as not vectorizable.
+      LOG.info("Vectorization of aggreation should have succeeded ", e);
+      return false;
+    }
+
+    if (isReduceMergePartial && hasKeys && !validateAggregationIsPrimitive(vectorAggrExpr)) {
+      LOG.info("Vectorized Reduce MergePartial GROUP BY keys can only handle aggregate outputs that are primitive types");
+      return false;
+    }
+
+    return true;
+  }
+
+  private boolean validateDataType(String type, VectorExpressionDescriptor.Mode mode) {
+    type = type.toLowerCase();
+    boolean result = supportedDataTypesPattern.matcher(type).matches();
+    if (result && mode == VectorExpressionDescriptor.Mode.PROJECTION && type.equals("void")) {
+      return false;
+    }
+    return result;
+  }
+
+  /**
+   * Builds a vectorization context from a row schema.  Each non-virtual column is added as
+   * an initial column under its internal name, and its type name is stored in
+   * {@code typeNameMap} under a counter that advances only for non-virtual columns.
+   *
+   * @param rowSchema the schema supplying the columns
+   * @param contextName the name given to the new context
+   * @param typeNameMap receives one type name per non-virtual column, keyed from 0 upward
+   * @return the new context, after {@code finishedAddingInitialColumns} has been called
+   */
+  private VectorizationContext getVectorizationContext(RowSchema rowSchema, String contextName,
+    Map<Integer, String> typeNameMap) {
+
+    VectorizationContext context = new VectorizationContext(contextName);
+
+    // Add all non-virtual columns to make a vectorization context for
+    // the TableScan operator.
+    int nextColumnIndex = 0;
+    for (ColumnInfo column : rowSchema.getSignature()) {
+      // Earlier, validation code should have eliminated virtual columns usage (HIVE-5560).
+      if (isVirtualColumn(column)) {
+        continue;
+      }
+      context.addInitialColumn(column.getInternalName());
+      typeNameMap.put(nextColumnIndex, column.getTypeName());
+      nextColumnIndex++;
+    }
+    context.finishedAddingInitialColumns();
+
+    return context;
+  }
+
+  /**
+   * Splices {@code vectorOp} into the operator tree in place of {@code op}.  The replacement
+   * takes over the original's parent and child lists, and each parent and child is told to
+   * swap {@code op} for {@code vectorOp}.  A null parent or child list is left alone.
+   *
+   * @param op the operator being replaced
+   * @param vectorOp the operator taking its place
+   */
+  private void fixupParentChildOperators(Operator<? extends OperatorDesc> op,
+          Operator<? extends OperatorDesc> vectorOp) {
+    List<Operator<? extends OperatorDesc>> parents = op.getParentOperators();
+    if (parents != null) {
+      vectorOp.setParentOperators(parents);
+      for (Operator<? extends OperatorDesc> parent : parents) {
+        parent.replaceChild(op, vectorOp);
+      }
+    }
+
+    List<Operator<? extends OperatorDesc>> children = op.getChildOperators();
+    if (children != null) {
+      vectorOp.setChildOperators(children);
+      for (Operator<? extends OperatorDesc> child : children) {
+        child.replaceParent(op, vectorOp);
+      }
+    }
+  }
+
+  /**
+   * Reports whether a map join's results come only from the big table.  The small table is
+   * taken to be whichever of the first two entries in the tag order is not the big table
+   * position.  When value indices exist for it, any negative index means a small-table
+   * column is read and the answer is {@code false}.  When there are no value indices, a
+   * non-empty retain list for the small table gives {@code false}.
+   *
+   * @param desc the map join descriptor
+   * @return {@code true} when neither check finds a small-table column in the results
+   */
+  private boolean isBigTableOnlyResults(MapJoinDesc desc) {
+    Byte[] tagOrder = desc.getTagOrder();
+    byte bigTablePos = (byte) desc.getPosBigTable();
+    Byte smallTablePos;
+    if (tagOrder[0] == bigTablePos) {
+      smallTablePos = tagOrder[1];
+    } else {
+      smallTablePos = tagOrder[0];
+    }
+
+    int[] smallTableIndices = null;
+    if (desc.getValueIndices() != null) {
+      smallTableIndices = desc.getValueIndices().get(smallTablePos);
+    }
+
+    int smallTableIndicesSize;
+    if (smallTableIndices == null) {
+      LOG.info("Vectorizer isBigTableOnlyResults smallTableIndices EMPTY");
+      smallTableIndicesSize = 0;
+    } else {
+      LOG.info("Vectorizer isBigTableOnlyResults smallTableIndices " + Arrays.toString(smallTableIndices));
+      smallTableIndicesSize = smallTableIndices.length;
+    }
+
+    List<Integer> smallTableRetainList = desc.getRetainList().get(smallTablePos);
+    LOG.info("Vectorizer isBigTableOnlyResults smallTableRetainList " + smallTableRetainList);
+    int smallTableRetainSize = smallTableRetainList.size();
+
+    if (smallTableIndicesSize > 0) {
+      // Small table indices has priority over retain.
+      for (int valueIndex : smallTableIndices) {
+        if (valueIndex < 0) {
+          // Negative numbers indicate a column to be (deserialize) read from the small table's
+          // LazyBinary value row.
+          LOG.info("Vectorizer isBigTableOnlyResults smallTableIndices[i] < 0 returning false");
+          return false;
+        }
+      }
+    } else if (smallTableRetainSize > 0) {
+      LOG.info("Vectorizer isBigTableOnlyResults smallTableRetainSize > 0 returning false");
+      return false;
+    }
+
+    LOG.info("Vectorizer isBigTableOnlyResults returning true");
+    return true;
+  }
+
+  Operator<? extends OperatorDesc> specializeMapJoinOperator(Operator<? extends OperatorDesc> op,
+        VectorizationContext vContext, MapJoinDesc desc) throws HiveException {
+    Operator<? extends OperatorDesc> vectorOp = null;
+    Class<? extends Operator<?>> opClass = null;
+
+    VectorMapJoinDesc.HashTableImplementationType hashTableImplementationType = HashTableImplementationType.NONE;
+    VectorMapJoinDesc.HashTableKind hashTableKind = HashTableKind.NONE;
+    VectorMapJoinDesc.HashTableKeyType hashTableKeyType = HashTableKeyType.NONE;
+
+    if (HiveConf.getBoolVar(hiveConf,
+              HiveConf.ConfVars.HIVE_VECTORIZATION_MAPJOIN_NATIVE_FAST_HASHTABLE_ENABLED)) {
+      hashTableImplementationType = HashTableImplementationType.FAST;
+    } else {
+      // Restrict to using BytesBytesMultiHashMap via MapJoinBytesTableContainer or
+      // HybridHashTableContainer.
+      hashTableImplementationType = HashTableImplementationType.OPTIMIZED;
+    }
+
+    int joinType = desc.getConds()[0].getType();
+
+    boolean isInnerBigOnly = false;
+    if (joinType == JoinDesc.INNER_JOIN && isBigTableOnlyResults(desc)) {
+      isInnerBigOnly = true;
+    }
+
+    // By default, we can always use the multi-key class.
+    hashTableKeyType = HashTableKeyType.MULTI_KEY;
+
+    if (!HiveConf.getBoolVar(hiveConf,
+        HiveConf.ConfVars.HIVE_VECTORIZATION_MAPJOIN_NATIVE_MULTIKEY_ONLY_ENABLED)) {
+
+      // Look for single column optimization.
+      byte posBigTable = (byte) desc.getPosBigTable();
+      Map<Byte, List<ExprNodeDesc>> keyExprs = desc.getKeys();
+      List<ExprNodeDesc> bigTableKeyExprs = keyExprs.get(posBigTable);
+      if (bigTableKeyExprs.size() == 1) {
+        String typeName = bigTableKeyExprs.get(0).getTypeString();
+        LOG.info("Vectorizer vectorizeOperator map join typeName " + typeName);
+        if (typeName.equals("boolean")) {
+          hashTableKeyType = HashTableKeyType.BOOLEAN;
+        } else if (typeName.equals("tinyint")) {
+          hashTableKeyType = HashTableKeyType.BYTE;
+        } else if (typeName.equals("smallint")) {
+          hashTableKeyType = HashTableKeyType.SHORT;
+        } else if (typeName.equals("int")) {
+          hashTableKeyType = HashTableKeyType.INT;
+        } else if (typeName.equals("bigint") || typeName.equals("long")) {
+          hashTableKeyType = HashTableKeyType.LONG;
+        } else if (VectorizationContext.isStringFamily(typeName)) {
+          hashTableKeyType = HashTableKeyType.STRING;
+        }
+      }
+    }
+
+    switch (joinType) {
+    case JoinDesc.INNER_JOIN:
+      if (!isInnerBigOnly) {
+        hashTableKind = HashTableKind.HASH_MAP;
+      } else {
+        hashTableKind = HashTableKind.HASH_MULTISET;
+      }
+      break;
+    case JoinDesc.LEFT_OUTER_JOIN:
+    case JoinDesc.RIGHT_OUTER_JOIN:
+      hashTableKind = HashTableKind.HASH_MAP;
+      break;
+    case JoinDesc.LEFT_SEMI_JOIN:
+      hashTableKind = HashTableKind.HASH_SET;
+      break;
+    default:
+      throw new HiveException("Unknown join type " + joinType);
+    }
+
+    LOG.info("Vectorizer vectorizeOperator map join hashTableKind " + hashTableKind.name() + " hashTableKeyType " + hashTableKeyType.name());
+
+    switch (hashTableKeyType) {
+    case BOOLEAN:
+    case BYTE:
+    case SHORT:
+    case INT:
+    case LONG:
+      switch (joinType) {
+      case JoinDesc.INNER_JOIN:
+        if (!isInnerBigOnly) {
+          opClass = VectorMapJoinInnerLongOperator.class;
+        } else {
+          opClass = VectorMapJoinInnerBigOnlyLongOperator.class;
+        }
+        break;
+      case JoinDesc.LEFT_OUTER_JOIN:
+      case JoinDesc.RIGHT_OUTER_JOIN:
+        opClass = VectorMapJoinOuterLongOperator.class;
+        break;
+      case JoinDesc.LEFT_SEMI_JOIN:
+        opClass = VectorMapJoinLeftSemiLongOperator.class;
+        break;
+      default:
+        throw new HiveException("Unknown join type " + joinType);
+      }
+      break;
+    case STRING:
+      switch (joinType) {
+      case JoinDesc.INNER_JOIN:
+        if (!isInnerBigOnly) {
+          opClass = VectorMapJoinInnerStringOperator.class;
+        } else {
+          opClass = VectorMapJoinInnerBigOnlyStringOperator.class;
+        }
+        break;
+      case JoinDesc.LEFT_OUTER_JOIN:
+      case JoinDesc.RIGHT_OUTER_JOIN:
+        opClass = VectorMapJoinOuterStringOperator.class;
+        break;
+      case JoinDesc.LEFT_SEMI_JOIN:
+        opClass = VectorMapJoinLeftSemiStringOperator.class;
+        break;
+      default:
+        throw new HiveException("Unknown join type " + joinType);
+      }
+      break;
+    case MULTI_KEY:
+      switch (joinType) {
+      case JoinDesc.INNER_JOIN:
+        if (!isInnerBigOnly) {
+          opClass = VectorMapJoinInnerMultiKeyOperator.class;
+        } else {
+          opClass = VectorMapJoinInnerBigOnlyMultiKeyOperator.class;
+        }
+        break;
+      case JoinDesc.LEFT_OUTER_JOIN:
+      case JoinDesc.RIGHT_OUTER_JOIN:
+        opClass = VectorMapJoinOuterMultiKeyOperator.class;
+        break;
+      case JoinDesc.LEFT_SEMI_JOIN:
+        opClass = VectorMapJoinLeftSemiMultiKeyOperator.class;
+        break;
+      default:
+        throw new HiveException("Unknown join type " + joinType);
+      }
+      break;
+    }
+
+    vectorOp = OperatorFactory.getVectorOperator(opClass, op.getConf(), vContext);
+    LOG.info("Vectorizer vectorizeOperator map join class " + vectorOp.getClass().getSimpleName());
+
+    boolean minMaxEnabled = HiveConf.getBoolVar(hiveConf,
+        HiveConf.ConfVars.HIVE_VECTORIZATION_MAPJOIN_NATIVE_MINMAX_ENABLED);
+
+    VectorMapJoinDesc vectorDesc = desc.getVectorDesc();
+    vectorDesc.setHashTableImplementationType(hashTableImplementationType);
+    vectorDesc.setHashTableKind(hashTableKind);
+    vectorDesc.setHashTableKeyType(hashTableKeyType);
+    vectorDesc.setMinMaxEnabled(minMaxEnabled);
+    return vectorOp;
+  }
+
+  /**
+   * Reports whether any entry of the map join descriptor's null-safe flag array is set.
+   *
+   * @param desc the map join descriptor whose {@code getNullSafes()} array is inspected
+   * @return true if at least one flag is set; false if no flag is set or if the descriptor
+   *         has no flag array at all
+   */
+  private boolean onExpressionHasNullSafes(MapJoinDesc desc) {
+    boolean[] nullSafes = desc.getNullSafes();
+    if (nullSafes == null) {
+      // No flag array recorded on the descriptor: nothing is null-safe.  Without this
+      // guard the loop below would fail with a NullPointerException.
+      return false;
+    }
+    for (boolean nullSafe : nullSafes) {
+      if (nullSafe) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Decides whether the given map join can be handed to specializeMapJoinOperator instead of
+   * the general vector map join operators.
+   *
+   * @param op    the operator being vectorized; only a MapJoinOperator can qualify
+   * @param desc  the map join descriptor of that operator
+   * @param isTez whether the plan runs under Tez
+   * @return true only when every restriction checked below is satisfied
+   */
+  private boolean canSpecializeMapJoin(Operator<? extends OperatorDesc> op, MapJoinDesc desc,
+      boolean isTez) {
+
+    // The native vector map join feature must be enabled and the operator must be a
+    // MapJoinOperator.
+    if (!(op instanceof MapJoinOperator) ||
+        !HiveConf.getBoolVar(hiveConf,
+            HiveConf.ConfVars.HIVE_VECTORIZATION_MAPJOIN_NATIVE_ENABLED)) {
+      return false;
+    }
+
+    // Currently, only under Tez and non-N-way joins, and never with null-safe flags set.
+    if (!isTez || desc.getConds().length != 1 || onExpressionHasNullSafes(desc)) {
+      return false;
+    }
+
+    // Ok, all basic restrictions satisfied so far...
+
+    boolean fastHashTableEnabled = HiveConf.getBoolVar(hiveConf,
+        HiveConf.ConfVars.HIVE_VECTORIZATION_MAPJOIN_NATIVE_FAST_HASHTABLE_ENABLED);
+    if (fastHashTableEnabled) {
+
+      // With the fast hash table implementation, we currently do not support
+      // Hybrid Grace Hash Join.
+      return !HiveConf.getBoolVar(hiveConf,
+          HiveConf.ConfVars.HIVEUSEHYBRIDGRACEHASHJOIN);
+    }
+
+    // Without the fast hash table there are further restrictions: the optimized hash
+    // table must be in use and every big table key type must be supported by MapJoinKey.
+    if (!HiveConf.getBoolVar(hiveConf,
+        HiveConf.ConfVars.HIVEMAPJOINUSEOPTIMIZEDTABLE)) {
+      return false;
+    }
+
+    byte bigTablePosition = (byte) desc.getPosBigTable();
+    Map<Byte, List<ExprNodeDesc>> keysByPosition = desc.getKeys();
+    List<ExprNodeDesc> bigTableKeys = keysByPosition.get(bigTablePosition);
+    for (ExprNodeDesc keyExpr : bigTableKeys) {
+      if (!MapJoinKey.isSupportedField(keyExpr.getTypeString())) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Produces the vectorized counterpart of an operator.
+   *
+   * Map joins are either specialized (when canSpecializeMapJoin approves) or mapped to one of
+   * the general vector map join operator classes.  The other operator types listed in the
+   * switch are created through OperatorFactory from the operator's own descriptor.  Any
+   * operator type not listed is returned unchanged.
+   *
+   * When a different operator is produced, it is wired in place of the original via
+   * fixupParentChildOperators and its descriptor is switched to vector mode.
+   *
+   * @param op       the operator to vectorize
+   * @param vContext the vectorization context handed to the created vector operator
+   * @param isTez    whether the plan runs under Tez; forwarded to canSpecializeMapJoin
+   * @return the vector operator, or {@code op} itself for operator types not handled here
+   * @throws HiveException declared by this method; propagated from the operator creation calls
+   */
+  Operator<? extends OperatorDesc> vectorizeOperator(Operator<? extends OperatorDesc> op,
+      VectorizationContext vContext, boolean isTez) throws HiveException {
+    Operator<? extends OperatorDesc> vectorOp = null;
+
+    switch (op.getType()) {
+      case MAPJOIN:
+        {
+          MapJoinDesc desc = (MapJoinDesc) op.getConf();
+          boolean specialize = canSpecializeMapJoin(op, desc, isTez);
+
+          if (!specialize) {
+
+            // NOTE(review): opClass stays null when op is neither a MapJoinOperator nor an
+            // SMBMapJoinOperator, and is then passed to getVectorOperator as null -- confirm
+            // that no other operator class reports the MAPJOIN type.
+            Class<? extends Operator<?>> opClass = null;
+            if (op instanceof MapJoinOperator) {
+
+              // *NON-NATIVE* vector map differences for LEFT OUTER JOIN and Filtered...
+
+              List<ExprNodeDesc> bigTableFilters = desc.getFilters().get((byte) desc.getPosBigTable());
+              // An outer join that also carries filters on the big table gets its own operator class.
+              boolean isOuterAndFiltered = (!desc.isNoOuterJoin() && bigTableFilters.size() > 0);
+              if (!isOuterAndFiltered) {
+                opClass = VectorMapJoinOperator.class;
+              } else {
+                opClass = VectorMapJoinOuterFilteredOperator.class;
+              }
+            } else if (op instanceof SMBMapJoinOperator) {
+              opClass = VectorSMBMapJoinOperator.class;
+            }
+
+            vectorOp = OperatorFactory.getVectorOperator(opClass, op.getConf(), vContext);
+
+          } else {
+
+            // TEMPORARY Until Native Vector Map Join with Hybrid passes tests...
+            // HiveConf.setBoolVar(physicalContext.getConf(),
+            //    HiveConf.ConfVars.HIVEUSEHYBRIDGRACEHASHJOIN, false);
+
+            vectorOp = specializeMapJoinOperator(op, vContext, desc);
+          }
+        }
+        break;
+      // These operator types are all created from the operator's descriptor alone.
+      case GROUPBY:
+      case FILTER:
+      case SELECT:
+      case FILESINK:
+      case REDUCESINK:
+      case LIMIT:
+      case EXTRACT:
+      case EVENT:
+      case HASHTABLESINK:
+        vectorOp = OperatorFactory.getVectorOperator(op.getConf(), vContext);
+        break;
+      default:
+        // Anything else is passed through untouched.
+        vectorOp = op;
+        break;
+    }
+
+    // Log the resulting operator class and its descriptor class, tolerating nulls.
+    LOG.info("vectorizeOperator " + (vectorOp == null ? "NULL" : vectorOp.getClass().getName()));
+    LOG.info("vectorizeOperator " + (vectorOp == null || vectorOp.getConf() == null ? "NULL" : vectorOp.getConf().getClass().getName()));
+
+    // Only a newly created operator needs to be wired into the tree and marked as vectorized.
+    // NOTE(review): the logging above allows for a null vectorOp, but this branch would
+    // dereference it -- confirm the factory never returns null.
+    if (vectorOp != op) {
+      fixupParentChildOperators(op, vectorOp);
+      ((AbstractOperatorDesc) vectorOp.getConf()).setVectorMode(true);
+    }
+    return vectorOp;
+  }
+
+  /**
+   * Checks whether a column's internal name is listed in VirtualColumn.VIRTUAL_COLUMN_NAMES.
+   *
+   * @param column the column to test
+   * @return true if the column's internal name is one of the virtual column names
+   */
+  private boolean isVirtualColumn(ColumnInfo column) {
+
+    // column.getIsVirtualCol() is deliberately not used here: ColumnInfo also treats
+    // partitioning columns as virtual columns.
+    return VirtualColumn.VIRTUAL_COLUMN_NAMES.contains(column.getInternalName());
+  }
+
+  /**
+   * Writes the vector column name map, column type map and scratch column type map of the
+   * given work to the debug log.
+   *
+   * @param work the work whose vectorization maps are logged
+   */
+  public void debugDisplayAllMaps(BaseWork work) {
+
+    // Skip fetching and stringifying the maps when debug logging is switched off.
+    if (!LOG.isDebugEnabled()) {
+      return;
+    }
+
+    Map<String, Integer> columnNameMap = work.getVectorColumnNameMap();
+    Map<Integer, String> columnTypeMap = work.getVectorColumnTypeMap();
+    Map<Integer, String> scratchColumnTypeMap = work.getVectorScratchColumnTypeMap();
+
+    LOG.debug("debugDisplayAllMaps columnNameMap " + columnNameMap.toString());
+    LOG.debug("debugDisplayAllMaps columnTypeMap " + columnTypeMap.toString());
+    LOG.debug("debugDisplayAllMaps scratchColumnTypeMap " + scratchColumnTypeMap.toString());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/7cfe3743/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java.rej
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java.rej b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java.rej
new file mode 100644
index 0000000..5a10b58
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java.rej
@@ -0,0 +1,86 @@
+***************
+*** 1255,1272 ****
+        LOG.info("Cannot vectorize " + desc.toString() + " of type " + typeName);
+        return false;
+      }
+      if (desc instanceof ExprNodeGenericFuncDesc) {
+        ExprNodeGenericFuncDesc d = (ExprNodeGenericFuncDesc) desc;
+        boolean r = validateGenericUdf(d);
+        if (!r) {
+          return false;
+        }
+      }
+      if (desc.getChildren() != null) {
+-       for (ExprNodeDesc d: desc.getChildren()) {
+-         // Don't restrict child expressions for projection.  Always use looser FILTER mode.
+-         boolean r = validateExprNodeDescRecursive(d, VectorExpressionDescriptor.Mode.FILTER);
+-         if (!r) {
+            return false;
+          }
+        }
+--- 1265,1329 ----
+        LOG.info("Cannot vectorize " + desc.toString() + " of type " + typeName);
+        return false;
+      }
++     boolean isInExpression = false;
+      if (desc instanceof ExprNodeGenericFuncDesc) {
+        ExprNodeGenericFuncDesc d = (ExprNodeGenericFuncDesc) desc;
+        boolean r = validateGenericUdf(d);
+        if (!r) {
+          return false;
+        }
++       GenericUDF genericUDF = d.getGenericUDF();
++       isInExpression = (genericUDF instanceof GenericUDFIn);
+      }
+      if (desc.getChildren() != null) {
++       if (isInExpression &&
++           desc.getChildren().get(0).getTypeInfo().getCategory() == Category.STRUCT) {
++         boolean r = validateStructInExpression(desc, VectorExpressionDescriptor.Mode.FILTER);
++       } else {
++         for (ExprNodeDesc d: desc.getChildren()) {
++           // Don't restrict child expressions for projection.  Always use looser FILTER mode.
++           boolean r = validateExprNodeDescRecursive(d, VectorExpressionDescriptor.Mode.FILTER);
++           if (!r) {
++             return false;
++           }
++         }
++       }
++     }
++     return true;
++   }
++ 
++   private boolean validateStructInExpression(ExprNodeDesc desc,
++       VectorExpressionDescriptor.Mode mode) {
++ 
++     for (ExprNodeDesc d: desc.getChildren()) {
++       TypeInfo typeInfo = d.getTypeInfo();
++       if (typeInfo.getCategory() != Category.STRUCT){
++         return false;
++       }
++       StructTypeInfo structTypeInfo = (StructTypeInfo) typeInfo;
++ 
++       ArrayList<TypeInfo> fieldTypeInfos = structTypeInfo.getAllStructFieldTypeInfos();
++       ArrayList<String> fieldNames = structTypeInfo.getAllStructFieldNames();
++       final int fieldCount = fieldTypeInfos.size();
++       for (int f = 0; f < fieldCount; f++) {
++         TypeInfo fieldTypeInfo = fieldTypeInfos.get(f);
++         Category category = fieldTypeInfo.getCategory();
++         if (category != Category.PRIMITIVE){
++           LOG.info("Cannot vectorize struct field " + fieldNames.get(f) +
++               " of type " + fieldTypeInfo.getTypeName());
++           return false;
++         }
++         PrimitiveTypeInfo fieldPrimitiveTypeInfo = (PrimitiveTypeInfo) fieldTypeInfo;
++         InConstantType inConstantType =
++             VectorizationContext.getInConstantTypeFromPrimitiveCategory(
++                 fieldPrimitiveTypeInfo.getPrimitiveCategory());
++ 
++         // For now, limit the data types we support for Vectorized Struct IN().
++         if (inConstantType != InConstantType.INT_FAMILY &&
++             inConstantType != InConstantType.FLOAT_FAMILY &&
++             inConstantType != InConstantType.STRING_FAMILY) {
++           LOG.info("Cannot vectorize struct field " + fieldNames.get(f) +
++               " of type " + fieldTypeInfo.getTypeName());
+            return false;
+          }
+        }

http://git-wip-us.apache.org/repos/asf/hive/blob/7cfe3743/ql/src/test/queries/clientpositive/vector_struct_in.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/vector_struct_in.q b/ql/src/test/queries/clientpositive/vector_struct_in.q
new file mode 100644
index 0000000..0e3a4ca
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/vector_struct_in.q
@@ -0,0 +1,247 @@
+set hive.cbo.enable=false;
+set hive.tez.dynamic.partition.pruning=false;
+set hive.vectorized.execution.enabled=true;
+SET hive.auto.convert.join=true;
+
+-- SORT_QUERY_RESULTS
+
+-- 2 Strings
+create table test_1 (`id` string, `lineid` string) stored as orc;
+
+insert into table test_1 values ('one','1'), ('seven','1');
+
+explain
+select * from test_1 where struct(`id`, `lineid`)
+IN (
+struct('two','3'),
+struct('three','1'),
+struct('one','1'),
+struct('five','2'),
+struct('six','1'),
+struct('eight','1'),
+struct('seven','1'),
+struct('nine','1'),
+struct('ten','1')
+);
+
+select * from test_1 where struct(`id`, `lineid`)
+IN (
+struct('two','3'),
+struct('three','1'),
+struct('one','1'),
+struct('five','2'),
+struct('six','1'),
+struct('eight','1'),
+struct('seven','1'),
+struct('nine','1'),
+struct('ten','1')
+);
+
+explain
+select `id`, `lineid`, struct(`id`, `lineid`)
+IN (
+struct('two','3'),
+struct('three','1'),
+struct('one','1'),
+struct('five','2'),
+struct('six','1'),
+struct('eight','1'),
+struct('seven','1'),
+struct('nine','1'),
+struct('ten','1')
+) as b from test_1 ;
+
+select `id`, `lineid`, struct(`id`, `lineid`)
+IN (
+struct('two','3'),
+struct('three','1'),
+struct('one','1'),
+struct('five','2'),
+struct('six','1'),
+struct('eight','1'),
+struct('seven','1'),
+struct('nine','1'),
+struct('ten','1')
+) as b from test_1 ;
+
+
+-- 2 Integers
+create table test_2 (`id` int, `lineid` int) stored as orc;
+
+insert into table test_2 values (1,1), (7,1);
+
+explain
+select * from test_2 where struct(`id`, `lineid`)
+IN (
+struct(2,3),
+struct(3,1),
+struct(1,1),
+struct(5,2),
+struct(6,1),
+struct(8,1),
+struct(7,1),
+struct(9,1),
+struct(10,1)
+);
+
+select * from test_2 where struct(`id`, `lineid`)
+IN (
+struct(2,3),
+struct(3,1),
+struct(1,1),
+struct(5,2),
+struct(6,1),
+struct(8,1),
+struct(7,1),
+struct(9,1),
+struct(10,1)
+);
+
+explain
+select `id`, `lineid`, struct(`id`, `lineid`)
+IN (
+struct(2,3),
+struct(3,1),
+struct(1,1),
+struct(5,2),
+struct(6,1),
+struct(8,1),
+struct(7,1),
+struct(9,1),
+struct(10,1)
+) as b from test_2;
+
+select `id`, `lineid`, struct(`id`, `lineid`)
+IN (
+struct(2,3),
+struct(3,1),
+struct(1,1),
+struct(5,2),
+struct(6,1),
+struct(8,1),
+struct(7,1),
+struct(9,1),
+struct(10,1)
+) as b from test_2;
+
+-- 1 String and 1 Integer
+create table test_3 (`id` string, `lineid` int) stored as orc;
+
+insert into table test_3 values ('one',1), ('seven',1);
+
+explain
+select * from test_3 where struct(`id`, `lineid`)
+IN (
+struct('two',3),
+struct('three',1),
+struct('one',1),
+struct('five',2),
+struct('six',1),
+struct('eight',1),
+struct('seven',1),
+struct('nine',1),
+struct('ten',1)
+);
+
+select * from test_3 where struct(`id`, `lineid`)
+IN (
+struct('two',3),
+struct('three',1),
+struct('one',1),
+struct('five',2),
+struct('six',1),
+struct('eight',1),
+struct('seven',1),
+struct('nine',1),
+struct('ten',1)
+);
+
+explain
+select `id`, `lineid`, struct(`id`, `lineid`)
+IN (
+struct('two',3),
+struct('three',1),
+struct('one',1),
+struct('five',2),
+struct('six',1),
+struct('eight',1),
+struct('seven',1),
+struct('nine',1),
+struct('ten',1)
+) as b from test_3;
+
+select `id`, `lineid`, struct(`id`, `lineid`)
+IN (
+struct('two',3),
+struct('three',1),
+struct('one',1),
+struct('five',2),
+struct('six',1),
+struct('eight',1),
+struct('seven',1),
+struct('nine',1),
+struct('ten',1)
+) as b from test_3;
+
+-- 1 Integer and 1 String and 1 Double
+create table test_4 (`my_bigint` bigint, `my_string` string, `my_double` double) stored as orc;
+
+insert into table test_4 values (1, "b", 1.5), (1, "a", 0.5), (2, "b", 1.5);
+
+explain
+select * from test_4 where struct(`my_bigint`, `my_string`, `my_double`)
+IN (
+struct(1L, "a", 1.5),
+struct(1L, "b", -0.5),
+struct(3L, "b", 1.5),
+struct(1L, "d", 1.5),
+struct(1L, "c", 1.5),
+struct(1L, "b", 2.5),
+struct(1L, "b", 0.5),
+struct(5L, "b", 1.5),
+struct(1L, "a", 0.5),
+struct(3L, "b", 1.5)
+);
+
+select * from test_4 where struct(`my_bigint`, `my_string`, `my_double`)
+IN (
+struct(1L, "a", 1.5),
+struct(1L, "b", -0.5),
+struct(3L, "b", 1.5),
+struct(1L, "d", 1.5),
+struct(1L, "c", 1.5),
+struct(1L, "b", 2.5),
+struct(1L, "b", 0.5),
+struct(5L, "b", 1.5),
+struct(1L, "a", 0.5),
+struct(3L, "b", 1.5)
+);
+
+explain
+select `my_bigint`, `my_string`, `my_double`, struct(`my_bigint`, `my_string`, `my_double`)
+IN (
+struct(1L, "a", 1.5),
+struct(1L, "b", -0.5),
+struct(3L, "b", 1.5),
+struct(1L, "d", 1.5),
+struct(1L, "c", 1.5),
+struct(1L, "b", 2.5),
+struct(1L, "b", 0.5),
+struct(5L, "b", 1.5),
+struct(1L, "a", 0.5),
+struct(3L, "b", 1.5)
+) as b from test_4;
+
+select `my_bigint`, `my_string`, `my_double`, struct(`my_bigint`, `my_string`, `my_double`)
+IN (
+struct(1L, "a", 1.5),
+struct(1L, "b", -0.5),
+struct(3L, "b", 1.5),
+struct(1L, "d", 1.5),
+struct(1L, "c", 1.5),
+struct(1L, "b", 2.5),
+struct(1L, "b", 0.5),
+struct(5L, "b", 1.5),
+struct(1L, "a", 0.5),
+struct(3L, "b", 1.5)
+) as b from test_4;
\ No newline at end of file