You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ek...@apache.org on 2017/08/26 03:20:37 UTC

[1/3] hive git commit: HIVE-17205 - add functional support for unbucketed tables (Eugene Koifman, reviewed by Wei Zheng)

Repository: hive
Updated Branches:
  refs/heads/master 262d8f992 -> 6be50b76b


http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/ql/src/test/results/clientpositive/llap/acid_no_buckets.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/acid_no_buckets.q.out b/ql/src/test/results/clientpositive/llap/acid_no_buckets.q.out
new file mode 100644
index 0000000..34dd487
--- /dev/null
+++ b/ql/src/test/results/clientpositive/llap/acid_no_buckets.q.out
@@ -0,0 +1,1976 @@
+PREHOOK: query: drop table if exists srcpart_acid
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table if exists srcpart_acid
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: CREATE TABLE srcpart_acid (key STRING, value STRING) PARTITIONED BY (ds STRING, hr STRING) stored as ORC TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@srcpart_acid
+POSTHOOK: query: CREATE TABLE srcpart_acid (key STRING, value STRING) PARTITIONED BY (ds STRING, hr STRING) stored as ORC TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@srcpart_acid
+PREHOOK: query: insert into srcpart_acid PARTITION (ds, hr) select * from srcpart
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart
+PREHOOK: Input: default@srcpart@ds=2008-04-08/hr=11
+PREHOOK: Input: default@srcpart@ds=2008-04-08/hr=12
+PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=11
+PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=12
+PREHOOK: Output: default@srcpart_acid
+POSTHOOK: query: insert into srcpart_acid PARTITION (ds, hr) select * from srcpart
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart
+POSTHOOK: Input: default@srcpart@ds=2008-04-08/hr=11
+POSTHOOK: Input: default@srcpart@ds=2008-04-08/hr=12
+POSTHOOK: Input: default@srcpart@ds=2008-04-09/hr=11
+POSTHOOK: Input: default@srcpart@ds=2008-04-09/hr=12
+POSTHOOK: Output: default@srcpart_acid@ds=2008-04-08/hr=11
+POSTHOOK: Output: default@srcpart_acid@ds=2008-04-08/hr=12
+POSTHOOK: Output: default@srcpart_acid@ds=2008-04-09/hr=11
+POSTHOOK: Output: default@srcpart_acid@ds=2008-04-09/hr=12
+POSTHOOK: Lineage: srcpart_acid PARTITION(ds=2008-04-08,hr=11).key SIMPLE [(srcpart)srcpart.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: srcpart_acid PARTITION(ds=2008-04-08,hr=11).value SIMPLE [(srcpart)srcpart.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: srcpart_acid PARTITION(ds=2008-04-08,hr=12).key SIMPLE [(srcpart)srcpart.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: srcpart_acid PARTITION(ds=2008-04-08,hr=12).value SIMPLE [(srcpart)srcpart.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: srcpart_acid PARTITION(ds=2008-04-09,hr=11).key SIMPLE [(srcpart)srcpart.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: srcpart_acid PARTITION(ds=2008-04-09,hr=11).value SIMPLE [(srcpart)srcpart.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: srcpart_acid PARTITION(ds=2008-04-09,hr=12).key SIMPLE [(srcpart)srcpart.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: srcpart_acid PARTITION(ds=2008-04-09,hr=12).value SIMPLE [(srcpart)srcpart.FieldSchema(name:value, type:string, comment:default), ]
+PREHOOK: query: select ds, hr, key, value from srcpart_acid where cast(key as integer) in(413,43) and hr='11' order by ds, hr, cast(key as integer)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart_acid
+PREHOOK: Input: default@srcpart_acid@ds=2008-04-08/hr=11
+PREHOOK: Input: default@srcpart_acid@ds=2008-04-09/hr=11
+#### A masked pattern was here ####
+POSTHOOK: query: select ds, hr, key, value from srcpart_acid where cast(key as integer) in(413,43) and hr='11' order by ds, hr, cast(key as integer)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart_acid
+POSTHOOK: Input: default@srcpart_acid@ds=2008-04-08/hr=11
+POSTHOOK: Input: default@srcpart_acid@ds=2008-04-09/hr=11
+#### A masked pattern was here ####
+2008-04-08	11	43	val_43
+2008-04-08	11	413	val_413
+2008-04-08	11	413	val_413
+2008-04-09	11	43	val_43
+2008-04-09	11	413	val_413
+2008-04-09	11	413	val_413
+PREHOOK: query: analyze table srcpart_acid PARTITION(ds, hr) compute statistics
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart_acid
+PREHOOK: Output: default@srcpart_acid
+PREHOOK: Output: default@srcpart_acid@ds=2008-04-08/hr=11
+PREHOOK: Output: default@srcpart_acid@ds=2008-04-08/hr=12
+PREHOOK: Output: default@srcpart_acid@ds=2008-04-09/hr=11
+PREHOOK: Output: default@srcpart_acid@ds=2008-04-09/hr=12
+POSTHOOK: query: analyze table srcpart_acid PARTITION(ds, hr) compute statistics
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart_acid
+POSTHOOK: Output: default@srcpart_acid
+POSTHOOK: Output: default@srcpart_acid@ds=2008-04-08/hr=11
+POSTHOOK: Output: default@srcpart_acid@ds=2008-04-08/hr=12
+POSTHOOK: Output: default@srcpart_acid@ds=2008-04-09/hr=11
+POSTHOOK: Output: default@srcpart_acid@ds=2008-04-09/hr=12
+PREHOOK: query: analyze table srcpart_acid PARTITION(ds, hr) compute statistics for columns
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart_acid
+PREHOOK: Input: default@srcpart_acid@ds=2008-04-08/hr=11
+PREHOOK: Input: default@srcpart_acid@ds=2008-04-08/hr=12
+PREHOOK: Input: default@srcpart_acid@ds=2008-04-09/hr=11
+PREHOOK: Input: default@srcpart_acid@ds=2008-04-09/hr=12
+PREHOOK: Output: default@srcpart_acid
+PREHOOK: Output: default@srcpart_acid@ds=2008-04-08/hr=11
+PREHOOK: Output: default@srcpart_acid@ds=2008-04-08/hr=12
+PREHOOK: Output: default@srcpart_acid@ds=2008-04-09/hr=11
+PREHOOK: Output: default@srcpart_acid@ds=2008-04-09/hr=12
+#### A masked pattern was here ####
+POSTHOOK: query: analyze table srcpart_acid PARTITION(ds, hr) compute statistics for columns
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart_acid
+POSTHOOK: Input: default@srcpart_acid@ds=2008-04-08/hr=11
+POSTHOOK: Input: default@srcpart_acid@ds=2008-04-08/hr=12
+POSTHOOK: Input: default@srcpart_acid@ds=2008-04-09/hr=11
+POSTHOOK: Input: default@srcpart_acid@ds=2008-04-09/hr=12
+POSTHOOK: Output: default@srcpart_acid
+POSTHOOK: Output: default@srcpart_acid@ds=2008-04-08/hr=11
+POSTHOOK: Output: default@srcpart_acid@ds=2008-04-08/hr=12
+POSTHOOK: Output: default@srcpart_acid@ds=2008-04-09/hr=11
+POSTHOOK: Output: default@srcpart_acid@ds=2008-04-09/hr=12
+#### A masked pattern was here ####
+PREHOOK: query: explain update srcpart_acid set value = concat(value, 'updated') where cast(key as integer) in(413,43) and hr='11'
+PREHOOK: type: QUERY
+POSTHOOK: query: explain update srcpart_acid set value = concat(value, 'updated') where cast(key as integer) in(413,43) and hr='11'
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-2 depends on stages: Stage-1
+  Stage-0 depends on stages: Stage-2
+  Stage-3 depends on stages: Stage-0
+
+STAGE PLANS:
+  Stage: Stage-1
+    Tez
+#### A masked pattern was here ####
+      Edges:
+        Reducer 2 <- Map 1 (SIMPLE_EDGE)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: srcpart_acid
+                  Statistics: Num rows: 1000 Data size: 362000 Basic stats: COMPLETE Column stats: PARTIAL
+                  Filter Operator
+                    predicate: (UDFToInteger(key)) IN (413, 43) (type: boolean)
+                    Statistics: Num rows: 500 Data size: 181000 Basic stats: COMPLETE Column stats: PARTIAL
+                    Select Operator
+                      expressions: ROW__ID (type: struct<transactionid:bigint,bucketid:int,rowid:bigint>), key (type: string), concat(value, 'updated') (type: string), ds (type: string)
+                      outputColumnNames: _col0, _col1, _col2, _col3
+                      Statistics: Num rows: 500 Data size: 308500 Basic stats: COMPLETE Column stats: PARTIAL
+                      Reduce Output Operator
+                        key expressions: _col0 (type: struct<transactionid:bigint,bucketid:int,rowid:bigint>)
+                        sort order: +
+                        Statistics: Num rows: 500 Data size: 308500 Basic stats: COMPLETE Column stats: PARTIAL
+                        value expressions: _col1 (type: string), _col2 (type: string), _col3 (type: string)
+            Execution mode: llap
+            LLAP IO: may be used (ACID table)
+        Reducer 2 
+            Execution mode: llap
+            Reduce Operator Tree:
+              Select Operator
+                expressions: KEY.reducesinkkey0 (type: struct<transactionid:bigint,bucketid:int,rowid:bigint>), VALUE._col0 (type: string), VALUE._col1 (type: string), VALUE._col2 (type: string), '11' (type: string)
+                outputColumnNames: _col0, _col1, _col2, _col3, _col4
+                Statistics: Num rows: 500 Data size: 308500 Basic stats: COMPLETE Column stats: PARTIAL
+                File Output Operator
+                  compressed: false
+                  Statistics: Num rows: 500 Data size: 308500 Basic stats: COMPLETE Column stats: PARTIAL
+                  table:
+                      input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
+                      output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
+                      serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde
+                      name: default.srcpart_acid
+                  Write Type: UPDATE
+
+  Stage: Stage-2
+    Dependency Collection
+
+  Stage: Stage-0
+    Move Operator
+      tables:
+          partition:
+            ds 
+            hr 
+          replace: false
+          table:
+              input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
+              output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
+              serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde
+              name: default.srcpart_acid
+
+  Stage: Stage-3
+    Stats-Aggr Operator
+
+PREHOOK: query: update srcpart_acid set value = concat(value, 'updated') where cast(key as integer) in(413,43) and hr='11'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart_acid
+PREHOOK: Input: default@srcpart_acid@ds=2008-04-08/hr=11
+PREHOOK: Input: default@srcpart_acid@ds=2008-04-09/hr=11
+PREHOOK: Output: default@srcpart_acid@ds=2008-04-08/hr=11
+PREHOOK: Output: default@srcpart_acid@ds=2008-04-09/hr=11
+POSTHOOK: query: update srcpart_acid set value = concat(value, 'updated') where cast(key as integer) in(413,43) and hr='11'
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart_acid
+POSTHOOK: Input: default@srcpart_acid@ds=2008-04-08/hr=11
+POSTHOOK: Input: default@srcpart_acid@ds=2008-04-09/hr=11
+POSTHOOK: Output: default@srcpart_acid@ds=2008-04-08/hr=11
+POSTHOOK: Output: default@srcpart_acid@ds=2008-04-09/hr=11
+PREHOOK: query: select ds, hr, key, value from srcpart_acid where value like '%updated' order by ds, hr, cast(key as integer)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart_acid
+PREHOOK: Input: default@srcpart_acid@ds=2008-04-08/hr=11
+PREHOOK: Input: default@srcpart_acid@ds=2008-04-08/hr=12
+PREHOOK: Input: default@srcpart_acid@ds=2008-04-09/hr=11
+PREHOOK: Input: default@srcpart_acid@ds=2008-04-09/hr=12
+#### A masked pattern was here ####
+POSTHOOK: query: select ds, hr, key, value from srcpart_acid where value like '%updated' order by ds, hr, cast(key as integer)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart_acid
+POSTHOOK: Input: default@srcpart_acid@ds=2008-04-08/hr=11
+POSTHOOK: Input: default@srcpart_acid@ds=2008-04-08/hr=12
+POSTHOOK: Input: default@srcpart_acid@ds=2008-04-09/hr=11
+POSTHOOK: Input: default@srcpart_acid@ds=2008-04-09/hr=12
+#### A masked pattern was here ####
+2008-04-08	11	43	val_43updated
+2008-04-08	11	413	val_413updated
+2008-04-08	11	413	val_413updated
+2008-04-09	11	43	val_43updated
+2008-04-09	11	413	val_413updated
+2008-04-09	11	413	val_413updated
+PREHOOK: query: insert into srcpart_acid PARTITION (ds='2008-04-08', hr=='11') values ('1001','val1001'),('1002','val1002'),('1003','val1003')
+PREHOOK: type: QUERY
+PREHOOK: Output: default@srcpart_acid@ds=2008-04-08/hr=11
+POSTHOOK: query: insert into srcpart_acid PARTITION (ds='2008-04-08', hr=='11') values ('1001','val1001'),('1002','val1002'),('1003','val1003')
+POSTHOOK: type: QUERY
+POSTHOOK: Output: default@srcpart_acid@ds=2008-04-08/hr=11
+POSTHOOK: Lineage: srcpart_acid PARTITION(ds=2008-04-08,hr=11).key SIMPLE [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col1, type:string, comment:), ]
+POSTHOOK: Lineage: srcpart_acid PARTITION(ds=2008-04-08,hr=11).value SIMPLE [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col2, type:string, comment:), ]
+PREHOOK: query: select ds, hr, key, value from srcpart_acid where cast(key as integer) > 1000 order by ds, hr, cast(key as integer)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart_acid
+PREHOOK: Input: default@srcpart_acid@ds=2008-04-08/hr=11
+PREHOOK: Input: default@srcpart_acid@ds=2008-04-08/hr=12
+PREHOOK: Input: default@srcpart_acid@ds=2008-04-09/hr=11
+PREHOOK: Input: default@srcpart_acid@ds=2008-04-09/hr=12
+#### A masked pattern was here ####
+POSTHOOK: query: select ds, hr, key, value from srcpart_acid where cast(key as integer) > 1000 order by ds, hr, cast(key as integer)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart_acid
+POSTHOOK: Input: default@srcpart_acid@ds=2008-04-08/hr=11
+POSTHOOK: Input: default@srcpart_acid@ds=2008-04-08/hr=12
+POSTHOOK: Input: default@srcpart_acid@ds=2008-04-09/hr=11
+POSTHOOK: Input: default@srcpart_acid@ds=2008-04-09/hr=12
+#### A masked pattern was here ####
+2008-04-08	11	1001	val1001
+2008-04-08	11	1002	val1002
+2008-04-08	11	1003	val1003
+PREHOOK: query: analyze table srcpart_acid PARTITION(ds, hr) compute statistics
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart_acid
+PREHOOK: Output: default@srcpart_acid
+PREHOOK: Output: default@srcpart_acid@ds=2008-04-08/hr=11
+PREHOOK: Output: default@srcpart_acid@ds=2008-04-08/hr=12
+PREHOOK: Output: default@srcpart_acid@ds=2008-04-09/hr=11
+PREHOOK: Output: default@srcpart_acid@ds=2008-04-09/hr=12
+POSTHOOK: query: analyze table srcpart_acid PARTITION(ds, hr) compute statistics
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart_acid
+POSTHOOK: Output: default@srcpart_acid
+POSTHOOK: Output: default@srcpart_acid@ds=2008-04-08/hr=11
+POSTHOOK: Output: default@srcpart_acid@ds=2008-04-08/hr=12
+POSTHOOK: Output: default@srcpart_acid@ds=2008-04-09/hr=11
+POSTHOOK: Output: default@srcpart_acid@ds=2008-04-09/hr=12
+PREHOOK: query: analyze table srcpart_acid PARTITION(ds, hr) compute statistics for columns
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart_acid
+PREHOOK: Input: default@srcpart_acid@ds=2008-04-08/hr=11
+PREHOOK: Input: default@srcpart_acid@ds=2008-04-08/hr=12
+PREHOOK: Input: default@srcpart_acid@ds=2008-04-09/hr=11
+PREHOOK: Input: default@srcpart_acid@ds=2008-04-09/hr=12
+PREHOOK: Output: default@srcpart_acid
+PREHOOK: Output: default@srcpart_acid@ds=2008-04-08/hr=11
+PREHOOK: Output: default@srcpart_acid@ds=2008-04-08/hr=12
+PREHOOK: Output: default@srcpart_acid@ds=2008-04-09/hr=11
+PREHOOK: Output: default@srcpart_acid@ds=2008-04-09/hr=12
+#### A masked pattern was here ####
+POSTHOOK: query: analyze table srcpart_acid PARTITION(ds, hr) compute statistics for columns
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart_acid
+POSTHOOK: Input: default@srcpart_acid@ds=2008-04-08/hr=11
+POSTHOOK: Input: default@srcpart_acid@ds=2008-04-08/hr=12
+POSTHOOK: Input: default@srcpart_acid@ds=2008-04-09/hr=11
+POSTHOOK: Input: default@srcpart_acid@ds=2008-04-09/hr=12
+POSTHOOK: Output: default@srcpart_acid
+POSTHOOK: Output: default@srcpart_acid@ds=2008-04-08/hr=11
+POSTHOOK: Output: default@srcpart_acid@ds=2008-04-08/hr=12
+POSTHOOK: Output: default@srcpart_acid@ds=2008-04-09/hr=11
+POSTHOOK: Output: default@srcpart_acid@ds=2008-04-09/hr=12
+#### A masked pattern was here ####
+PREHOOK: query: explain delete from srcpart_acid where key in( '1001', '213', '43')
+PREHOOK: type: QUERY
+POSTHOOK: query: explain delete from srcpart_acid where key in( '1001', '213', '43')
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-2 depends on stages: Stage-1
+  Stage-0 depends on stages: Stage-2
+  Stage-3 depends on stages: Stage-0
+
+STAGE PLANS:
+  Stage: Stage-1
+    Tez
+#### A masked pattern was here ####
+      Edges:
+        Reducer 2 <- Map 1 (SIMPLE_EDGE)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: srcpart_acid
+                  Statistics: Num rows: 2015 Data size: 916825 Basic stats: COMPLETE Column stats: PARTIAL
+                  Filter Operator
+                    predicate: (key) IN ('1001', '213', '43') (type: boolean)
+                    Statistics: Num rows: 20 Data size: 9100 Basic stats: COMPLETE Column stats: PARTIAL
+                    Select Operator
+                      expressions: ROW__ID (type: struct<transactionid:bigint,bucketid:int,rowid:bigint>), ds (type: string), hr (type: string)
+                      outputColumnNames: _col0, _col1, _col2
+                      Statistics: Num rows: 20 Data size: 8880 Basic stats: COMPLETE Column stats: PARTIAL
+                      Reduce Output Operator
+                        key expressions: _col0 (type: struct<transactionid:bigint,bucketid:int,rowid:bigint>)
+                        sort order: +
+                        Statistics: Num rows: 20 Data size: 8880 Basic stats: COMPLETE Column stats: PARTIAL
+                        value expressions: _col1 (type: string), _col2 (type: string)
+            Execution mode: llap
+            LLAP IO: may be used (ACID table)
+        Reducer 2 
+            Execution mode: llap
+            Reduce Operator Tree:
+              Select Operator
+                expressions: KEY.reducesinkkey0 (type: struct<transactionid:bigint,bucketid:int,rowid:bigint>), VALUE._col0 (type: string), VALUE._col1 (type: string)
+                outputColumnNames: _col0, _col1, _col2
+                Statistics: Num rows: 20 Data size: 8880 Basic stats: COMPLETE Column stats: PARTIAL
+                File Output Operator
+                  compressed: false
+                  Statistics: Num rows: 20 Data size: 8880 Basic stats: COMPLETE Column stats: PARTIAL
+                  table:
+                      input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
+                      output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
+                      serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde
+                      name: default.srcpart_acid
+                  Write Type: DELETE
+
+  Stage: Stage-2
+    Dependency Collection
+
+  Stage: Stage-0
+    Move Operator
+      tables:
+          partition:
+            ds 
+            hr 
+          replace: false
+          table:
+              input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
+              output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
+              serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde
+              name: default.srcpart_acid
+
+  Stage: Stage-3
+    Stats-Aggr Operator
+
+PREHOOK: query: delete from srcpart_acid where key in( '1001', '213', '43')
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart_acid
+PREHOOK: Input: default@srcpart_acid@ds=2008-04-08/hr=11
+PREHOOK: Input: default@srcpart_acid@ds=2008-04-08/hr=12
+PREHOOK: Input: default@srcpart_acid@ds=2008-04-09/hr=11
+PREHOOK: Input: default@srcpart_acid@ds=2008-04-09/hr=12
+PREHOOK: Output: default@srcpart_acid@ds=2008-04-08/hr=11
+PREHOOK: Output: default@srcpart_acid@ds=2008-04-08/hr=12
+PREHOOK: Output: default@srcpart_acid@ds=2008-04-09/hr=11
+PREHOOK: Output: default@srcpart_acid@ds=2008-04-09/hr=12
+POSTHOOK: query: delete from srcpart_acid where key in( '1001', '213', '43')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart_acid
+POSTHOOK: Input: default@srcpart_acid@ds=2008-04-08/hr=11
+POSTHOOK: Input: default@srcpart_acid@ds=2008-04-08/hr=12
+POSTHOOK: Input: default@srcpart_acid@ds=2008-04-09/hr=11
+POSTHOOK: Input: default@srcpart_acid@ds=2008-04-09/hr=12
+POSTHOOK: Output: default@srcpart_acid@ds=2008-04-08/hr=11
+POSTHOOK: Output: default@srcpart_acid@ds=2008-04-08/hr=12
+POSTHOOK: Output: default@srcpart_acid@ds=2008-04-09/hr=11
+POSTHOOK: Output: default@srcpart_acid@ds=2008-04-09/hr=12
+PREHOOK: query: select count(*) from srcpart_acid where key in( '1001', '213', '43')
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart_acid
+PREHOOK: Input: default@srcpart_acid@ds=2008-04-08/hr=11
+PREHOOK: Input: default@srcpart_acid@ds=2008-04-08/hr=12
+PREHOOK: Input: default@srcpart_acid@ds=2008-04-09/hr=11
+PREHOOK: Input: default@srcpart_acid@ds=2008-04-09/hr=12
+#### A masked pattern was here ####
+POSTHOOK: query: select count(*) from srcpart_acid where key in( '1001', '213', '43')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart_acid
+POSTHOOK: Input: default@srcpart_acid@ds=2008-04-08/hr=11
+POSTHOOK: Input: default@srcpart_acid@ds=2008-04-08/hr=12
+POSTHOOK: Input: default@srcpart_acid@ds=2008-04-09/hr=11
+POSTHOOK: Input: default@srcpart_acid@ds=2008-04-09/hr=12
+#### A masked pattern was here ####
+0
+PREHOOK: query: select count(*) from srcpart_acid
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart_acid
+PREHOOK: Input: default@srcpart_acid@ds=2008-04-08/hr=11
+PREHOOK: Input: default@srcpart_acid@ds=2008-04-08/hr=12
+PREHOOK: Input: default@srcpart_acid@ds=2008-04-09/hr=11
+PREHOOK: Input: default@srcpart_acid@ds=2008-04-09/hr=12
+#### A masked pattern was here ####
+POSTHOOK: query: select count(*) from srcpart_acid
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart_acid
+POSTHOOK: Input: default@srcpart_acid@ds=2008-04-08/hr=11
+POSTHOOK: Input: default@srcpart_acid@ds=2008-04-08/hr=12
+POSTHOOK: Input: default@srcpart_acid@ds=2008-04-09/hr=11
+POSTHOOK: Input: default@srcpart_acid@ds=2008-04-09/hr=12
+#### A masked pattern was here ####
+1990
+PREHOOK: query: merge into srcpart_acid t using (select distinct ds, hr, key, value from srcpart_acid) s
+on s.ds=t.ds and s.hr=t.hr and s.key=t.key and s.value=t.value
+when matched and s.ds='2008-04-08' and s.hr=='11' and s.key='44' then update set value=concat(s.value,'updated by merge')
+when matched and s.ds='2008-04-08' and s.hr=='12' then delete
+when not matched then insert values('this','should','not','be there')
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart_acid
+PREHOOK: Input: default@srcpart_acid@ds=2008-04-08/hr=11
+PREHOOK: Input: default@srcpart_acid@ds=2008-04-08/hr=12
+PREHOOK: Input: default@srcpart_acid@ds=2008-04-09/hr=11
+PREHOOK: Input: default@srcpart_acid@ds=2008-04-09/hr=12
+PREHOOK: Output: default@merge_tmp_table
+PREHOOK: Output: default@srcpart_acid
+PREHOOK: Output: default@srcpart_acid@ds=2008-04-08/hr=11
+PREHOOK: Output: default@srcpart_acid@ds=2008-04-08/hr=11
+PREHOOK: Output: default@srcpart_acid@ds=2008-04-08/hr=12
+PREHOOK: Output: default@srcpart_acid@ds=2008-04-08/hr=12
+PREHOOK: Output: default@srcpart_acid@ds=2008-04-09/hr=11
+PREHOOK: Output: default@srcpart_acid@ds=2008-04-09/hr=11
+PREHOOK: Output: default@srcpart_acid@ds=2008-04-09/hr=12
+PREHOOK: Output: default@srcpart_acid@ds=2008-04-09/hr=12
+POSTHOOK: query: merge into srcpart_acid t using (select distinct ds, hr, key, value from srcpart_acid) s
+on s.ds=t.ds and s.hr=t.hr and s.key=t.key and s.value=t.value
+when matched and s.ds='2008-04-08' and s.hr=='11' and s.key='44' then update set value=concat(s.value,'updated by merge')
+when matched and s.ds='2008-04-08' and s.hr=='12' then delete
+when not matched then insert values('this','should','not','be there')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart_acid
+POSTHOOK: Input: default@srcpart_acid@ds=2008-04-08/hr=11
+POSTHOOK: Input: default@srcpart_acid@ds=2008-04-08/hr=12
+POSTHOOK: Input: default@srcpart_acid@ds=2008-04-09/hr=11
+POSTHOOK: Input: default@srcpart_acid@ds=2008-04-09/hr=12
+POSTHOOK: Output: default@merge_tmp_table
+POSTHOOK: Output: default@srcpart_acid@ds=2008-04-08/hr=11
+POSTHOOK: Output: default@srcpart_acid@ds=2008-04-08/hr=11
+POSTHOOK: Output: default@srcpart_acid@ds=2008-04-08/hr=12
+POSTHOOK: Output: default@srcpart_acid@ds=2008-04-08/hr=12
+POSTHOOK: Output: default@srcpart_acid@ds=2008-04-09/hr=11
+POSTHOOK: Output: default@srcpart_acid@ds=2008-04-09/hr=11
+POSTHOOK: Output: default@srcpart_acid@ds=2008-04-09/hr=12
+POSTHOOK: Output: default@srcpart_acid@ds=2008-04-09/hr=12
+POSTHOOK: Lineage: merge_tmp_table.val EXPRESSION [(srcpart_acid)t.FieldSchema(name:ROW__ID, type:struct<transactionId:bigint,bucketId:int,rowId:bigint>, comment:), (srcpart_acid)t.FieldSchema(name:ds, type:string, comment:null), (srcpart_acid)t.FieldSchema(name:hr, type:string, comment:null), ]
+PREHOOK: query: select count(*) from srcpart_acid where ds='2008-04-08' and hr=='12'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart_acid
+PREHOOK: Input: default@srcpart_acid@ds=2008-04-08/hr=12
+#### A masked pattern was here ####
+POSTHOOK: query: select count(*) from srcpart_acid where ds='2008-04-08' and hr=='12'
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart_acid
+POSTHOOK: Input: default@srcpart_acid@ds=2008-04-08/hr=12
+#### A masked pattern was here ####
+0
+PREHOOK: query: select ds, hr, key, value from srcpart_acid where value like '%updated by merge'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart_acid
+PREHOOK: Input: default@srcpart_acid@ds=2008-04-08/hr=11
+PREHOOK: Input: default@srcpart_acid@ds=2008-04-08/hr=12
+PREHOOK: Input: default@srcpart_acid@ds=2008-04-09/hr=11
+PREHOOK: Input: default@srcpart_acid@ds=2008-04-09/hr=12
+#### A masked pattern was here ####
+POSTHOOK: query: select ds, hr, key, value from srcpart_acid where value like '%updated by merge'
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart_acid
+POSTHOOK: Input: default@srcpart_acid@ds=2008-04-08/hr=11
+POSTHOOK: Input: default@srcpart_acid@ds=2008-04-08/hr=12
+POSTHOOK: Input: default@srcpart_acid@ds=2008-04-09/hr=11
+POSTHOOK: Input: default@srcpart_acid@ds=2008-04-09/hr=12
+#### A masked pattern was here ####
+2008-04-08	11	44	val_44updated by merge
+PREHOOK: query: select count(*) from srcpart_acid where ds = 'this' and hr = 'should' and key = 'not' and value = 'be there'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart_acid
+#### A masked pattern was here ####
+POSTHOOK: query: select count(*) from srcpart_acid where ds = 'this' and hr = 'should' and key = 'not' and value = 'be there'
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart_acid
+#### A masked pattern was here ####
+0
+PREHOOK: query: drop table if exists srcpart_acid
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@srcpart_acid
+PREHOOK: Output: default@srcpart_acid
+POSTHOOK: query: drop table if exists srcpart_acid
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@srcpart_acid
+POSTHOOK: Output: default@srcpart_acid
+PREHOOK: query: drop table if exists srcpart_acidb
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table if exists srcpart_acidb
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: CREATE TABLE srcpart_acidb (key STRING, value STRING) PARTITIONED BY (ds STRING, hr STRING) CLUSTERED BY(key) INTO 2 BUCKETS stored as ORC TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@srcpart_acidb
+POSTHOOK: query: CREATE TABLE srcpart_acidb (key STRING, value STRING) PARTITIONED BY (ds STRING, hr STRING) CLUSTERED BY(key) INTO 2 BUCKETS stored as ORC TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@srcpart_acidb
+PREHOOK: query: insert into srcpart_acidb PARTITION (ds, hr) select * from srcpart
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart
+PREHOOK: Input: default@srcpart@ds=2008-04-08/hr=11
+PREHOOK: Input: default@srcpart@ds=2008-04-08/hr=12
+PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=11
+PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=12
+PREHOOK: Output: default@srcpart_acidb
+POSTHOOK: query: insert into srcpart_acidb PARTITION (ds, hr) select * from srcpart
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart
+POSTHOOK: Input: default@srcpart@ds=2008-04-08/hr=11
+POSTHOOK: Input: default@srcpart@ds=2008-04-08/hr=12
+POSTHOOK: Input: default@srcpart@ds=2008-04-09/hr=11
+POSTHOOK: Input: default@srcpart@ds=2008-04-09/hr=12
+POSTHOOK: Output: default@srcpart_acidb@ds=2008-04-08/hr=11
+POSTHOOK: Output: default@srcpart_acidb@ds=2008-04-08/hr=12
+POSTHOOK: Output: default@srcpart_acidb@ds=2008-04-09/hr=11
+POSTHOOK: Output: default@srcpart_acidb@ds=2008-04-09/hr=12
+POSTHOOK: Lineage: srcpart_acidb PARTITION(ds=2008-04-08,hr=11).key SIMPLE [(srcpart)srcpart.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: srcpart_acidb PARTITION(ds=2008-04-08,hr=11).value SIMPLE [(srcpart)srcpart.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: srcpart_acidb PARTITION(ds=2008-04-08,hr=12).key SIMPLE [(srcpart)srcpart.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: srcpart_acidb PARTITION(ds=2008-04-08,hr=12).value SIMPLE [(srcpart)srcpart.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: srcpart_acidb PARTITION(ds=2008-04-09,hr=11).key SIMPLE [(srcpart)srcpart.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: srcpart_acidb PARTITION(ds=2008-04-09,hr=11).value SIMPLE [(srcpart)srcpart.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: srcpart_acidb PARTITION(ds=2008-04-09,hr=12).key SIMPLE [(srcpart)srcpart.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: srcpart_acidb PARTITION(ds=2008-04-09,hr=12).value SIMPLE [(srcpart)srcpart.FieldSchema(name:value, type:string, comment:default), ]
+PREHOOK: query: select ds, hr, key, value from srcpart_acidb where cast(key as integer) in(413,43) and hr='11' order by ds, hr, cast(key as integer)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart_acidb
+PREHOOK: Input: default@srcpart_acidb@ds=2008-04-08/hr=11
+PREHOOK: Input: default@srcpart_acidb@ds=2008-04-09/hr=11
+#### A masked pattern was here ####
+POSTHOOK: query: select ds, hr, key, value from srcpart_acidb where cast(key as integer) in(413,43) and hr='11' order by ds, hr, cast(key as integer)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart_acidb
+POSTHOOK: Input: default@srcpart_acidb@ds=2008-04-08/hr=11
+POSTHOOK: Input: default@srcpart_acidb@ds=2008-04-09/hr=11
+#### A masked pattern was here ####
+2008-04-08	11	43	val_43
+2008-04-08	11	413	val_413
+2008-04-08	11	413	val_413
+2008-04-09	11	43	val_43
+2008-04-09	11	413	val_413
+2008-04-09	11	413	val_413
+PREHOOK: query: analyze table srcpart_acidb PARTITION(ds, hr) compute statistics
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart_acidb
+PREHOOK: Output: default@srcpart_acidb
+PREHOOK: Output: default@srcpart_acidb@ds=2008-04-08/hr=11
+PREHOOK: Output: default@srcpart_acidb@ds=2008-04-08/hr=12
+PREHOOK: Output: default@srcpart_acidb@ds=2008-04-09/hr=11
+PREHOOK: Output: default@srcpart_acidb@ds=2008-04-09/hr=12
+POSTHOOK: query: analyze table srcpart_acidb PARTITION(ds, hr) compute statistics
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart_acidb
+POSTHOOK: Output: default@srcpart_acidb
+POSTHOOK: Output: default@srcpart_acidb@ds=2008-04-08/hr=11
+POSTHOOK: Output: default@srcpart_acidb@ds=2008-04-08/hr=12
+POSTHOOK: Output: default@srcpart_acidb@ds=2008-04-09/hr=11
+POSTHOOK: Output: default@srcpart_acidb@ds=2008-04-09/hr=12
+PREHOOK: query: analyze table srcpart_acidb PARTITION(ds, hr) compute statistics for columns
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart_acidb
+PREHOOK: Input: default@srcpart_acidb@ds=2008-04-08/hr=11
+PREHOOK: Input: default@srcpart_acidb@ds=2008-04-08/hr=12
+PREHOOK: Input: default@srcpart_acidb@ds=2008-04-09/hr=11
+PREHOOK: Input: default@srcpart_acidb@ds=2008-04-09/hr=12
+PREHOOK: Output: default@srcpart_acidb
+PREHOOK: Output: default@srcpart_acidb@ds=2008-04-08/hr=11
+PREHOOK: Output: default@srcpart_acidb@ds=2008-04-08/hr=12
+PREHOOK: Output: default@srcpart_acidb@ds=2008-04-09/hr=11
+PREHOOK: Output: default@srcpart_acidb@ds=2008-04-09/hr=12
+#### A masked pattern was here ####
+POSTHOOK: query: analyze table srcpart_acidb PARTITION(ds, hr) compute statistics for columns
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart_acidb
+POSTHOOK: Input: default@srcpart_acidb@ds=2008-04-08/hr=11
+POSTHOOK: Input: default@srcpart_acidb@ds=2008-04-08/hr=12
+POSTHOOK: Input: default@srcpart_acidb@ds=2008-04-09/hr=11
+POSTHOOK: Input: default@srcpart_acidb@ds=2008-04-09/hr=12
+POSTHOOK: Output: default@srcpart_acidb
+POSTHOOK: Output: default@srcpart_acidb@ds=2008-04-08/hr=11
+POSTHOOK: Output: default@srcpart_acidb@ds=2008-04-08/hr=12
+POSTHOOK: Output: default@srcpart_acidb@ds=2008-04-09/hr=11
+POSTHOOK: Output: default@srcpart_acidb@ds=2008-04-09/hr=12
+#### A masked pattern was here ####
+PREHOOK: query: explain update srcpart_acidb set value = concat(value, 'updated') where cast(key as integer) in(413,43) and hr='11'
+PREHOOK: type: QUERY
+POSTHOOK: query: explain update srcpart_acidb set value = concat(value, 'updated') where cast(key as integer) in(413,43) and hr='11'
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-2 depends on stages: Stage-1
+  Stage-0 depends on stages: Stage-2
+  Stage-3 depends on stages: Stage-0
+
+STAGE PLANS:
+  Stage: Stage-1
+    Tez
+#### A masked pattern was here ####
+      Edges:
+        Reducer 2 <- Map 1 (SIMPLE_EDGE)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: srcpart_acidb
+                  Statistics: Num rows: 1000 Data size: 362000 Basic stats: COMPLETE Column stats: PARTIAL
+                  Filter Operator
+                    predicate: (UDFToInteger(key)) IN (413, 43) (type: boolean)
+                    Statistics: Num rows: 500 Data size: 181000 Basic stats: COMPLETE Column stats: PARTIAL
+                    Select Operator
+                      expressions: ROW__ID (type: struct<transactionid:bigint,bucketid:int,rowid:bigint>), key (type: string), concat(value, 'updated') (type: string), ds (type: string)
+                      outputColumnNames: _col0, _col1, _col2, _col3
+                      Statistics: Num rows: 500 Data size: 308500 Basic stats: COMPLETE Column stats: PARTIAL
+                      Reduce Output Operator
+                        key expressions: _col0 (type: struct<transactionid:bigint,bucketid:int,rowid:bigint>)
+                        sort order: +
+                        Map-reduce partition columns: UDFToInteger(_col0) (type: int)
+                        Statistics: Num rows: 500 Data size: 308500 Basic stats: COMPLETE Column stats: PARTIAL
+                        value expressions: _col1 (type: string), _col2 (type: string), _col3 (type: string)
+            Execution mode: llap
+            LLAP IO: may be used (ACID table)
+        Reducer 2 
+            Execution mode: llap
+            Reduce Operator Tree:
+              Select Operator
+                expressions: KEY.reducesinkkey0 (type: struct<transactionid:bigint,bucketid:int,rowid:bigint>), VALUE._col0 (type: string), VALUE._col1 (type: string), VALUE._col2 (type: string), '11' (type: string)
+                outputColumnNames: _col0, _col1, _col2, _col3, _col4
+                Statistics: Num rows: 500 Data size: 308500 Basic stats: COMPLETE Column stats: PARTIAL
+                File Output Operator
+                  compressed: false
+                  Statistics: Num rows: 500 Data size: 308500 Basic stats: COMPLETE Column stats: PARTIAL
+                  table:
+                      input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
+                      output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
+                      serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde
+                      name: default.srcpart_acidb
+                  Write Type: UPDATE
+
+  Stage: Stage-2
+    Dependency Collection
+
+  Stage: Stage-0
+    Move Operator
+      tables:
+          partition:
+            ds 
+            hr 
+          replace: false
+          table:
+              input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
+              output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
+              serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde
+              name: default.srcpart_acidb
+
+  Stage: Stage-3
+    Stats-Aggr Operator
+
+PREHOOK: query: update srcpart_acidb set value = concat(value, 'updated') where cast(key as integer) in(413,43) and hr='11'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart_acidb
+PREHOOK: Input: default@srcpart_acidb@ds=2008-04-08/hr=11
+PREHOOK: Input: default@srcpart_acidb@ds=2008-04-09/hr=11
+PREHOOK: Output: default@srcpart_acidb@ds=2008-04-08/hr=11
+PREHOOK: Output: default@srcpart_acidb@ds=2008-04-09/hr=11
+POSTHOOK: query: update srcpart_acidb set value = concat(value, 'updated') where cast(key as integer) in(413,43) and hr='11'
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart_acidb
+POSTHOOK: Input: default@srcpart_acidb@ds=2008-04-08/hr=11
+POSTHOOK: Input: default@srcpart_acidb@ds=2008-04-09/hr=11
+POSTHOOK: Output: default@srcpart_acidb@ds=2008-04-08/hr=11
+POSTHOOK: Output: default@srcpart_acidb@ds=2008-04-09/hr=11
+PREHOOK: query: select ds, hr, key, value from srcpart_acidb where value like '%updated' order by ds, hr, cast(key as integer)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart_acidb
+PREHOOK: Input: default@srcpart_acidb@ds=2008-04-08/hr=11
+PREHOOK: Input: default@srcpart_acidb@ds=2008-04-08/hr=12
+PREHOOK: Input: default@srcpart_acidb@ds=2008-04-09/hr=11
+PREHOOK: Input: default@srcpart_acidb@ds=2008-04-09/hr=12
+#### A masked pattern was here ####
+POSTHOOK: query: select ds, hr, key, value from srcpart_acidb where value like '%updated' order by ds, hr, cast(key as integer)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart_acidb
+POSTHOOK: Input: default@srcpart_acidb@ds=2008-04-08/hr=11
+POSTHOOK: Input: default@srcpart_acidb@ds=2008-04-08/hr=12
+POSTHOOK: Input: default@srcpart_acidb@ds=2008-04-09/hr=11
+POSTHOOK: Input: default@srcpart_acidb@ds=2008-04-09/hr=12
+#### A masked pattern was here ####
+2008-04-08	11	43	val_43updated
+2008-04-08	11	413	val_413updated
+2008-04-08	11	413	val_413updated
+2008-04-09	11	43	val_43updated
+2008-04-09	11	413	val_413updated
+2008-04-09	11	413	val_413updated
+PREHOOK: query: insert into srcpart_acidb PARTITION (ds='2008-04-08', hr=='11') values ('1001','val1001'),('1002','val1002'),('1003','val1003')
+PREHOOK: type: QUERY
+PREHOOK: Output: default@srcpart_acidb@ds=2008-04-08/hr=11
+POSTHOOK: query: insert into srcpart_acidb PARTITION (ds='2008-04-08', hr=='11') values ('1001','val1001'),('1002','val1002'),('1003','val1003')
+POSTHOOK: type: QUERY
+POSTHOOK: Output: default@srcpart_acidb@ds=2008-04-08/hr=11
+POSTHOOK: Lineage: srcpart_acidb PARTITION(ds=2008-04-08,hr=11).key SIMPLE [(values__tmp__table__2)values__tmp__table__2.FieldSchema(name:tmp_values_col1, type:string, comment:), ]
+POSTHOOK: Lineage: srcpart_acidb PARTITION(ds=2008-04-08,hr=11).value SIMPLE [(values__tmp__table__2)values__tmp__table__2.FieldSchema(name:tmp_values_col2, type:string, comment:), ]
+PREHOOK: query: select ds, hr, key, value from srcpart_acidb where cast(key as integer) > 1000 order by ds, hr, cast(key as integer)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart_acidb
+PREHOOK: Input: default@srcpart_acidb@ds=2008-04-08/hr=11
+PREHOOK: Input: default@srcpart_acidb@ds=2008-04-08/hr=12
+PREHOOK: Input: default@srcpart_acidb@ds=2008-04-09/hr=11
+PREHOOK: Input: default@srcpart_acidb@ds=2008-04-09/hr=12
+#### A masked pattern was here ####
+POSTHOOK: query: select ds, hr, key, value from srcpart_acidb where cast(key as integer) > 1000 order by ds, hr, cast(key as integer)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart_acidb
+POSTHOOK: Input: default@srcpart_acidb@ds=2008-04-08/hr=11
+POSTHOOK: Input: default@srcpart_acidb@ds=2008-04-08/hr=12
+POSTHOOK: Input: default@srcpart_acidb@ds=2008-04-09/hr=11
+POSTHOOK: Input: default@srcpart_acidb@ds=2008-04-09/hr=12
+#### A masked pattern was here ####
+2008-04-08	11	1001	val1001
+2008-04-08	11	1002	val1002
+2008-04-08	11	1003	val1003
+PREHOOK: query: analyze table srcpart_acidb PARTITION(ds, hr) compute statistics
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart_acidb
+PREHOOK: Output: default@srcpart_acidb
+PREHOOK: Output: default@srcpart_acidb@ds=2008-04-08/hr=11
+PREHOOK: Output: default@srcpart_acidb@ds=2008-04-08/hr=12
+PREHOOK: Output: default@srcpart_acidb@ds=2008-04-09/hr=11
+PREHOOK: Output: default@srcpart_acidb@ds=2008-04-09/hr=12
+POSTHOOK: query: analyze table srcpart_acidb PARTITION(ds, hr) compute statistics
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart_acidb
+POSTHOOK: Output: default@srcpart_acidb
+POSTHOOK: Output: default@srcpart_acidb@ds=2008-04-08/hr=11
+POSTHOOK: Output: default@srcpart_acidb@ds=2008-04-08/hr=12
+POSTHOOK: Output: default@srcpart_acidb@ds=2008-04-09/hr=11
+POSTHOOK: Output: default@srcpart_acidb@ds=2008-04-09/hr=12
+PREHOOK: query: analyze table srcpart_acidb PARTITION(ds, hr) compute statistics for columns
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart_acidb
+PREHOOK: Input: default@srcpart_acidb@ds=2008-04-08/hr=11
+PREHOOK: Input: default@srcpart_acidb@ds=2008-04-08/hr=12
+PREHOOK: Input: default@srcpart_acidb@ds=2008-04-09/hr=11
+PREHOOK: Input: default@srcpart_acidb@ds=2008-04-09/hr=12
+PREHOOK: Output: default@srcpart_acidb
+PREHOOK: Output: default@srcpart_acidb@ds=2008-04-08/hr=11
+PREHOOK: Output: default@srcpart_acidb@ds=2008-04-08/hr=12
+PREHOOK: Output: default@srcpart_acidb@ds=2008-04-09/hr=11
+PREHOOK: Output: default@srcpart_acidb@ds=2008-04-09/hr=12
+#### A masked pattern was here ####
+POSTHOOK: query: analyze table srcpart_acidb PARTITION(ds, hr) compute statistics for columns
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart_acidb
+POSTHOOK: Input: default@srcpart_acidb@ds=2008-04-08/hr=11
+POSTHOOK: Input: default@srcpart_acidb@ds=2008-04-08/hr=12
+POSTHOOK: Input: default@srcpart_acidb@ds=2008-04-09/hr=11
+POSTHOOK: Input: default@srcpart_acidb@ds=2008-04-09/hr=12
+POSTHOOK: Output: default@srcpart_acidb
+POSTHOOK: Output: default@srcpart_acidb@ds=2008-04-08/hr=11
+POSTHOOK: Output: default@srcpart_acidb@ds=2008-04-08/hr=12
+POSTHOOK: Output: default@srcpart_acidb@ds=2008-04-09/hr=11
+POSTHOOK: Output: default@srcpart_acidb@ds=2008-04-09/hr=12
+#### A masked pattern was here ####
+PREHOOK: query: explain delete from srcpart_acidb where key in( '1001', '213', '43')
+PREHOOK: type: QUERY
+POSTHOOK: query: explain delete from srcpart_acidb where key in( '1001', '213', '43')
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-2 depends on stages: Stage-1
+  Stage-0 depends on stages: Stage-2
+  Stage-3 depends on stages: Stage-0
+
+STAGE PLANS:
+  Stage: Stage-1
+    Tez
+#### A masked pattern was here ####
+      Edges:
+        Reducer 2 <- Map 1 (SIMPLE_EDGE)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: srcpart_acidb
+                  Statistics: Num rows: 2015 Data size: 916825 Basic stats: COMPLETE Column stats: PARTIAL
+                  Filter Operator
+                    predicate: (key) IN ('1001', '213', '43') (type: boolean)
+                    Statistics: Num rows: 20 Data size: 9100 Basic stats: COMPLETE Column stats: PARTIAL
+                    Select Operator
+                      expressions: ROW__ID (type: struct<transactionid:bigint,bucketid:int,rowid:bigint>), ds (type: string), hr (type: string)
+                      outputColumnNames: _col0, _col1, _col2
+                      Statistics: Num rows: 20 Data size: 8880 Basic stats: COMPLETE Column stats: PARTIAL
+                      Reduce Output Operator
+                        key expressions: _col0 (type: struct<transactionid:bigint,bucketid:int,rowid:bigint>)
+                        sort order: +
+                        Map-reduce partition columns: UDFToInteger(_col0) (type: int)
+                        Statistics: Num rows: 20 Data size: 8880 Basic stats: COMPLETE Column stats: PARTIAL
+                        value expressions: _col1 (type: string), _col2 (type: string)
+            Execution mode: llap
+            LLAP IO: may be used (ACID table)
+        Reducer 2 
+            Execution mode: llap
+            Reduce Operator Tree:
+              Select Operator
+                expressions: KEY.reducesinkkey0 (type: struct<transactionid:bigint,bucketid:int,rowid:bigint>), VALUE._col0 (type: string), VALUE._col1 (type: string)
+                outputColumnNames: _col0, _col1, _col2
+                Statistics: Num rows: 20 Data size: 8880 Basic stats: COMPLETE Column stats: PARTIAL
+                File Output Operator
+                  compressed: false
+                  Statistics: Num rows: 20 Data size: 8880 Basic stats: COMPLETE Column stats: PARTIAL
+                  table:
+                      input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
+                      output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
+                      serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde
+                      name: default.srcpart_acidb
+                  Write Type: DELETE
+
+  Stage: Stage-2
+    Dependency Collection
+
+  Stage: Stage-0
+    Move Operator
+      tables:
+          partition:
+            ds 
+            hr 
+          replace: false
+          table:
+              input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
+              output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
+              serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde
+              name: default.srcpart_acidb
+
+  Stage: Stage-3
+    Stats-Aggr Operator
+
+PREHOOK: query: delete from srcpart_acidb where key in( '1001', '213', '43')
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart_acidb
+PREHOOK: Input: default@srcpart_acidb@ds=2008-04-08/hr=11
+PREHOOK: Input: default@srcpart_acidb@ds=2008-04-08/hr=12
+PREHOOK: Input: default@srcpart_acidb@ds=2008-04-09/hr=11
+PREHOOK: Input: default@srcpart_acidb@ds=2008-04-09/hr=12
+PREHOOK: Output: default@srcpart_acidb@ds=2008-04-08/hr=11
+PREHOOK: Output: default@srcpart_acidb@ds=2008-04-08/hr=12
+PREHOOK: Output: default@srcpart_acidb@ds=2008-04-09/hr=11
+PREHOOK: Output: default@srcpart_acidb@ds=2008-04-09/hr=12
+POSTHOOK: query: delete from srcpart_acidb where key in( '1001', '213', '43')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart_acidb
+POSTHOOK: Input: default@srcpart_acidb@ds=2008-04-08/hr=11
+POSTHOOK: Input: default@srcpart_acidb@ds=2008-04-08/hr=12
+POSTHOOK: Input: default@srcpart_acidb@ds=2008-04-09/hr=11
+POSTHOOK: Input: default@srcpart_acidb@ds=2008-04-09/hr=12
+POSTHOOK: Output: default@srcpart_acidb@ds=2008-04-08/hr=11
+POSTHOOK: Output: default@srcpart_acidb@ds=2008-04-08/hr=12
+POSTHOOK: Output: default@srcpart_acidb@ds=2008-04-09/hr=11
+POSTHOOK: Output: default@srcpart_acidb@ds=2008-04-09/hr=12
+PREHOOK: query: select count(*) from srcpart_acidb where key in( '1001', '213', '43')
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart_acidb
+PREHOOK: Input: default@srcpart_acidb@ds=2008-04-08/hr=11
+PREHOOK: Input: default@srcpart_acidb@ds=2008-04-08/hr=12
+PREHOOK: Input: default@srcpart_acidb@ds=2008-04-09/hr=11
+PREHOOK: Input: default@srcpart_acidb@ds=2008-04-09/hr=12
+#### A masked pattern was here ####
+POSTHOOK: query: select count(*) from srcpart_acidb where key in( '1001', '213', '43')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart_acidb
+POSTHOOK: Input: default@srcpart_acidb@ds=2008-04-08/hr=11
+POSTHOOK: Input: default@srcpart_acidb@ds=2008-04-08/hr=12
+POSTHOOK: Input: default@srcpart_acidb@ds=2008-04-09/hr=11
+POSTHOOK: Input: default@srcpart_acidb@ds=2008-04-09/hr=12
+#### A masked pattern was here ####
+0
+PREHOOK: query: select count(*) from srcpart_acidb
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart_acidb
+PREHOOK: Input: default@srcpart_acidb@ds=2008-04-08/hr=11
+PREHOOK: Input: default@srcpart_acidb@ds=2008-04-08/hr=12
+PREHOOK: Input: default@srcpart_acidb@ds=2008-04-09/hr=11
+PREHOOK: Input: default@srcpart_acidb@ds=2008-04-09/hr=12
+#### A masked pattern was here ####
+POSTHOOK: query: select count(*) from srcpart_acidb
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart_acidb
+POSTHOOK: Input: default@srcpart_acidb@ds=2008-04-08/hr=11
+POSTHOOK: Input: default@srcpart_acidb@ds=2008-04-08/hr=12
+POSTHOOK: Input: default@srcpart_acidb@ds=2008-04-09/hr=11
+POSTHOOK: Input: default@srcpart_acidb@ds=2008-04-09/hr=12
+#### A masked pattern was here ####
+1990
+PREHOOK: query: merge into srcpart_acidb t using (select distinct ds, hr, key, value from srcpart_acidb) s
+on s.ds=t.ds and s.hr=t.hr and s.key=t.key and s.value=t.value
+when matched and s.ds='2008-04-08' and s.hr=='11' and s.key='44' then update set value=concat(s.value,'updated by merge')
+when matched and s.ds='2008-04-08' and s.hr=='12' then delete
+when not matched then insert values('this','should','not','be there')
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart_acidb
+PREHOOK: Input: default@srcpart_acidb@ds=2008-04-08/hr=11
+PREHOOK: Input: default@srcpart_acidb@ds=2008-04-08/hr=12
+PREHOOK: Input: default@srcpart_acidb@ds=2008-04-09/hr=11
+PREHOOK: Input: default@srcpart_acidb@ds=2008-04-09/hr=12
+PREHOOK: Output: default@merge_tmp_table
+PREHOOK: Output: default@srcpart_acidb
+PREHOOK: Output: default@srcpart_acidb@ds=2008-04-08/hr=11
+PREHOOK: Output: default@srcpart_acidb@ds=2008-04-08/hr=11
+PREHOOK: Output: default@srcpart_acidb@ds=2008-04-08/hr=12
+PREHOOK: Output: default@srcpart_acidb@ds=2008-04-08/hr=12
+PREHOOK: Output: default@srcpart_acidb@ds=2008-04-09/hr=11
+PREHOOK: Output: default@srcpart_acidb@ds=2008-04-09/hr=11
+PREHOOK: Output: default@srcpart_acidb@ds=2008-04-09/hr=12
+PREHOOK: Output: default@srcpart_acidb@ds=2008-04-09/hr=12
+POSTHOOK: query: merge into srcpart_acidb t using (select distinct ds, hr, key, value from srcpart_acidb) s
+on s.ds=t.ds and s.hr=t.hr and s.key=t.key and s.value=t.value
+when matched and s.ds='2008-04-08' and s.hr=='11' and s.key='44' then update set value=concat(s.value,'updated by merge')
+when matched and s.ds='2008-04-08' and s.hr=='12' then delete
+when not matched then insert values('this','should','not','be there')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart_acidb
+POSTHOOK: Input: default@srcpart_acidb@ds=2008-04-08/hr=11
+POSTHOOK: Input: default@srcpart_acidb@ds=2008-04-08/hr=12
+POSTHOOK: Input: default@srcpart_acidb@ds=2008-04-09/hr=11
+POSTHOOK: Input: default@srcpart_acidb@ds=2008-04-09/hr=12
+POSTHOOK: Output: default@merge_tmp_table
+POSTHOOK: Output: default@srcpart_acidb@ds=2008-04-08/hr=11
+POSTHOOK: Output: default@srcpart_acidb@ds=2008-04-08/hr=11
+POSTHOOK: Output: default@srcpart_acidb@ds=2008-04-08/hr=12
+POSTHOOK: Output: default@srcpart_acidb@ds=2008-04-08/hr=12
+POSTHOOK: Output: default@srcpart_acidb@ds=2008-04-09/hr=11
+POSTHOOK: Output: default@srcpart_acidb@ds=2008-04-09/hr=11
+POSTHOOK: Output: default@srcpart_acidb@ds=2008-04-09/hr=12
+POSTHOOK: Output: default@srcpart_acidb@ds=2008-04-09/hr=12
+POSTHOOK: Lineage: merge_tmp_table.val EXPRESSION [(srcpart_acidb)t.FieldSchema(name:ROW__ID, type:struct<transactionId:bigint,bucketId:int,rowId:bigint>, comment:), (srcpart_acidb)t.FieldSchema(name:ds, type:string, comment:null), (srcpart_acidb)t.FieldSchema(name:hr, type:string, comment:null), ]
+PREHOOK: query: select count(*) from srcpart_acidb where ds='2008-04-08' and hr=='12'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart_acidb
+PREHOOK: Input: default@srcpart_acidb@ds=2008-04-08/hr=12
+#### A masked pattern was here ####
+POSTHOOK: query: select count(*) from srcpart_acidb where ds='2008-04-08' and hr=='12'
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart_acidb
+POSTHOOK: Input: default@srcpart_acidb@ds=2008-04-08/hr=12
+#### A masked pattern was here ####
+0
+PREHOOK: query: select ds, hr, key, value from srcpart_acidb where value like '%updated by merge'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart_acidb
+PREHOOK: Input: default@srcpart_acidb@ds=2008-04-08/hr=11
+PREHOOK: Input: default@srcpart_acidb@ds=2008-04-08/hr=12
+PREHOOK: Input: default@srcpart_acidb@ds=2008-04-09/hr=11
+PREHOOK: Input: default@srcpart_acidb@ds=2008-04-09/hr=12
+#### A masked pattern was here ####
+POSTHOOK: query: select ds, hr, key, value from srcpart_acidb where value like '%updated by merge'
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart_acidb
+POSTHOOK: Input: default@srcpart_acidb@ds=2008-04-08/hr=11
+POSTHOOK: Input: default@srcpart_acidb@ds=2008-04-08/hr=12
+POSTHOOK: Input: default@srcpart_acidb@ds=2008-04-09/hr=11
+POSTHOOK: Input: default@srcpart_acidb@ds=2008-04-09/hr=12
+#### A masked pattern was here ####
+2008-04-08	11	44	val_44updated by merge
+PREHOOK: query: select count(*) from srcpart_acidb where ds = 'this' and hr = 'should' and key = 'not' and value = 'be there'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart_acidb
+#### A masked pattern was here ####
+POSTHOOK: query: select count(*) from srcpart_acidb where ds = 'this' and hr = 'should' and key = 'not' and value = 'be there'
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart_acidb
+#### A masked pattern was here ####
+0
+PREHOOK: query: drop table if exists srcpart_acidb
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@srcpart_acidb
+PREHOOK: Output: default@srcpart_acidb
+POSTHOOK: query: drop table if exists srcpart_acidb
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@srcpart_acidb
+POSTHOOK: Output: default@srcpart_acidb
+PREHOOK: query: drop table if exists srcpart_acidv
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table if exists srcpart_acidv
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: CREATE TABLE srcpart_acidv (key STRING, value STRING) PARTITIONED BY (ds STRING, hr STRING) stored as ORC TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@srcpart_acidv
+POSTHOOK: query: CREATE TABLE srcpart_acidv (key STRING, value STRING) PARTITIONED BY (ds STRING, hr STRING) stored as ORC TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@srcpart_acidv
+PREHOOK: query: insert into srcpart_acidv PARTITION (ds, hr) select * from srcpart
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart
+PREHOOK: Input: default@srcpart@ds=2008-04-08/hr=11
+PREHOOK: Input: default@srcpart@ds=2008-04-08/hr=12
+PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=11
+PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=12
+PREHOOK: Output: default@srcpart_acidv
+POSTHOOK: query: insert into srcpart_acidv PARTITION (ds, hr) select * from srcpart
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart
+POSTHOOK: Input: default@srcpart@ds=2008-04-08/hr=11
+POSTHOOK: Input: default@srcpart@ds=2008-04-08/hr=12
+POSTHOOK: Input: default@srcpart@ds=2008-04-09/hr=11
+POSTHOOK: Input: default@srcpart@ds=2008-04-09/hr=12
+POSTHOOK: Output: default@srcpart_acidv@ds=2008-04-08/hr=11
+POSTHOOK: Output: default@srcpart_acidv@ds=2008-04-08/hr=12
+POSTHOOK: Output: default@srcpart_acidv@ds=2008-04-09/hr=11
+POSTHOOK: Output: default@srcpart_acidv@ds=2008-04-09/hr=12
+POSTHOOK: Lineage: srcpart_acidv PARTITION(ds=2008-04-08,hr=11).key SIMPLE [(srcpart)srcpart.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: srcpart_acidv PARTITION(ds=2008-04-08,hr=11).value SIMPLE [(srcpart)srcpart.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: srcpart_acidv PARTITION(ds=2008-04-08,hr=12).key SIMPLE [(srcpart)srcpart.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: srcpart_acidv PARTITION(ds=2008-04-08,hr=12).value SIMPLE [(srcpart)srcpart.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: srcpart_acidv PARTITION(ds=2008-04-09,hr=11).key SIMPLE [(srcpart)srcpart.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: srcpart_acidv PARTITION(ds=2008-04-09,hr=11).value SIMPLE [(srcpart)srcpart.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: srcpart_acidv PARTITION(ds=2008-04-09,hr=12).key SIMPLE [(srcpart)srcpart.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: srcpart_acidv PARTITION(ds=2008-04-09,hr=12).value SIMPLE [(srcpart)srcpart.FieldSchema(name:value, type:string, comment:default), ]
+PREHOOK: query: select ds, hr, key, value from srcpart_acidv where cast(key as integer) in(413,43) and hr='11' order by ds, hr, cast(key as integer)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart_acidv
+PREHOOK: Input: default@srcpart_acidv@ds=2008-04-08/hr=11
+PREHOOK: Input: default@srcpart_acidv@ds=2008-04-09/hr=11
+#### A masked pattern was here ####
+POSTHOOK: query: select ds, hr, key, value from srcpart_acidv where cast(key as integer) in(413,43) and hr='11' order by ds, hr, cast(key as integer)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart_acidv
+POSTHOOK: Input: default@srcpart_acidv@ds=2008-04-08/hr=11
+POSTHOOK: Input: default@srcpart_acidv@ds=2008-04-09/hr=11
+#### A masked pattern was here ####
+2008-04-08	11	43	val_43
+2008-04-08	11	413	val_413
+2008-04-08	11	413	val_413
+2008-04-09	11	43	val_43
+2008-04-09	11	413	val_413
+2008-04-09	11	413	val_413
+PREHOOK: query: analyze table srcpart_acidv PARTITION(ds, hr) compute statistics
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart_acidv
+PREHOOK: Output: default@srcpart_acidv
+PREHOOK: Output: default@srcpart_acidv@ds=2008-04-08/hr=11
+PREHOOK: Output: default@srcpart_acidv@ds=2008-04-08/hr=12
+PREHOOK: Output: default@srcpart_acidv@ds=2008-04-09/hr=11
+PREHOOK: Output: default@srcpart_acidv@ds=2008-04-09/hr=12
+POSTHOOK: query: analyze table srcpart_acidv PARTITION(ds, hr) compute statistics
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart_acidv
+POSTHOOK: Output: default@srcpart_acidv
+POSTHOOK: Output: default@srcpart_acidv@ds=2008-04-08/hr=11
+POSTHOOK: Output: default@srcpart_acidv@ds=2008-04-08/hr=12
+POSTHOOK: Output: default@srcpart_acidv@ds=2008-04-09/hr=11
+POSTHOOK: Output: default@srcpart_acidv@ds=2008-04-09/hr=12
+PREHOOK: query: analyze table srcpart_acidv PARTITION(ds, hr) compute statistics for columns
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart_acidv
+PREHOOK: Input: default@srcpart_acidv@ds=2008-04-08/hr=11
+PREHOOK: Input: default@srcpart_acidv@ds=2008-04-08/hr=12
+PREHOOK: Input: default@srcpart_acidv@ds=2008-04-09/hr=11
+PREHOOK: Input: default@srcpart_acidv@ds=2008-04-09/hr=12
+PREHOOK: Output: default@srcpart_acidv
+PREHOOK: Output: default@srcpart_acidv@ds=2008-04-08/hr=11
+PREHOOK: Output: default@srcpart_acidv@ds=2008-04-08/hr=12
+PREHOOK: Output: default@srcpart_acidv@ds=2008-04-09/hr=11
+PREHOOK: Output: default@srcpart_acidv@ds=2008-04-09/hr=12
+#### A masked pattern was here ####
+POSTHOOK: query: analyze table srcpart_acidv PARTITION(ds, hr) compute statistics for columns
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart_acidv
+POSTHOOK: Input: default@srcpart_acidv@ds=2008-04-08/hr=11
+POSTHOOK: Input: default@srcpart_acidv@ds=2008-04-08/hr=12
+POSTHOOK: Input: default@srcpart_acidv@ds=2008-04-09/hr=11
+POSTHOOK: Input: default@srcpart_acidv@ds=2008-04-09/hr=12
+POSTHOOK: Output: default@srcpart_acidv
+POSTHOOK: Output: default@srcpart_acidv@ds=2008-04-08/hr=11
+POSTHOOK: Output: default@srcpart_acidv@ds=2008-04-08/hr=12
+POSTHOOK: Output: default@srcpart_acidv@ds=2008-04-09/hr=11
+POSTHOOK: Output: default@srcpart_acidv@ds=2008-04-09/hr=12
+#### A masked pattern was here ####
+PREHOOK: query: explain update srcpart_acidv set value = concat(value, 'updated') where cast(key as integer) in(413,43) and hr='11'
+PREHOOK: type: QUERY
+POSTHOOK: query: explain update srcpart_acidv set value = concat(value, 'updated') where cast(key as integer) in(413,43) and hr='11'
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-2 depends on stages: Stage-1
+  Stage-0 depends on stages: Stage-2
+  Stage-3 depends on stages: Stage-0
+
+STAGE PLANS:
+  Stage: Stage-1
+    Tez
+#### A masked pattern was here ####
+      Edges:
+        Reducer 2 <- Map 1 (SIMPLE_EDGE)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: srcpart_acidv
+                  Statistics: Num rows: 1000 Data size: 362000 Basic stats: COMPLETE Column stats: PARTIAL
+                  Filter Operator
+                    predicate: (UDFToInteger(key)) IN (413, 43) (type: boolean)
+                    Statistics: Num rows: 500 Data size: 181000 Basic stats: COMPLETE Column stats: PARTIAL
+                    Select Operator
+                      expressions: ROW__ID (type: struct<transactionid:bigint,bucketid:int,rowid:bigint>), key (type: string), concat(value, 'updated') (type: string), ds (type: string)
+                      outputColumnNames: _col0, _col1, _col2, _col3
+                      Statistics: Num rows: 500 Data size: 308500 Basic stats: COMPLETE Column stats: PARTIAL
+                      Reduce Output Operator
+                        key expressions: _col0 (type: struct<transactionid:bigint,bucketid:int,rowid:bigint>)
+                        sort order: +
+                        Statistics: Num rows: 500 Data size: 308500 Basic stats: COMPLETE Column stats: PARTIAL
+                        value expressions: _col1 (type: string), _col2 (type: string), _col3 (type: string)
+            Execution mode: llap
+            LLAP IO: may be used (ACID table)
+        Reducer 2 
+            Execution mode: vectorized, llap
+            Reduce Operator Tree:
+              Select Operator
+                expressions: KEY.reducesinkkey0 (type: struct<transactionid:bigint,bucketid:int,rowid:bigint>), VALUE._col0 (type: string), VALUE._col1 (type: string), VALUE._col2 (type: string), '11' (type: string)
+                outputColumnNames: _col0, _col1, _col2, _col3, _col4
+                Statistics: Num rows: 500 Data size: 308500 Basic stats: COMPLETE Column stats: PARTIAL
+                File Output Operator
+                  compressed: false
+                  Statistics: Num rows: 500 Data size: 308500 Basic stats: COMPLETE Column stats: PARTIAL
+                  table:
+                      input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
+                      output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
+                      serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde
+                      name: default.srcpart_acidv
+                  Write Type: UPDATE
+
+  Stage: Stage-2
+    Dependency Collection
+
+  Stage: Stage-0
+    Move Operator
+      tables:
+          partition:
+            ds 
+            hr 
+          replace: false
+          table:
+              input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
+              output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
+              serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde
+              name: default.srcpart_acidv
+
+  Stage: Stage-3
+    Stats-Aggr Operator
+
+PREHOOK: query: update srcpart_acidv set value = concat(value, 'updated') where cast(key as integer) in(413,43) and hr='11'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart_acidv
+PREHOOK: Input: default@srcpart_acidv@ds=2008-04-08/hr=11
+PREHOOK: Input: default@srcpart_acidv@ds=2008-04-09/hr=11
+PREHOOK: Output: default@srcpart_acidv@ds=2008-04-08/hr=11
+PREHOOK: Output: default@srcpart_acidv@ds=2008-04-09/hr=11
+POSTHOOK: query: update srcpart_acidv set value = concat(value, 'updated') where cast(key as integer) in(413,43) and hr='11'
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart_acidv
+POSTHOOK: Input: default@srcpart_acidv@ds=2008-04-08/hr=11
+POSTHOOK: Input: default@srcpart_acidv@ds=2008-04-09/hr=11
+POSTHOOK: Output: default@srcpart_acidv@ds=2008-04-08/hr=11
+POSTHOOK: Output: default@srcpart_acidv@ds=2008-04-09/hr=11
+PREHOOK: query: select ds, hr, key, value from srcpart_acidv where value like '%updated' order by ds, hr, cast(key as integer)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart_acidv
+PREHOOK: Input: default@srcpart_acidv@ds=2008-04-08/hr=11
+PREHOOK: Input: default@srcpart_acidv@ds=2008-04-08/hr=12
+PREHOOK: Input: default@srcpart_acidv@ds=2008-04-09/hr=11
+PREHOOK: Input: default@srcpart_acidv@ds=2008-04-09/hr=12
+#### A masked pattern was here ####
+POSTHOOK: query: select ds, hr, key, value from srcpart_acidv where value like '%updated' order by ds, hr, cast(key as integer)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart_acidv
+POSTHOOK: Input: default@srcpart_acidv@ds=2008-04-08/hr=11
+POSTHOOK: Input: default@srcpart_acidv@ds=2008-04-08/hr=12
+POSTHOOK: Input: default@srcpart_acidv@ds=2008-04-09/hr=11
+POSTHOOK: Input: default@srcpart_acidv@ds=2008-04-09/hr=12
+#### A masked pattern was here ####
+2008-04-08	11	43	val_43updated
+2008-04-08	11	413	val_413updated
+2008-04-08	11	413	val_413updated
+2008-04-09	11	43	val_43updated
+2008-04-09	11	413	val_413updated
+2008-04-09	11	413	val_413updated
+PREHOOK: query: insert into srcpart_acidv PARTITION (ds='2008-04-08', hr=='11') values ('1001','val1001'),('1002','val1002'),('1003','val1003')
+PREHOOK: type: QUERY
+PREHOOK: Output: default@srcpart_acidv@ds=2008-04-08/hr=11
+POSTHOOK: query: insert into srcpart_acidv PARTITION (ds='2008-04-08', hr=='11') values ('1001','val1001'),('1002','val1002'),('1003','val1003')
+POSTHOOK: type: QUERY
+POSTHOOK: Output: default@srcpart_acidv@ds=2008-04-08/hr=11
+POSTHOOK: Lineage: srcpart_acidv PARTITION(ds=2008-04-08,hr=11).key SIMPLE [(values__tmp__table__3)values__tmp__table__3.FieldSchema(name:tmp_values_col1, type:string, comment:), ]
+POSTHOOK: Lineage: srcpart_acidv PARTITION(ds=2008-04-08,hr=11).value SIMPLE [(values__tmp__table__3)values__tmp__table__3.FieldSchema(name:tmp_values_col2, type:string, comment:), ]
+PREHOOK: query: select ds, hr, key, value from srcpart_acidv where cast(key as integer) > 1000 order by ds, hr, cast(key as integer)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart_acidv
+PREHOOK: Input: default@srcpart_acidv@ds=2008-04-08/hr=11
+PREHOOK: Input: default@srcpart_acidv@ds=2008-04-08/hr=12
+PREHOOK: Input: default@srcpart_acidv@ds=2008-04-09/hr=11
+PREHOOK: Input: default@srcpart_acidv@ds=2008-04-09/hr=12
+#### A masked pattern was here ####
+POSTHOOK: query: select ds, hr, key, value from srcpart_acidv where cast(key as integer) > 1000 order by ds, hr, cast(key as integer)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart_acidv
+POSTHOOK: Input: default@srcpart_acidv@ds=2008-04-08/hr=11
+POSTHOOK: Input: default@srcpart_acidv@ds=2008-04-08/hr=12
+POSTHOOK: Input: default@srcpart_acidv@ds=2008-04-09/hr=11
+POSTHOOK: Input: default@srcpart_acidv@ds=2008-04-09/hr=12
+#### A masked pattern was here ####
+2008-04-08	11	1001	val1001
+2008-04-08	11	1002	val1002
+2008-04-08	11	1003	val1003
+PREHOOK: query: analyze table srcpart_acidv PARTITION(ds, hr) compute statistics
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart_acidv
+PREHOOK: Output: default@srcpart_acidv
+PREHOOK: Output: default@srcpart_acidv@ds=2008-04-08/hr=11
+PREHOOK: Output: default@srcpart_acidv@ds=2008-04-08/hr=12
+PREHOOK: Output: default@srcpart_acidv@ds=2008-04-09/hr=11
+PREHOOK: Output: default@srcpart_acidv@ds=2008-04-09/hr=12
+POSTHOOK: query: analyze table srcpart_acidv PARTITION(ds, hr) compute statistics
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart_acidv
+POSTHOOK: Output: default@srcpart_acidv
+POSTHOOK: Output: default@srcpart_acidv@ds=2008-04-08/hr=11
+POSTHOOK: Output: default@srcpart_acidv@ds=2008-04-08/hr=12
+POSTHOOK: Output: default@srcpart_acidv@ds=2008-04-09/hr=11
+POSTHOOK: Output: default@srcpart_acidv@ds=2008-04-09/hr=12
+PREHOOK: query: analyze table srcpart_acidv PARTITION(ds, hr) compute statistics for columns
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart_acidv
+PREHOOK: Input: default@srcpart_acidv@ds=2008-04-08/hr=11
+PREHOOK: Input: default@srcpart_acidv@ds=2008-04-08/hr=12
+PREHOOK: Input: default@srcpart_acidv@ds=2008-04-09/hr=11
+PREHOOK: Input: default@srcpart_acidv@ds=2008-04-09/hr=12
+PREHOOK: Output: default@srcpart_acidv
+PREHOOK: Output: default@srcpart_acidv@ds=2008-04-08/hr=11
+PREHOOK: Output: default@srcpart_acidv@ds=2008-04-08/hr=12
+PREHOOK: Output: default@srcpart_acidv@ds=2008-04-09/hr=11
+PREHOOK: Output: default@srcpart_acidv@ds=2008-04-09/hr=12
+#### A masked pattern was here ####
+POSTHOOK: query: analyze table srcpart_acidv PARTITION(ds, hr) compute statistics for columns
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart_acidv
+POSTHOOK: Input: default@srcpart_acidv@ds=2008-04-08/hr=11
+POSTHOOK: Input: default@srcpart_acidv@ds=2008-04-08/hr=12
+POSTHOOK: Input: default@srcpart_acidv@ds=2008-04-09/hr=11
+POSTHOOK: Input: default@srcpart_acidv@ds=2008-04-09/hr=12
+POSTHOOK: Output: default@srcpart_acidv
+POSTHOOK: Output: default@srcpart_acidv@ds=2008-04-08/hr=11
+POSTHOOK: Output: default@srcpart_acidv@ds=2008-04-08/hr=12
+POSTHOOK: Output: default@srcpart_acidv@ds=2008-04-09/hr=11
+POSTHOOK: Output: default@srcpart_acidv@ds=2008-04-09/hr=12
+#### A masked pattern was here ####
+PREHOOK: query: explain delete from srcpart_acidv where key in( '1001', '213', '43')
+PREHOOK: type: QUERY
+POSTHOOK: query: explain delete from srcpart_acidv where key in( '1001', '213', '43')
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-2 depends on stages: Stage-1
+  Stage-0 depends on stages: Stage-2
+  Stage-3 depends on stages: Stage-0
+
+STAGE PLANS:
+  Stage: Stage-1
+    Tez
+#### A masked pattern was here ####
+      Edges:
+        Reducer 2 <- Map 1 (SIMPLE_EDGE)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: srcpart_acidv
+                  Statistics: Num rows: 2015 Data size: 916825 Basic stats: COMPLETE Column stats: PARTIAL
+                  Filter Operator
+                    predicate: (key) IN ('1001', '213', '43') (type: boolean)
+                    Statistics: Num rows: 20 Data size: 9100 Basic stats: COMPLETE Column stats: PARTIAL
+                    Select Operator
+                      expressions: ROW__ID (type: struct<transactionid:bigint,bucketid:int,rowid:bigint>), ds (type: string), hr (type: string)
+                      outputColumnNames: _col0, _col1, _col2
+                      Statistics: Num rows: 20 Data size: 8880 Basic stats: COMPLETE Column stats: PARTIAL
+                      Reduce Output Operator
+                        key expressions: _col0 (type: struct<transactionid:bigint,bucketid:int,rowid:bigint>)
+                        sort order: +
+                        Statistics: Num rows: 20 Data size: 8880 Basic stats: COMPLETE Column stats: PARTIAL
+                        value expressions: _col1 (type: string), _col2 (type: string)
+            Execution mode: llap
+            LLAP IO: may be used (ACID table)
+        Reducer 2 
+            Execution mode: vectorized, llap
+            Reduce Operator Tree:
+              Select Operator
+                expressions: KEY.reducesinkkey0 (type: struct<transactionid:bigint,bucketid:int,rowid:bigint>), VALUE._col0 (type: string), VALUE._col1 (type: string)
+                outputColumnNames: _col0, _col1, _col2
+                Statistics: Num rows: 20 Data size: 8880 Basic stats: COMPLETE Column stats: PARTIAL
+                File Output Operator
+                  compressed: false
+                  Statistics: Num rows: 20 Data size: 8880 Basic stats: COMPLETE Column stats: PARTIAL
+                  table:
+                      input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
+                      output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
+                      serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde
+                      name: default.srcpart_acidv
+                  Write Type: DELETE
+
+  Stage: Stage-2
+    Dependency Collection
+
+  Stage: Stage-0
+    Move Operator
+      tables:
+          partition:
+            ds 
+            hr 
+          replace: false
+          table:
+              input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
+              output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
+              serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde
+              name: default.srcpart_acidv
+
+  Stage: Stage-3
+    Stats-Aggr Operator
+
+PREHOOK: query: delete from srcpart_acidv where key in( '1001', '213', '43')
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart_acidv
+PREHOOK: Input: default@srcpart_acidv@ds=2008-04-08/hr=11
+PREHOOK: Input: default@srcpart_acidv@ds=2008-04-08/hr=12
+PREHOOK: Input: default@srcpart_acidv@ds=2008-04-09/hr=11
+PREHOOK: Input: default@srcpart_acidv@ds=2008-04-09/hr=12
+PREHOOK: Output: default@srcpart_acidv@ds=2008-04-08/hr=11
+PREHOOK: Output: default@srcpart_acidv@ds=2008-04-08/hr=12
+PREHOOK: Output: default@srcpart_acidv@ds=2008-04-09/hr=11
+PREHOOK: Output: default@srcpart_acidv@ds=2008-04-09/hr=12
+POSTHOOK: query: delete from srcpart_acidv where key in( '1001', '213', '43')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart_acidv
+POSTHOOK: Input: default@srcpart_acidv@ds=2008-04-08/hr=11
+POSTHOOK: Input: default@srcpart_acidv@ds=2008-04-08/hr=12
+POSTHOOK: Input: default@srcpart_acidv@ds=2008-04-09/hr=11
+POSTHOOK: Input: default@srcpart_acidv@ds=2008-04-09/hr=12
+POSTHOOK: Output: default@srcpart_acidv@ds=2008-04-08/hr=11
+POSTHOOK: Output: default@srcpart_acidv@ds=2008-04-08/hr=12
+POSTHOOK: Output: default@srcpart_acidv@ds=2008-04-09/hr=11
+POSTHOOK: Output: default@srcpart_acidv@ds=2008-04-09/hr=12
+PREHOOK: query: select count(*) from srcpart_acidv where key in( '1001', '213', '43')
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart_acidv
+PREHOOK: Input: default@srcpart_acidv@ds=2008-04-08/hr=11
+PREHOOK: Input: default@srcpart_acidv@ds=2008-04-08/hr=12
+PREHOOK: Input: default@srcpart_acidv@ds=2008-04-09/hr=11
+PREHOOK: Input: default@srcpart_acidv@ds=2008-04-09/hr=12
+#### A masked pattern was here ####
+POSTHOOK: query: select count(*) from srcpart_acidv where key in( '1001', '213', '43')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart_acidv
+POSTHOOK: Input: default@srcpart_acidv@ds=2008-04-08/hr=11
+POSTHOOK: Input: default@srcpart_acidv@ds=2008-04-08/hr=12
+POSTHOOK: Input: default@srcpart_acidv@ds=2008-04-09/hr=11
+POSTHOOK: Input: default@srcpart_acidv@ds=2008-04-09/hr=12
+#### A masked pattern was here ####
+0
+PREHOOK: query: select count(*) from srcpart_acidv
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart_acidv
+PREHOOK: Input: default@srcpart_acidv@ds=2008-04-08/hr=11
+PREHOOK: Input: default@srcpart_acidv@ds=2008-04-08/hr=12
+PREHOOK: Input: default@srcpart_acidv@ds=2008-04-09/hr=11
+PREHOOK: Input: default@srcpart_acidv@ds=2008-04-09/hr=12
+#### A masked pattern was here ####
+POSTHOOK: query: select count(*) from srcpart_acidv
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart_acidv
+POSTHOOK: Input: default@srcpart_acidv@ds=2008-04-08/hr=11
+POSTHOOK: Input: default@srcpart_acidv@ds=2008-04-08/hr=12
+POSTHOOK: Input: default@srcpart_acidv@ds=2008-04-09/hr=11
+POSTHOOK: Input: default@srcpart_acidv@ds=2008-04-09/hr=12
+#### A masked pattern was here ####
+1990
+PREHOOK: query: merge into srcpart_acidv t using (select distinct ds, hr, key, value from srcpart_acidv) s
+on s.ds=t.ds and s.hr=t.hr and s.key=t.key and s.value=t.value
+when matched and s.ds='2008-04-08' and s.hr=='11' and s.key='44' then update set value=concat(s.value,'updated by merge')
+when matched and s.ds='2008-04-08' and s.hr=='12' then delete
+when not matched then insert values('this','should','not','be there')
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart_acidv
+PREHOOK: Input: default@srcpart_acidv@ds=2008-04-08/hr=11
+PREHOOK: Input: default@srcpart_acidv@ds=2008-04-08/hr=12
+PREHOOK: Input: default@srcpart_acidv@ds=2008-04-09/hr=11
+PREHOOK: Input: default@srcpart_acidv@ds=2008-04-09/hr=12
+PREHOOK: Output: default@merge_tmp_table
+PREHOOK: Output: default@srcpart_acidv
+PREHOOK: Output: default@srcpart_acidv@ds=2008-04-08/hr=11
+PREHOOK: Output: default@srcpart_acidv@ds=2008-04-08/hr=11
+PREHOOK: Output: default@srcpart_acidv@ds=2008-04-08/hr=12
+PREHOOK: Output: default@srcpart_acidv@ds=2008-04-08/hr=12
+PREHOOK: Output: default@srcpart_acidv@ds=2008-04-09/hr=11
+PREHOOK: Output: default@srcpart_acidv@ds=2008-04-09/hr=11
+PREHOOK: Output: default@srcpart_acidv@ds=2008-04-09/hr=12
+PREHOOK: Output: default@srcpart_acidv@ds=2008-04-09/hr=12
+POSTHOOK: query: merge into srcpart_acidv t using (select distinct ds, hr, key, value from srcpart_acidv) s
+on s.ds=t.ds and s.hr=t.hr and s.key=t.key and s.value=t.value
+when matched and s.ds='2008-04-08' and s.hr=='11' and s.key='44' then update set value=concat(s.value,'updated by merge')
+when matched and s.ds='2008-04-08' and s.hr=='12' then delete
+when not matched then insert values('this','should','not','be there')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart_acidv
+POSTHOOK: Input: default@srcpart_acidv@ds=2008-04-08/hr=11
+POSTHOOK: Input: default@srcpart_acidv@ds=2008-04-08/hr=12
+POSTHOOK: Input: default@srcpart_acidv@ds=2008-04-09/hr=11
+POSTHOOK: Input: default@srcpart_acidv@ds=2008-04-09/hr=12
+POSTHOOK: Output: default@merge_tmp_table
+POSTHOOK: Output: default@srcpart_acidv@ds=2008-04-08/hr=11
+POSTHOOK: Output: default@srcpart_acidv@ds=2008-04-08/hr=11
+POSTHOOK: Output: default@srcpart_acidv@ds=2008-04-08/hr=12
+POSTHOOK: Output: default@srcpart_acidv@ds=2008-04-08/hr=12
+POSTHOOK: Output: default@srcpart_acidv@ds=2008-04-09/hr=11
+POSTHOOK: Output: default@srcpart_acidv@ds=2008-04-09/hr=11
+POSTHOOK: Output: default@srcpart_acidv@ds=2008-04-09/hr=12
+POSTHOOK: Output: default@srcpart_acidv@ds=2008-04-09/hr=12
+POSTHOOK: Lineage: merge_tmp_table.val EXPRESSION [(srcpart_acidv)t.FieldSchema(name:ROW__ID, type:struct<transactionId:bigint,bucketId:int,rowId:bigint>, comment:), (srcpart_acidv)t.FieldSchema(name:ds, type:string, comment:null), (srcpart_acidv)t.FieldSchema(name:hr, type:string, comment:null), ]
+PREHOOK: query: select count(*) from srcpart_acidv where ds='2008-04-08' and hr=='12'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart_acidv
+PREHOOK: Input: default@srcpart_acidv@ds=2008-04-08/hr=12
+#### A masked pattern was here ####
+POSTHOOK: query: select count(*) from srcpart_acidv where ds='2008-04-08' and hr=='12'
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart_acidv
+POSTHOOK: Input: default@srcpart_acidv@ds=2008-04-08/hr=12
+#### A masked pattern was here ####
+0
+PREHOOK: query: select ds, hr, key, value from srcpart_acidv where value like '%updated by merge'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart_acidv
+PREHOOK: Input: default@srcpart_acidv@ds=2008-04-08/hr=11
+PREHOOK: Input: default@srcpart_acidv@ds=2008-04-08/hr=12
+PREHOOK: Input: default@srcpart_acidv@ds=2008-04-09/hr=11
+PREHOOK: Input: default@srcpart_acidv@ds=2008-04-09/hr=12
+#### A masked pattern was here ####
+POSTHOOK: query: select ds, hr, key, value from srcpart_acidv where value like '%updated by merge'
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart_acidv
+POSTHOOK: Input: default@srcpart_acidv@ds=2008-04-08/hr=11
+POSTHOOK: Input: default@srcpart_acidv@ds=2008-04-08/hr=12
+POSTHOOK: Input: default@srcpart_acidv@ds=2008-04-09/hr=11
+POSTHOOK: Input: default@srcpart_acidv@ds=2008-04-09/hr=12
+#### A masked pattern was here ####
+2008-04-08	11	44	val_44updated by merge
+PREHOOK: query: select count(*) from srcpart_acidv where ds = 'this' and hr = 'should' and key = 'not' and value = 'be there'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart_acidv
+#### A masked pattern was here ####
+POSTHOOK: query: select count(*) from srcpart_acidv where ds = 'this' and hr = 'should' and key = 'not' and value = 'be there'
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart_acidv
+#### A masked pattern was here ####
+0
+PREHOOK: query: drop table if exists srcpart_acidv
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@srcpart_acidv
+PREHOOK: Output: default@srcpart_acidv
+POSTHOOK: query: drop table if exists srcpart_acidv
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@srcpart_acidv
+POSTHOOK: Output: default@srcpart_acidv
+PREHOOK: query: drop table if exists srcpart_acidvb
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table if exists srcpart_acidvb
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: CREATE TABLE srcpart_acidvb (key STRING, value STRING) PARTITIONED BY (ds STRING, hr STRING) CLUSTERED BY(key) INTO 2 BUCKETS stored as ORC TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@srcpart_acidvb
+POSTHOOK: query: CREATE TABLE srcpart_acidvb (key STRING, value STRING) PARTITIONED BY (ds STRING, hr STRING) CLUSTERED BY(key) INTO 2 BUCKETS stored as ORC TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@srcpart_acidvb
+PREHOOK: query: insert into srcpart_acidvb PARTITION (ds, hr) select * from srcpart
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart
+PREHOOK: Input: default@srcpart@ds=2008-04-08/hr=11
+PREHOOK: Input: default@srcpart@ds=2008-04-08/hr=12
+PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=11
+PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=12
+PREHOOK: Output: default@srcpart_acidvb
+POSTHOOK: query: insert into srcpart_acidvb PARTITION (ds, hr) select * from srcpart
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart
+POSTHOOK: Input: default@srcpart@ds=2008-04-08/hr=11
+POSTHOOK: Input: default@srcpart@ds=2008-04-08/hr=12
+POSTHOOK: Input: default@srcpart@ds=2008-04-09/hr=11
+POSTHOOK: Input: default@srcpart@ds=2008-04-09/hr=12
+POSTHOOK: Output: default@srcpart_acidvb@ds=2008-04-08/hr=11
+POSTHOOK: Output: default@srcpart_acidvb@ds=2008-04-08/hr=12
+POSTHOOK: Output: default@srcpart_acidvb@ds=2008-04-09/hr=11
+POSTHOOK: Output: default@srcpart_acidvb@ds=2008-04-09/hr=12
+POSTHOOK: Lineage: srcpart_acidvb PARTITION(ds=2008-04-08,hr=11).key SIMPLE [(srcpart)srcpart.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: srcpart_acidvb PARTITION(ds=2008-04-08,hr=11).value SIMPLE [(srcpart)srcpart.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: srcpart_acidvb PARTITION(ds=2008-04-08,hr=12).key SIMPLE [(srcpart)srcpart.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: srcpart_acidvb PARTITION(ds=2008-04-08,hr=12).value SIMPLE [(srcpart)srcpart.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: srcpart_acidvb PARTITION(ds=2008-04-09,hr=11).key SIMPLE [(srcpart)srcpart.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: srcpart_acidvb PARTITION(ds=2008-04-09,hr=11).value SIMPLE [(srcpart)srcpart.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: srcpart_acidvb PARTITION(ds=2008-04-09,hr=12).key SIMPLE [(srcpart)srcpart.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: srcpart_acidvb PARTITION(ds=2008-04-09,hr=12).value SIMPLE [(srcpart)srcpart.FieldSchema(name:value, type:string, comment:default), ]
+PREHOOK: query: select ds, hr, key, value from srcpart_acidvb where cast(key as integer) in(413,43) and hr='11' order by ds, hr, cast(key as integer)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart_acidvb
+PREHOOK: Input: default@srcpart_acidvb@ds=2008-04-08/hr=11
+PREHOOK: Input: default@srcpart_acidvb@ds=2008-04-09/hr=11
+#### A masked pattern was here ####
+POSTHOOK: query: select ds, hr, key, value from srcpart_acidvb where cast(key as integer) in(413,43) and hr='11' order by ds, hr, cast(key as integer)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart_acidvb
+POSTHOOK: Input: default@srcpart_acidvb@ds=2008-04-08/hr=11
+POSTHOOK: Input: default@srcpart_acidvb@ds=2008-04-09/hr=11
+#### A masked pattern was here ####
+2008-04-08	11	43	val_43
+2008-04-08	11	413	val_413
+2008-04-08	11	413	val_413
+2008-04-09	11	43	val_43
+2008-04-09	11	413	val_413
+2008-04-09	11	413	val_413
+PREHOOK: query: analyze table srcpart_acidvb PARTITION(ds, hr) compute statistics
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart_acidvb
+PREHOOK: Output: default@srcpart_acidvb
+PREHOOK: Output: default@srcpart_acidvb@ds=2008-04-08/hr=11
+PREHOOK: Output: default@srcpart_acidvb@ds=2008-04-08/hr=12
+PREHOOK: Output: default@srcpart_acidvb@ds=2008-04-09/hr=11
+PREHOOK: Output: default@srcpart_acidvb@ds=2008-04-09/hr=12
+POSTHOOK: query: analyze table srcpart_acidvb PARTITION(ds, hr) compute statistics
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart_acidvb
+POSTHOOK: Output: default@srcpart_acidvb
+POSTHOOK: Output: default@srcpart_acidvb@ds=2008-04-08/hr=11
+POSTHOOK: Output: default@srcpart_acidvb@ds=2008-04-08/hr=12
+POSTHOOK: Output: default@srcpart_acidvb@ds=2008-04-09/hr=11
+POSTHOOK: Output: default@srcpart_acidvb@ds=2008-04-09/hr=12
+PREHOOK: query: analyze table srcpart_acidvb PARTITION(ds, hr) compute statistics for columns
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart_acidvb
+PREHOOK: Input: default@srcpart_acidvb@ds=2008-04-08/hr=11
+PREHOOK: Input: default@srcpart_acidvb@ds=2008-04-08/hr=12
+PREHOOK: Input: default@srcpart_acidvb@ds=2008-04-09/hr=11
+PREHOOK: Input: default@srcpart_acidvb@ds=2008-04-09/hr=12
+PREHOOK: Output: default@srcpart_acidvb
+PREHOOK: Output: default@srcpart_acidvb@ds=2008-04-08/hr=11
+PREHOOK: Output: default@srcpart_acidvb@ds=2008-04-08/hr=12
+PREHOOK: Output: default@srcpart_acidvb@ds=2008-04-09/hr=11
+PREHOOK: Output: default@srcpart_acidvb@ds=2008-04-09/hr=12
+#### A masked pattern was here ####
+POSTHOOK: query: analyze table srcpart_acidvb PARTITION(ds, hr) compute statistics for columns
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart_acidvb
+POSTHOOK: Input: default@srcpart_acidvb@ds=2008-04-08/hr=11
+POSTHOOK: Input: default@srcpart_acidvb@ds=2008-04-08/hr=12
+POSTHOOK: Input: default@srcpart_acidvb@ds=2008-04-09/hr=11
+POSTHOOK: Input: default@srcpart_acidvb@ds=2008-04-09/hr=12
+POSTHOOK: Output: default@srcpart_acidvb
+POSTHOOK: Output: default@srcpart_acidvb@ds=2008-04-08/hr=11
+POSTHOOK: Output: default@srcpart_acidvb@ds=2008-04-08/hr=12
+POSTHOOK: Output: default@srcpart_acidvb@ds=2008-04-09/hr=11
+POSTHOOK: Output: default@srcpart_acidvb@ds=2008-04-09/hr=12
+#### A masked pattern was here ####
+PREHOOK: query: explain update srcpart_acidvb set value = concat(value, 'updated') where cast(key as integer) in(413,43) and hr='11'
+PREHOOK: type: QUERY
+POSTHOOK: query: explain update srcpart_acidvb set value = concat(value, 'updated') where cast(key as integer) in(413,43) and hr='11'
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-2 depends on stages: Stage-1
+  Stage-0 depends on stages: Stage-2
+  Stage-3 depends on stages: Stage-0
+
+STAGE PLANS:
+  Stage: Stage-1
+    Tez
+#### A masked pattern was here ####
+      Edges:
+        Reducer 2 <- Map 1 (SIMPLE_EDGE)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: srcpart_acidvb
+                  Statistics: Num rows: 1000 Data size: 362000 Basic stats: COMPLETE Column stats: PARTIAL
+                  Filter Operator
+                    predicate: (UDFToInteger(key)) IN (413, 43) (type: boolean)
+                    Statistics: Num rows: 500 Data size: 181000 Basic stats: COMPLETE Column stats: PARTIAL
+                    Select Operator
+                      expressions: ROW__ID (type: struct<transactionid:bigint,bucketid:int,rowid:bigint>), key (type: string), concat(value, 'updated') (type: string), ds (type: string)
+                      outputColumnNames: _col0, _col1, _col2, _col3
+                      Statistics: Num rows: 500 D

<TRUNCATED>

[2/3] hive git commit: HIVE-17205 - add functional support for unbucketed tables (Eugene Koifman, reviewed by Wei Zheng)

Posted by ek...@apache.org.
http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
index d61b24b..37aaeb6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
@@ -50,6 +50,8 @@ public class OrcSplit extends FileSplit implements ColumnarSplit, LlapAwareSplit
   private boolean hasFooter;
   private boolean isOriginal;
   private boolean hasBase;
+  //partition root
+  private Path rootDir;
   private final List<AcidInputFormat.DeltaMetaData> deltas = new ArrayList<>();
   private long projColsUncompressedSize;
   private transient Object fileKey;
@@ -70,7 +72,7 @@ public class OrcSplit extends FileSplit implements ColumnarSplit, LlapAwareSplit
 
   public OrcSplit(Path path, Object fileId, long offset, long length, String[] hosts,
       OrcTail orcTail, boolean isOriginal, boolean hasBase,
-      List<AcidInputFormat.DeltaMetaData> deltas, long projectedDataSize, long fileLen) {
+      List<AcidInputFormat.DeltaMetaData> deltas, long projectedDataSize, long fileLen, Path rootDir) {
     super(path, offset, length, hosts);
     // For HDFS, we could avoid serializing file ID and just replace the path with inode-based
     // path. However, that breaks bunch of stuff because Hive later looks up things by split path.
@@ -79,6 +81,7 @@ public class OrcSplit extends FileSplit implements ColumnarSplit, LlapAwareSplit
     hasFooter = this.orcTail != null;
     this.isOriginal = isOriginal;
     this.hasBase = hasBase;
+    this.rootDir = rootDir;
     this.deltas.addAll(deltas);
     this.projColsUncompressedSize = projectedDataSize <= 0 ? length : projectedDataSize;
     // setting file length to Long.MAX_VALUE will let orc reader read file length from file system
@@ -129,6 +132,7 @@ public class OrcSplit extends FileSplit implements ColumnarSplit, LlapAwareSplit
       ((Writable)fileKey).write(out);
     }
     out.writeLong(fileLen);
+    out.writeUTF(rootDir.toString());
   }
 
   @Override
@@ -168,6 +172,7 @@ public class OrcSplit extends FileSplit implements ColumnarSplit, LlapAwareSplit
       this.fileKey = fileId;
     }
     fileLen = in.readLong();
+    rootDir = new Path(in.readUTF());
   }
 
   public OrcTail getOrcTail() {
@@ -186,6 +191,9 @@ public class OrcSplit extends FileSplit implements ColumnarSplit, LlapAwareSplit
     return hasBase;
   }
 
+  public Path getRootDir() {
+    return rootDir;
+  }
   public List<AcidInputFormat.DeltaMetaData> getDeltas() {
     return deltas;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
index 8f80710..138e56e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hive.common.ValidReadTxnList;
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
@@ -357,7 +358,7 @@ public class VectorizedOrcAcidRowBatchReader
           int bucket = AcidUtils.parseBaseOrDeltaBucketFilename(orcSplit.getPath(), conf).getBucketId();
           String txnString = conf.get(ValidTxnList.VALID_TXNS_KEY);
           this.validTxnList = (txnString == null) ? new ValidReadTxnList() : new ValidReadTxnList(txnString);
-          OrcRawRecordMerger.Options mergerOptions = new OrcRawRecordMerger.Options().isCompacting(false);
+          OrcRawRecordMerger.Options mergerOptions = new OrcRawRecordMerger.Options().isDeleteReader(true);
           assert !orcSplit.isOriginal() : "If this now supports Original splits, set up mergeOptions properly";
           this.deleteRecords = new OrcRawRecordMerger(conf, true, null, false, bucket,
                                                       validTxnList, readerOptions, deleteDeltas,
@@ -530,6 +531,9 @@ public class VectorizedOrcAcidRowBatchReader
      * For every call to next(), it returns the next smallest record id in the file if available.
      * Internally, the next() buffers a row batch and maintains an index pointer, reading the
      * next batch when the previous batch is exhausted.
+     *
+     * For unbucketed tables this will currently return all delete events.  Once we trust that
+     * the N in bucketN for "base" split is reliable, all delete events not matching N can be skipped.
      */
     static class DeleteReaderValue {
       private VectorizedRowBatch batch;
@@ -538,9 +542,10 @@ public class VectorizedOrcAcidRowBatchReader
       private final int bucketForSplit; // The bucket value should be same for all the records.
       private final ValidTxnList validTxnList;
       private boolean isBucketPropertyRepeating;
+      private final boolean isBucketedTable;
 
       public DeleteReaderValue(Reader deleteDeltaReader, Reader.Options readerOptions, int bucket,
-          ValidTxnList validTxnList) throws IOException {
+          ValidTxnList validTxnList, boolean isBucketedTable) throws IOException {
         this.recordReader  = deleteDeltaReader.rowsOptions(readerOptions);
         this.bucketForSplit = bucket;
         this.batch = deleteDeltaReader.getSchema().createRowBatch();
@@ -549,6 +554,7 @@ public class VectorizedOrcAcidRowBatchReader
         }
         this.indexPtrInBatch = 0;
         this.validTxnList = validTxnList;
+        this.isBucketedTable = isBucketedTable;
         checkBucketId();//check 1st batch
       }
 
@@ -615,6 +621,13 @@ public class VectorizedOrcAcidRowBatchReader
        * either the split computation got messed up or we found some corrupted records.
        */
       private void checkBucketId(int bucketPropertyFromRecord) throws IOException {
+        if(!isBucketedTable) {
+          /**
+           * in this case a file inside a delete_delta_x_y/bucketN may contain any value for
+           * bucketId in {@link RecordIdentifier#getBucketProperty()}
+           */
+          return;
+        }
         int bucketIdFromRecord = BucketCodec.determineVersion(bucketPropertyFromRecord)
           .decodeWriterId(bucketPropertyFromRecord);
         if(bucketIdFromRecord != bucketForSplit) {
@@ -686,14 +699,16 @@ public class VectorizedOrcAcidRowBatchReader
       this.rowIds = null;
       this.compressedOtids = null;
       int maxEventsInMemory = HiveConf.getIntVar(conf, ConfVars.HIVE_TRANSACTIONAL_NUM_EVENTS_IN_MEMORY);
+      final boolean isBucketedTable  = conf.getInt(hive_metastoreConstants.BUCKET_COUNT, 0) > 0;
 
       try {
         final Path[] deleteDeltaDirs = getDeleteDeltaDirsFromSplit(orcSplit);
         if (deleteDeltaDirs.length > 0) {
           int totalDeleteEventCount = 0;
           for (Path deleteDeltaDir : deleteDeltaDirs) {
-            Path deleteDeltaFile = AcidUtils.createBucketFile(deleteDeltaDir, bucket);
-            FileSystem fs = deleteDeltaFile.getFileSystem(conf);
+            FileSystem fs = deleteDeltaDir.getFileSystem(conf);
+            for(Path deleteDeltaFile : OrcRawRecordMerger.getDeltaFiles(deleteDeltaDir, bucket, conf,
+              new OrcRawRecordMerger.Options().isCompacting(false), isBucketedTable)) {
             // NOTE: Calling last flush length below is more for future-proofing when we have
             // streaming deletes. But currently we don't support streaming deletes, and this can
             // be removed if this becomes a performance issue.
@@ -721,7 +736,7 @@ public class VectorizedOrcAcidRowBatchReader
                 throw new DeleteEventsOverflowMemoryException();
               }
               DeleteReaderValue deleteReaderValue = new DeleteReaderValue(deleteDeltaReader,
-                  readerOptions, bucket, validTxnList);
+                  readerOptions, bucket, validTxnList, isBucketedTable);
               DeleteRecordKey deleteRecordKey = new DeleteRecordKey();
               if (deleteReaderValue.next(deleteRecordKey)) {
                 sortMerger.put(deleteRecordKey, deleteReaderValue);
@@ -730,6 +745,7 @@ public class VectorizedOrcAcidRowBatchReader
               }
             }
           }
+          }
           if (totalDeleteEventCount > 0) {
             // Initialize the rowId array when we have some delete events.
             rowIds = new long[totalDeleteEventCount];

http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java
index 76aa39f..4b11a4a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java
@@ -206,6 +206,7 @@ public class SortedDynPartitionOptimizer extends Transform {
         if(!VirtualColumn.ROWID.getTypeInfo().equals(ci.getType())) {
           throw new IllegalStateException("expected 1st column to be ROW__ID but got wrong type: " + ci.toString());
         }
+        //HIVE-17328: not sure this is correct... I don't think it gets wrapped in UDFToInteger....
         bucketColumns.add(new ExprNodeColumnDesc(ci));
       } else {
         if (!destTable.getSortCols().isEmpty()) {

http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index bc6e0d5..e8acabe 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -7377,9 +7377,6 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
   }
 
   // Check constraints on acid tables.  This includes
-  // * no insert overwrites
-  // * no use of vectorization
-  // * turns off reduce deduplication optimization, as that sometimes breaks acid
   // * Check that the table is bucketed
   // * Check that the table is not sorted
   // This method assumes you have already decided that this is an Acid write.  Don't call it if
@@ -7397,9 +7394,6 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
     */
     conf.set(AcidUtils.CONF_ACID_KEY, "true");
 
-    if (table.getNumBuckets() < 1) {
-      throw new SemanticException(ErrorMsg.ACID_OP_ON_NONACID_TABLE, table.getTableName());
-    }
     if (table.getSortCols() != null && table.getSortCols().size() > 0) {
       throw new SemanticException(ErrorMsg.ACID_NO_SORTED_BUCKETS, table.getTableName());
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index 5e2146e..04ef7fc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hive.metastore.api.CompactionType;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
@@ -93,7 +94,7 @@ public class CompactorMR {
   static final private String IS_MAJOR = "hive.compactor.is.major";
   static final private String IS_COMPRESSED = "hive.compactor.is.compressed";
   static final private String TABLE_PROPS = "hive.compactor.table.props";
-  static final private String NUM_BUCKETS = "hive.compactor.num.buckets";
+  static final private String NUM_BUCKETS = hive_metastoreConstants.BUCKET_COUNT;
   static final private String BASE_DIR = "hive.compactor.base.dir";
   static final private String DELTA_DIRS = "hive.compactor.delta.dirs";
   static final private String DIRS_TO_SEARCH = "hive.compactor.dirs.to.search";

http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
index bff9884..0f129fc 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
@@ -69,68 +69,17 @@ import java.util.concurrent.TimeUnit;
  * Tests here are for multi-statement transactions (WIP) and those that don't need to
  * run with Acid 2.0 (see subclasses of TestTxnCommands2)
  */
-public class TestTxnCommands {
+public class TestTxnCommands extends TestTxnCommandsBase {
   static final private Logger LOG = LoggerFactory.getLogger(TestTxnCommands.class);
   private static final String TEST_DATA_DIR = new File(System.getProperty("java.io.tmpdir") +
     File.separator + TestTxnCommands.class.getCanonicalName()
     + "-" + System.currentTimeMillis()
   ).getPath().replaceAll("\\\\", "/");
-  private static final String TEST_WAREHOUSE_DIR = TEST_DATA_DIR + "/warehouse";
-  //bucket count for test tables; set it to 1 for easier debugging
-  private static int BUCKET_COUNT = 2;
-  @Rule
-  public TestName testName = new TestName();
-  private HiveConf hiveConf;
-  private Driver d;
-  private static enum Table {
-    ACIDTBL("acidTbl"),
-    ACIDTBLPART("acidTblPart"),
-    ACIDTBL2("acidTbl2"),
-    NONACIDORCTBL("nonAcidOrcTbl"),
-    NONACIDORCTBL2("nonAcidOrcTbl2");
-
-    private final String name;
-    @Override
-    public String toString() {
-      return name;
-    }
-    Table(String name) {
-      this.name = name;
-    }
+  @Override
+  String getTestDataDir() {
+    return TEST_DATA_DIR;
   }
 
-  @Before
-  public void setUp() throws Exception {
-    tearDown();
-    hiveConf = new HiveConf(this.getClass());
-    hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
-    hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
-    hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, TEST_WAREHOUSE_DIR);
-    hiveConf.setVar(HiveConf.ConfVars.HIVEMAPREDMODE, "nonstrict");
-    hiveConf.setVar(HiveConf.ConfVars.HIVEINPUTFORMAT, HiveInputFormat.class.getName());
-    hiveConf
-    .setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
-        "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory");
-    hiveConf.setBoolVar(HiveConf.ConfVars.MERGE_CARDINALITY_VIOLATION_CHECK, true);
-    TxnDbUtil.setConfValues(hiveConf);
-    TxnDbUtil.prepDb();
-    File f = new File(TEST_WAREHOUSE_DIR);
-    if (f.exists()) {
-      FileUtil.fullyDelete(f);
-    }
-    if (!(new File(TEST_WAREHOUSE_DIR).mkdirs())) {
-      throw new RuntimeException("Could not create " + TEST_WAREHOUSE_DIR);
-    }
-    SessionState.start(new SessionState(hiveConf));
-    d = new Driver(hiveConf);
-    d.setMaxRows(10000);
-    dropTables();
-    runStatementOnDriver("create table " + Table.ACIDTBL + "(a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')");
-    runStatementOnDriver("create table " + Table.ACIDTBLPART + "(a int, b int) partitioned by (p string) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')");
-    runStatementOnDriver("create table " + Table.NONACIDORCTBL + "(a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='false')");
-    runStatementOnDriver("create table " + Table.NONACIDORCTBL2 + "(a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='false')");
-    runStatementOnDriver("create temporary  table " + Table.ACIDTBL2 + "(a int, b int, c int) clustered by (c) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')");
-  }
   private void dropTables() throws Exception {
     for(Table t : Table.values()) {
       runStatementOnDriver("drop table if exists " + t);
@@ -150,7 +99,7 @@ public class TestTxnCommands {
       FileUtils.deleteDirectory(new File(TEST_DATA_DIR));
     }
   }
-  @Test
+  @Test//todo: what is this for?
   public void testInsertOverwrite() throws Exception {
     runStatementOnDriver("insert overwrite table " + Table.NONACIDORCTBL + " select a,b from " + Table.NONACIDORCTBL2);
     runStatementOnDriver("create table " + Table.NONACIDORCTBL2 + "3(a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='false')");
@@ -172,7 +121,7 @@ public class TestTxnCommands {
     if(true) {
       return;
     }
-    Path bucket = AcidUtils.createBucketFile(new Path(new Path(TEST_WAREHOUSE_DIR, table.toString().toLowerCase()), AcidUtils.deltaSubdir(txnId, txnId, stmtId)), bucketNum);
+    Path bucket = AcidUtils.createBucketFile(new Path(new Path(getWarehouseDir(), table.toString().toLowerCase()), AcidUtils.deltaSubdir(txnId, txnId, stmtId)), bucketNum);
     FileOutputStream delta = new FileOutputStream(testName.getMethodName() + "_" + bucket.getParent().getName() + "_" +  bucket.getName());
 //    try {
 //      FileDump.printJsonData(hiveConf, bucket.toString(), delta);
@@ -446,7 +395,7 @@ public class TestTxnCommands {
       }
     }
     Assert.assertNotNull(txnInfo);
-    Assert.assertEquals(12, txnInfo.getId());
+    Assert.assertEquals(14, txnInfo.getId());
     Assert.assertEquals(TxnState.OPEN, txnInfo.getState());
     String s =TxnDbUtil.queryToString("select TXN_STARTED, TXN_LAST_HEARTBEAT from TXNS where TXN_ID = " + txnInfo.getId(), false);
     String[] vals = s.split("\\s+");
@@ -490,33 +439,6 @@ public class TestTxnCommands {
     }
   }
 
-  /**
-   * takes raw data and turns it into a string as if from Driver.getResults()
-   * sorts rows in dictionary order
-   */
-  private List<String> stringifyValues(int[][] rowsIn) {
-    return TestTxnCommands2.stringifyValues(rowsIn);
-  }
-  private String makeValuesClause(int[][] rows) {
-    return TestTxnCommands2.makeValuesClause(rows);
-  }
-
-  private List<String> runStatementOnDriver(String stmt) throws Exception {
-    CommandProcessorResponse cpr = d.run(stmt);
-    if(cpr.getResponseCode() != 0) {
-      throw new RuntimeException(stmt + " failed: " + cpr);
-    }
-    List<String> rs = new ArrayList<String>();
-    d.getResults(rs);
-    return rs;
-  }
-  private CommandProcessorResponse runStatementOnDriverNegative(String stmt) throws Exception {
-    CommandProcessorResponse cpr = d.run(stmt);
-    if(cpr.getResponseCode() != 0) {
-      return cpr;
-    }
-    throw new RuntimeException("Didn't get expected failure!");
-  }
 
   @Test
   public void exchangePartition() throws Exception {
@@ -872,8 +794,8 @@ public class TestTxnCommands {
     Assert.assertTrue(rs.get(1), rs.get(1).endsWith("nonacidorctbl/000001_0"));
     Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":1}\t1\t5"));
     Assert.assertTrue(rs.get(2), rs.get(2).endsWith("nonacidorctbl/000001_0_copy_1"));
-    Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"transactionid\":14,\"bucketid\":536936448,\"rowid\":0}\t1\t17"));
-    Assert.assertTrue(rs.get(3), rs.get(3).endsWith("nonacidorctbl/delta_0000014_0000014_0000/bucket_00001"));
+    Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"transactionid\":16,\"bucketid\":536936448,\"rowid\":0}\t1\t17"));
+    Assert.assertTrue(rs.get(3), rs.get(3).endsWith("nonacidorctbl/delta_0000016_0000016_0000/bucket_00001"));
     //run Compaction
     runStatementOnDriver("alter table "+ TestTxnCommands2.Table.NONACIDORCTBL +" compact 'major'");
     TestTxnCommands2.runWorker(hiveConf);
@@ -884,13 +806,13 @@ public class TestTxnCommands {
     }
     Assert.assertEquals("", 4, rs.size());
     Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t0\t12"));
-    Assert.assertTrue(rs.get(0), rs.get(0).endsWith("nonacidorctbl/base_0000014/bucket_00000"));
+    Assert.assertTrue(rs.get(0), rs.get(0).endsWith("nonacidorctbl/base_0000016/bucket_00000"));
     Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":0}\t1\t2"));
-    Assert.assertTrue(rs.get(1), rs.get(1).endsWith("nonacidorctbl/base_0000014/bucket_00001"));
+    Assert.assertTrue(rs.get(1), rs.get(1).endsWith("nonacidorctbl/base_0000016/bucket_00001"));
     Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":1}\t1\t5"));
-    Assert.assertTrue(rs.get(2), rs.get(2).endsWith("nonacidorctbl/base_0000014/bucket_00001"));
-    Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"transactionid\":14,\"bucketid\":536936448,\"rowid\":0}\t1\t17"));
-    Assert.assertTrue(rs.get(3), rs.get(3).endsWith("nonacidorctbl/base_0000014/bucket_00001"));
+    Assert.assertTrue(rs.get(2), rs.get(2).endsWith("nonacidorctbl/base_0000016/bucket_00001"));
+    Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"transactionid\":16,\"bucketid\":536936448,\"rowid\":0}\t1\t17"));
+    Assert.assertTrue(rs.get(3), rs.get(3).endsWith("nonacidorctbl/base_0000016/bucket_00001"));
 
     //make sure they are the same before and after compaction
   }
@@ -940,4 +862,4 @@ public class TestTxnCommands {
     int[][] expected = {{0, -1},{0, -1}, {1, -1}, {1, -1}, {2, -1}, {2, -1}, {3, -1}, {3, -1}};
     Assert.assertEquals(stringifyValues(expected), r);
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index 0e0fca3..21b4a2c 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -304,8 +304,8 @@ public class TestTxnCommands2 {
     // 1. Insert five rows to Non-ACID table.
     runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,2),(3,4),(5,6),(7,8),(9,10)");
 
-    // 2. Convert NONACIDORCTBL to ACID table.
-    runStatementOnDriver("alter table " + Table.NONACIDORCTBL + " SET TBLPROPERTIES ('transactional'='true')");
+    // 2. Convert NONACIDORCTBL to ACID table.  //todo: remove trans_prop after HIVE-17089
+    runStatementOnDriver("alter table " + Table.NONACIDORCTBL + " SET TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default')");
     runStatementOnDriver("update " + Table.NONACIDORCTBL + " set b = b*2 where b in (4,10)");
     runStatementOnDriver("delete from " + Table.NONACIDORCTBL + " where a = 7");
 
@@ -331,6 +331,7 @@ public class TestTxnCommands2 {
   /**
    * see HIVE-16177
    * See also {@link TestTxnCommands#testNonAcidToAcidConversion01()}
+   * {@link TestTxnNoBuckets#testCTAS()}
    */
   @Test
   public void testNonAcidToAcidConversion02() throws Exception {
@@ -341,8 +342,8 @@ public class TestTxnCommands2 {
     //create 1 row in a file 000001_0_copy2 (and empty 000000_0_copy2?)
     runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,6)");
 
-    //convert the table to Acid
-    runStatementOnDriver("alter table " + Table.NONACIDORCTBL + " SET TBLPROPERTIES ('transactional'='true')");
+    //convert the table to Acid  //todo: remove trans_prop after HIVE-17089
+    runStatementOnDriver("alter table " + Table.NONACIDORCTBL + " SET TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default')");
     List<String> rs1 = runStatementOnDriver("describe "+ Table.NONACIDORCTBL);
     //create a some of delta directories
     runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(0,15),(1,16)");
@@ -361,6 +362,7 @@ public class TestTxnCommands2 {
      * All ROW__IDs are unique on read after conversion to acid 
      * ROW__IDs are exactly the same before and after compaction
      * Also check the file name (only) after compaction for completeness
+     * Note: order of rows in a file ends up being the reverse of order in values clause (why?!)
      */
     String[][] expected = {
       {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t0\t13",  "bucket_00000"},
@@ -2022,7 +2024,7 @@ public class TestTxnCommands2 {
   }
   static String makeValuesClause(int[][] rows) {
     assert rows.length > 0;
-    StringBuilder sb = new StringBuilder("values");
+    StringBuilder sb = new StringBuilder(" values");
     for(int[] row : rows) {
       assert row.length > 0;
       if(row.length > 1) {

http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsBase.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsBase.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsBase.java
new file mode 100644
index 0000000..d6e709d
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsBase.java
@@ -0,0 +1,162 @@
+package org.apache.hadoop.hive.ql;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+import org.apache.hadoop.hive.ql.io.HiveInputFormat;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.rules.TestName;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+public abstract class TestTxnCommandsBase {
+  //bucket count for test tables; set it to 1 for easier debugging
+  final static int BUCKET_COUNT = 2;
+  @Rule
+  public TestName testName = new TestName();
+  HiveConf hiveConf;
+  Driver d;
+  enum Table {
+    ACIDTBL("acidTbl"),
+    ACIDTBLPART("acidTblPart"),
+    ACIDTBL2("acidTbl2"),
+    NONACIDORCTBL("nonAcidOrcTbl"),
+    NONACIDORCTBL2("nonAcidOrcTbl2"),
+    NONACIDNONBUCKET("nonAcidNonBucket");
+
+    final String name;
+    @Override
+    public String toString() {
+      return name;
+    }
+    Table(String name) {
+      this.name = name;
+    }
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    setUpInternal();
+  }
+  void setUpInternal() throws Exception {
+    tearDown();
+    hiveConf = new HiveConf(this.getClass());
+    hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
+    hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
+    hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, getWarehouseDir());
+    hiveConf.setVar(HiveConf.ConfVars.HIVEMAPREDMODE, "nonstrict");
+    hiveConf.setVar(HiveConf.ConfVars.HIVEINPUTFORMAT, HiveInputFormat.class.getName());
+    hiveConf
+      .setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
+        "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory");
+    hiveConf.setBoolVar(HiveConf.ConfVars.MERGE_CARDINALITY_VIOLATION_CHECK, true);
+    TxnDbUtil.setConfValues(hiveConf);
+    TxnDbUtil.prepDb();
+    File f = new File(getWarehouseDir());
+    if (f.exists()) {
+      FileUtil.fullyDelete(f);
+    }
+    if (!(new File(getWarehouseDir()).mkdirs())) {
+      throw new RuntimeException("Could not create " + getWarehouseDir());
+    }
+    SessionState.start(new SessionState(hiveConf));
+    d = new Driver(hiveConf);
+    d.setMaxRows(10000);
+    dropTables();
+    runStatementOnDriver("create table " + Table.ACIDTBL + "(a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+    runStatementOnDriver("create table " + Table.ACIDTBLPART + "(a int, b int) partitioned by (p string) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+    runStatementOnDriver("create table " + Table.NONACIDORCTBL + "(a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='false')");
+    runStatementOnDriver("create table " + Table.NONACIDORCTBL2 + "(a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='false')");
+    runStatementOnDriver("create temporary  table " + Table.ACIDTBL2 + "(a int, b int, c int) clustered by (c) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+    runStatementOnDriver("create table " + Table.NONACIDNONBUCKET + "(a int, b int) stored as orc");
+  }
+  private void dropTables() throws Exception {
+    for(TestTxnCommandsBase.Table t : TestTxnCommandsBase.Table.values()) {
+      runStatementOnDriver("drop table if exists " + t);
+    }
+  }
+  @After
+  public void tearDown() throws Exception {
+    try {
+      if (d != null) {
+        dropTables();
+        d.destroy();
+        d.close();
+        d = null;
+      }
+    } finally {
+      TxnDbUtil.cleanDb();
+      FileUtils.deleteDirectory(new File(getTestDataDir()));
+    }
+  }
+  String getWarehouseDir() {
+    return getTestDataDir() + "/warehouse";
+  }
+  abstract String getTestDataDir();
+  /**
+   * takes raw data and turns it into a string as if from Driver.getResults()
+   * sorts rows in dictionary order
+   */
+  List<String> stringifyValues(int[][] rowsIn) {
+    return TestTxnCommands2.stringifyValues(rowsIn);
+  }
+  String makeValuesClause(int[][] rows) {
+    return TestTxnCommands2.makeValuesClause(rows);
+  }
+
+  List<String> runStatementOnDriver(String stmt) throws Exception {
+    CommandProcessorResponse cpr = d.run(stmt);
+    if(cpr.getResponseCode() != 0) {
+      throw new RuntimeException(stmt + " failed: " + cpr);
+    }
+    List<String> rs = new ArrayList<String>();
+    d.getResults(rs);
+    return rs;
+  }
+  CommandProcessorResponse runStatementOnDriverNegative(String stmt) throws Exception {
+    CommandProcessorResponse cpr = d.run(stmt);
+    if(cpr.getResponseCode() != 0) {
+      return cpr;
+    }
+    throw new RuntimeException("Didn't get expected failure!");
+  }
+  /**
+   * Will assert that actual files match expected.
+   * @param expectedFiles - suffixes of expected Paths.  All entries must be the same length
+   * @param rootPath - table or partition root where to start looking for actual files, recursively
+   */
+  void assertExpectedFileSet(Set<String> expectedFiles, String rootPath) throws Exception {
+    int suffixLength = 0;
+    for(String s : expectedFiles) {
+      if(suffixLength > 0) {
+        assert suffixLength == s.length() : "all entries must be the same length. current: " + s;
+      }
+      suffixLength = s.length();
+    }
+    FileSystem fs = FileSystem.get(hiveConf);
+    Set<String> actualFiles = new HashSet<>();
+    RemoteIterator<LocatedFileStatus> remoteIterator = fs.listFiles(new Path(rootPath), true);
+    while (remoteIterator.hasNext()) {
+      LocatedFileStatus lfs = remoteIterator.next();
+      if(!lfs.isDirectory() && org.apache.hadoop.hive.common.FileUtils.HIDDEN_FILES_PATH_FILTER.accept(lfs.getPath())) {
+        String p = lfs.getPath().toString();
+        actualFiles.add(p.substring(p.length() - suffixLength, p.length()));
+      }
+    }
+    Assert.assertEquals("Unexpected file list", expectedFiles, actualFiles);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java
new file mode 100644
index 0000000..7aca6b2
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java
@@ -0,0 +1,297 @@
+package org.apache.hadoop.hive.ql;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.io.BucketCodec;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+public class TestTxnNoBuckets extends TestTxnCommandsBase {
+  static final private Logger LOG = LoggerFactory.getLogger(TestTxnNoBuckets.class);
+  private static final String TEST_DATA_DIR = new File(System.getProperty("java.io.tmpdir") +
+    File.separator + TestTxnNoBuckets.class.getCanonicalName()
+    + "-" + System.currentTimeMillis()
+  ).getPath().replaceAll("\\\\", "/");
+  @Override
+  String getTestDataDir() {
+    return TEST_DATA_DIR;//tmpdir path suffixed with this class's canonical name and a timestamp
+  }
+  @Override
+  @Before
+  public void setUp() throws Exception {
+    setUpInternal();//defined outside this class; NOTE(review): presumably the shared base-class setup - confirm
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, true);//every test in this class runs with this flag on
+  }
+
+  /**
+   * Tests that Acid can work with un-bucketed tables: insert, update, major compaction and cleaning.
+   */
+  @Test
+  public void testNoBuckets() throws Exception {
+    int[][] sourceVals1 = {{0,0,0},{3,3,3}};
+    int[][] sourceVals2 = {{1,1,1},{2,2,2}};
+    runStatementOnDriver("drop table if exists tmp");
+    runStatementOnDriver("create table tmp (c1 integer, c2 integer, c3 integer) stored as orc");
+    runStatementOnDriver("insert into tmp " + makeValuesClause(sourceVals1));
+    runStatementOnDriver("insert into tmp " + makeValuesClause(sourceVals2));
+    runStatementOnDriver("drop table if exists nobuckets");
+    runStatementOnDriver("create table nobuckets (c1 integer, c2 integer, c3 integer) stored " +
+      "as orc tblproperties('transactional'='true', 'transactional_properties'='default')");
+    String stmt = "insert into nobuckets select * from tmp";
+    runStatementOnDriver(stmt);
+    List<String> rs = runStatementOnDriver(
+      "select ROW__ID, c1, c2, c3, INPUT__FILE__NAME from nobuckets order by ROW__ID");
+    Assert.assertEquals("", 4, rs.size());
+    LOG.warn("after insert");
+    for(String s : rs) {
+      LOG.warn(s);
+    }
+    /**The insert creates 2 output files (presumably because there are 2 input files).
+     * The number in the file name is writerId.  This is the number encoded in ROW__ID.bucketId -
+     * see {@link org.apache.hadoop.hive.ql.io.BucketCodec}*/
+    Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"transactionid\":19,\"bucketid\":536870912,\"rowid\":0}\t0\t0\t0\t"));
+    Assert.assertTrue(rs.get(0), rs.get(0).endsWith("nobuckets/delta_0000019_0000019_0000/bucket_00000"));
+    Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"transactionid\":19,\"bucketid\":536870912,\"rowid\":1}\t3\t3\t3\t"));
+    Assert.assertTrue(rs.get(1), rs.get(1).endsWith("nobuckets/delta_0000019_0000019_0000/bucket_00000"));
+    Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"transactionid\":19,\"bucketid\":536936448,\"rowid\":0}\t1\t1\t1\t"));
+    Assert.assertTrue(rs.get(2), rs.get(2).endsWith("nobuckets/delta_0000019_0000019_0000/bucket_00001"));
+    Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"transactionid\":19,\"bucketid\":536936448,\"rowid\":1}\t2\t2\t2\t"));
+    Assert.assertTrue(rs.get(3), rs.get(3).endsWith("nobuckets/delta_0000019_0000019_0000/bucket_00001"));
+    /*todo: open questions about how UPDATE distributes rows for an unbucketed table.
+    RS for update seems to spray randomly... is that OK?  Maybe, as long as all resultant files have different names... will they?
+    Assuming we name them based on taskId, we should create bucketX and bucketY.
+    If delete events can be written to the bucketX file, it could be useful for filtering deletes for a split by file name, since the insert
+    events seem to be written to a proper bucketX file.  In fact this may reduce the number of changes elsewhere, e.g. in the compactor... maybe.
+    But this limits the parallelism - what is worse, you don't know what the parallelism should be until you have a list of all the
+    input files, since bucket count is no longer a metadata property.  Also, with late Update split, the file name has already been determined
+    from taskId, so the Insert part won't necessarily end up matching the bucketX property.
+    With early Update split, the Insert can still be an insert - i.e. go to the appropriate bucketX.  But deletes will still go wherever (random shuffle)
+    unless you know all the bucketX files to be read - may not be worth the trouble.
+    * 2nd: something in FS fails.  ArrayIndexOutOfBoundsException: 1 at FileSinkOperator.process(FileSinkOperator.java:779)*/
+    runStatementOnDriver("update nobuckets set c3 = 17 where c3 in(0,1)");
+    rs = runStatementOnDriver("select ROW__ID, c1, c2, c3, INPUT__FILE__NAME from nobuckets order by INPUT__FILE__NAME, ROW__ID");
+    LOG.warn("after update");
+    for(String s : rs) {
+      LOG.warn(s);
+    }
+    Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"transactionid\":19,\"bucketid\":536870912,\"rowid\":1}\t3\t3\t3\t"));
+    Assert.assertTrue(rs.get(0), rs.get(0).endsWith("nobuckets/delta_0000019_0000019_0000/bucket_00000"));
+    Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"transactionid\":19,\"bucketid\":536936448,\"rowid\":1}\t2\t2\t2\t"));
+    Assert.assertTrue(rs.get(1), rs.get(1).endsWith("nobuckets/delta_0000019_0000019_0000/bucket_00001"));
+    //so update has 1 writer which creates bucket0 where both new rows land
+    Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"transactionid\":21,\"bucketid\":536870912,\"rowid\":0}\t0\t0\t17\t"));
+    Assert.assertTrue(rs.get(2), rs.get(2).endsWith("nobuckets/delta_0000021_0000021_0000/bucket_00000"));
+    Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"transactionid\":21,\"bucketid\":536870912,\"rowid\":1}\t1\t1\t17\t"));
+    Assert.assertTrue(rs.get(3), rs.get(3).endsWith("nobuckets/delta_0000021_0000021_0000/bucket_00000"));
+
+    Set<String> expectedFiles = new HashSet<>();
+    //both delete events land in a single bucket0.  Each has a different ROW__ID.bucketId value (even writerId in it is different)
+    expectedFiles.add("ts/delete_delta_0000021_0000021_0000/bucket_00000");//"ts/" is the tail of "nobuckets/": all suffixes must have the same length
+    expectedFiles.add("nobuckets/delta_0000019_0000019_0000/bucket_00000");
+    expectedFiles.add("nobuckets/delta_0000019_0000019_0000/bucket_00001");
+    expectedFiles.add("nobuckets/delta_0000021_0000021_0000/bucket_00000");
+    //check that we get the right files on disk
+    assertExpectedFileSet(expectedFiles, getWarehouseDir() + "/nobuckets");
+    //todo: it would be nice to check the contents of the files... could use orc.FileDump - it has
+    // methods to print to a supplied stream but those are package private
+
+    runStatementOnDriver("alter table nobuckets compact 'major'");
+    TestTxnCommands2.runWorker(hiveConf);
+    rs = runStatementOnDriver("select ROW__ID, c1, c2, c3, INPUT__FILE__NAME from nobuckets order by INPUT__FILE__NAME, ROW__ID");
+    LOG.warn("after major compact");
+    for(String s : rs) {
+      LOG.warn(s);
+    }
+    /*
+├── base_0000021
+│   ├── bucket_00000
+│   └── bucket_00001
+├── delete_delta_0000021_0000021_0000
+│   └── bucket_00000
+├── delta_0000019_0000019_0000
+│   ├── bucket_00000
+│   └── bucket_00001
+└── delta_0000021_0000021_0000
+    └── bucket_00000
+    */
+    Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"transactionid\":19,\"bucketid\":536870912,\"rowid\":1}\t3\t3\t3\t"));
+    Assert.assertTrue(rs.get(0), rs.get(0).endsWith("nobuckets/base_0000021/bucket_00000"));
+    Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"transactionid\":21,\"bucketid\":536870912,\"rowid\":0}\t0\t0\t17\t"));
+    Assert.assertTrue(rs.get(1), rs.get(1).endsWith("nobuckets/base_0000021/bucket_00000"));
+    Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"transactionid\":21,\"bucketid\":536870912,\"rowid\":1}\t1\t1\t17\t"));
+    Assert.assertTrue(rs.get(2), rs.get(2).endsWith("nobuckets/base_0000021/bucket_00000"));
+    Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"transactionid\":19,\"bucketid\":536936448,\"rowid\":1}\t2\t2\t2\t"));
+    Assert.assertTrue(rs.get(3), rs.get(3).endsWith("nobuckets/base_0000021/bucket_00001"));
+
+    expectedFiles.clear();
+    expectedFiles.add("delete_delta_0000021_0000021_0000/bucket_00000");//paths are cut at different points only so that all entries have the same length
+    expectedFiles.add("uckets/delta_0000019_0000019_0000/bucket_00000");
+    expectedFiles.add("uckets/delta_0000019_0000019_0000/bucket_00001");
+    expectedFiles.add("uckets/delta_0000021_0000021_0000/bucket_00000");
+    expectedFiles.add("/warehouse/nobuckets/base_0000021/bucket_00000");
+    expectedFiles.add("/warehouse/nobuckets/base_0000021/bucket_00001");
+    assertExpectedFileSet(expectedFiles, getWarehouseDir() + "/nobuckets");
+
+    TestTxnCommands2.runCleaner(hiveConf);
+    rs = runStatementOnDriver("select c1, c2, c3 from nobuckets order by c1, c2, c3");
+    int[][] result = {{0,0,17},{1,1,17},{2,2,2},{3,3,3}};
+    Assert.assertEquals("Unexpected result after clean", stringifyValues(result), rs);
+
+    expectedFiles.clear();
+    expectedFiles.add("nobuckets/base_0000021/bucket_00000");
+    expectedFiles.add("nobuckets/base_0000021/bucket_00001");
+    assertExpectedFileSet(expectedFiles, getWarehouseDir() + "/nobuckets");
+  }
+
+  /**
+   * All of these pass but don't do exactly the right thing:
+   * files land as if it's not an acid table, e.g. "warehouse/myctas4/000000_0",
+   * even though {@link org.apache.hadoop.hive.metastore.TransactionalValidationListener} fires
+   * and sees it as a transactional table.
+   * Look for QB.isCTAS() and CreateTableDesc() in SemanticAnalyzer.
+   *
+   * On read, these files are treated like a non-acid to acid conversion.
+   *
+   * see HIVE-15899
+   * See CTAS tests in TestAcidOnTez
+   */
+  @Test
+  public void testCTAS() throws Exception {
+    int[][] values = {{1,2},{3,4}};
+    runStatementOnDriver("insert into " + Table.NONACIDORCTBL +  makeValuesClause(values));
+    runStatementOnDriver("create table myctas stored as ORC TBLPROPERTIES ('transactional" +
+      "'='true', 'transactional_properties'='default') as select a, b from " + Table.NONACIDORCTBL);
+    List<String> rs = runStatementOnDriver("select * from myctas order by a, b");
+    Assert.assertEquals(stringifyValues(values), rs);
+
+    runStatementOnDriver("insert into " + Table.ACIDTBL + makeValuesClause(values));
+    runStatementOnDriver("create table myctas2 stored as ORC TBLPROPERTIES ('transactional" +
+      "'='true', 'transactional_properties'='default') as select a, b from " + Table.ACIDTBL);
+    rs = runStatementOnDriver("select * from myctas2 order by a, b");
+    Assert.assertEquals(stringifyValues(values), rs);
+
+    runStatementOnDriver("create table myctas3 stored as ORC TBLPROPERTIES ('transactional" +
+      "'='true', 'transactional_properties'='default') as select a, b from " + Table.NONACIDORCTBL +
+      " union all select a, b from " + Table.ACIDTBL);
+    rs = runStatementOnDriver("select * from myctas3 order by a, b");
+    Assert.assertEquals(stringifyValues(new int[][] {{1,2},{1,2},{3,4},{3,4}}), rs);//union all: each row is expected twice
+
+    runStatementOnDriver("create table myctas4 stored as ORC TBLPROPERTIES ('transactional" +
+      "'='true', 'transactional_properties'='default') as select a, b from " + Table.NONACIDORCTBL +
+      " union distinct select a, b from " + Table.ACIDTBL);
+    rs = runStatementOnDriver("select * from myctas4 order by a, b");
+    Assert.assertEquals(stringifyValues(values), rs);//union distinct: each row is expected once
+  }
+  /**
+   * see HIVE-16177
+   * See also {@link TestTxnCommands2#testNonAcidToAcidConversion02()}  todo need test with > 1 bucket file
+   */
+  @Test
+  public void testToAcidConversion02() throws Exception {
+    //create 2 rows in a file 000000_0
+    runStatementOnDriver("insert into " + Table.NONACIDNONBUCKET + "(a,b) values(1,2),(1,3)");
+    //create 4 rows in a file 000000_0_copy_1
+    runStatementOnDriver("insert into " + Table.NONACIDNONBUCKET + "(a,b) values(0,12),(0,13),(1,4),(1,5)");
+    //create 1 row in a file 000000_0_copy_2
+    runStatementOnDriver("insert into " + Table.NONACIDNONBUCKET + "(a,b) values(1,6)");
+
+    //convert the table to Acid  //todo: remove trans_prop after HIVE-17089
+    runStatementOnDriver("alter table " + Table.NONACIDNONBUCKET + " SET TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default')");
+    List<String> rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " +  Table.NONACIDNONBUCKET + " order by ROW__ID");
+    LOG.warn("before acid ops (after convert)");
+    for(String s : rs) {
+      LOG.warn(s);
+    }
+    //create some delta directories
+    runStatementOnDriver("insert into " + Table.NONACIDNONBUCKET + "(a,b) values(0,15),(1,16)");
+    runStatementOnDriver("update " + Table.NONACIDNONBUCKET + " set b = 120 where a = 0 and b = 12");
+    runStatementOnDriver("insert into " + Table.NONACIDNONBUCKET + "(a,b) values(0,17)");
+    runStatementOnDriver("delete from " + Table.NONACIDNONBUCKET + " where a = 1 and b = 3");
+
+    rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " +  Table.NONACIDNONBUCKET + " order by a,b");
+    LOG.warn("before compact");
+    for(String s : rs) {
+      LOG.warn(s);
+    }
+    Assert.assertEquals(0, BucketCodec.determineVersion(536870912).decodeWriterId(536870912));//sanity check: this bucketid decodes to writerId 0
+    /*
+     * All ROW__IDs are unique on read after conversion to acid, and are exactly the same before and after compaction.
+     * Each entry is {ROW__ID + row data prefix, file name suffix after compaction, file name suffix before compaction};
+     * the file name is checked in addition to the ROW__ID for completeness.
+     */
+    String[][] expected = {
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":3}\t0\t13",  "bucket_00000", "000000_0_copy_1"},
+      {"{\"transactionid\":18,\"bucketid\":536870912,\"rowid\":0}\t0\t15", "bucket_00000", "bucket_00000"},
+      {"{\"transactionid\":20,\"bucketid\":536870912,\"rowid\":0}\t0\t17", "bucket_00000", "bucket_00000"},
+      {"{\"transactionid\":19,\"bucketid\":536870912,\"rowid\":0}\t0\t120", "bucket_00000", "bucket_00000"},
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t1\t2",   "bucket_00000", "000000_0"},
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":4}\t1\t4",   "bucket_00000", "000000_0_copy_1"},
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":5}\t1\t5",   "bucket_00000", "000000_0_copy_1"},
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":6}\t1\t6",   "bucket_00000", "000000_0_copy_2"},
+      {"{\"transactionid\":18,\"bucketid\":536870912,\"rowid\":1}\t1\t16", "bucket_00000", "bucket_00000"}
+    };
+    Assert.assertEquals("Unexpected row count before compaction", expected.length, rs.size());
+    for(int i = 0; i < expected.length; i++) {
+      Assert.assertTrue("Actual line " + i + " bc: " + rs.get(i), rs.get(i).startsWith(expected[i][0]));
+      Assert.assertTrue("Actual line(file) " + i + " bc: " + rs.get(i), rs.get(i).endsWith(expected[i][2]));
+    }
+    //run Compaction
+    runStatementOnDriver("alter table "+ Table.NONACIDNONBUCKET +" compact 'major'");
+    TestTxnCommands2.runWorker(hiveConf);
+    /*
+    nonacidnonbucket/
+    ├── 000000_0
+    ├── 000000_0_copy_1
+    ├── 000000_0_copy_2
+    ├── base_0000021
+    │   └── bucket_00000
+    ├── delete_delta_0000019_0000019_0000
+    │   └── bucket_00000
+    ├── delete_delta_0000021_0000021_0000
+    │   └── bucket_00000
+    ├── delta_0000018_0000018_0000
+    │   └── bucket_00000
+    ├── delta_0000019_0000019_0000
+    │   └── bucket_00000
+    └── delta_0000020_0000020_0000
+        └── bucket_00000
+
+    6 directories, 9 files
+    */
+    rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.NONACIDNONBUCKET + " order by a,b");
+    LOG.warn("after compact");
+    for(String s : rs) {
+      LOG.warn(s);
+    }
+    Assert.assertEquals("Unexpected row count after compaction", expected.length, rs.size());
+    for(int i = 0; i < expected.length; i++) {
+      Assert.assertTrue("Actual line " + i + " ac: " + rs.get(i), rs.get(i).startsWith(expected[i][0]));
+      Assert.assertTrue("Actual line(file) " + i + " ac: " + rs.get(i), rs.get(i).endsWith(expected[i][1]));
+    }
+    //both loops compare against the same expected[i][0], so ROW__IDs are verified to be the same before and after compaction
+  }
+  /**
+   * Currently CTAS doesn't support bucketed tables.  Correspondingly Acid only supports CTAS for
+   * unbucketed tables.  This test is here to make sure that if CTAS is made to support bucketed
+   * tables, that it raises a red flag for Acid.
+   */
+  @Test
+  public void testCtasBucketed() throws Exception {
+    runStatementOnDriver("insert into " + Table.NONACIDNONBUCKET + "(a,b) values(1,2),(1,3)");
+    CommandProcessorResponse cpr = runStatementOnDriverNegative("create table myctas " +
+      "clustered by (a) into 2 buckets stored as ORC TBLPROPERTIES ('transactional'='true') as " +
+      "select a, b from " + Table.NONACIDORCTBL);
+    //todo: ErrorMsg.CTAS_PARCOL_COEXISTENCE.getErrorCode() doesn't propagate to cpr.getErrorCode(),
+    //so for now only the message text can be checked
+    Assert.assertTrue(cpr.getErrorMessage().contains("CREATE-TABLE-AS-SELECT does not support"));
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
index f73d058..4c30732 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
@@ -3420,6 +3420,7 @@ public class TestInputOutputFormat {
       assertTrue(split.toString().contains("hasFooter=false"));
       assertTrue(split.toString().contains("hasBase=true"));
       assertTrue(split.toString().contains("deltas=0"));
+      assertTrue(split.toString().contains("isOriginal=true"));
       if (split instanceof OrcSplit) {
         assertFalse("No footer serialize test for non-vector reader, hasFooter is not expected in" +
             " orc splits.", ((OrcSplit) split).hasFooter());
@@ -3435,11 +3436,13 @@ public class TestInputOutputFormat {
     }
     // call-1: open to read footer - split 1 => mock:/mocktable5/0_0
     // call-2: open to read data - split 1 => mock:/mocktable5/0_0
-    // call-3: open to read footer - split 2 => mock:/mocktable5/0_1
-    // call-4: open to read data - split 2 => mock:/mocktable5/0_1
-    // call-5: AcidUtils.getAcidState - getLen() mock:/mocktable5/0_0
-    // call-6: AcidUtils.getAcidState - getLen() mock:/mocktable5/0_1
-    assertEquals(6, readOpsDelta);
+    // call-3: getAcidState - split 1 => mock:/mocktable5 (to compute offset for original read)
+    // call-4: open to read footer - split 2 => mock:/mocktable5/0_1
+    // call-5: open to read data - split 2 => mock:/mocktable5/0_1
+    // call-6: getAcidState - split 2 => mock:/mocktable5 (to compute offset for original read)
+    // call-7: open to read footer - split 2 => mock:/mocktable5/0_0 (to get row count)
+    // call-8: file status - split 2 => mock:/mocktable5/0_0
+    assertEquals(8, readOpsDelta);
 
     // revert back to local fs
     conf.set("fs.defaultFS", "file:///");
@@ -3498,6 +3501,7 @@ public class TestInputOutputFormat {
       assertTrue(split.toString().contains("hasFooter=true"));
       assertTrue(split.toString().contains("hasBase=true"));
       assertTrue(split.toString().contains("deltas=0"));
+      assertTrue(split.toString().contains("isOriginal=true"));
       if (split instanceof OrcSplit) {
         assertTrue("Footer serialize test for ACID reader, hasFooter is expected in" +
             " orc splits.", ((OrcSplit) split).hasFooter());
@@ -3512,10 +3516,12 @@ public class TestInputOutputFormat {
       }
     }
     // call-1: open to read data - split 1 => mock:/mocktable6/0_0
-    // call-2: open to read data - split 2 => mock:/mocktable6/0_1
-    // call-3: AcidUtils.getAcidState - getLen() mock:/mocktable6/0_0
-    // call-4: AcidUtils.getAcidState - getLen() mock:/mocktable6/0_1
-    assertEquals(4, readOpsDelta);
+    // call-2: AcidUtils.getAcidState - split 1 => ls mock:/mocktable6
+    // call-3: open to read data - split 2 => mock:/mocktable6/0_1
+    // call-4: AcidUtils.getAcidState - split 2 => ls mock:/mocktable6
+    // call-5: read footer - split 2 => mock:/mocktable6/0_0 (to get offset since it's original file)
+    // call-6: file stat - split 2 => mock:/mocktable6/0_0
+    assertEquals(6, readOpsDelta);
 
     // revert back to local fs
     conf.set("fs.defaultFS", "file:///");
@@ -3883,7 +3889,7 @@ public class TestInputOutputFormat {
     conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0,2");
     OrcSplit split = new OrcSplit(testFilePath, null, 0, fileLength,
         new String[0], null, false, true,
-        new ArrayList<AcidInputFormat.DeltaMetaData>(), fileLength, fileLength);
+        new ArrayList<AcidInputFormat.DeltaMetaData>(), fileLength, fileLength, workDir);
     OrcInputFormat inputFormat = new OrcInputFormat();
     AcidInputFormat.RowReader<OrcStruct> reader = inputFormat.getReader(split,
         new AcidInputFormat.Options(conf));
@@ -3911,7 +3917,7 @@ public class TestInputOutputFormat {
     conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0,2,3");
     split = new OrcSplit(testFilePath, null, 0, fileLength,
         new String[0], null, false, true,
-        new ArrayList<AcidInputFormat.DeltaMetaData>(), fileLength, fileLength);
+        new ArrayList<AcidInputFormat.DeltaMetaData>(), fileLength, fileLength, workDir);
     inputFormat = new OrcInputFormat();
     reader = inputFormat.getReader(split, new AcidInputFormat.Options(conf));
     record = 0;

http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
index 82cf108..9628a40 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
@@ -826,12 +826,9 @@ public class TestOrcRawRecordMerger {
     merger.close();
 
     //now run as if it's a minor Compaction so we don't collapse events
-    //it's not exactly like minor compaction since MC would not have a baseReader
     //here there is only 1 "split" since we only have data for 1 bucket
-    baseReader = OrcFile.createReader(basePath,
-      OrcFile.readerOptions(conf));
     merger =
-      new OrcRawRecordMerger(conf, false, baseReader, false, BUCKET,
+      new OrcRawRecordMerger(conf, false, null, false, BUCKET,
         createMaximalTxnList(), new Reader.Options(),
         AcidUtils.getPaths(directory.getCurrentDirectories()), new OrcRawRecordMerger.Options().isCompacting(true));
     assertEquals(null, merger.getMinKey());
@@ -844,40 +841,86 @@ public class TestOrcRawRecordMerger {
     assertNull(OrcRecordUpdater.getRow(event));
 
     assertEquals(true, merger.next(id, event));
+    assertEquals(OrcRecordUpdater.DELETE_OPERATION,
+      OrcRecordUpdater.getOperation(event));
+    assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 2, 200), id);
+    assertNull(OrcRecordUpdater.getRow(event));
+
+    assertEquals(true, merger.next(id, event));
+    assertEquals(OrcRecordUpdater.DELETE_OPERATION,
+      OrcRecordUpdater.getOperation(event));
+    assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 3, 200), id);
+    assertNull(OrcRecordUpdater.getRow(event));
+
+    assertEquals(true, merger.next(id, event));
+    assertEquals(OrcRecordUpdater.DELETE_OPERATION,
+      OrcRecordUpdater.getOperation(event));
+    assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 7, 200), id);
+    assertNull(OrcRecordUpdater.getRow(event));
+
+    assertEquals(true, merger.next(id, event));
+    assertEquals(OrcRecordUpdater.DELETE_OPERATION,
+      OrcRecordUpdater.getOperation(event));
+    assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 8, 200), id);
+    assertNull(OrcRecordUpdater.getRow(event));
+
+    //data from delta_200_200
+    assertEquals(true, merger.next(id, event));
     assertEquals(OrcRecordUpdater.INSERT_OPERATION,
       OrcRecordUpdater.getOperation(event));
-    assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 0, 0), id);
-    assertEquals("first", getValue(event));
+    assertEquals(new ReaderKey(200, BUCKET_PROPERTY, 0, 200), id);
+    assertEquals("update 1", getValue(event));
 
     assertEquals(true, merger.next(id, event));
     assertEquals(OrcRecordUpdater.INSERT_OPERATION,
       OrcRecordUpdater.getOperation(event));
-    assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 1, 0), id);
-    assertEquals("second", getValue(event));
+    assertEquals(new ReaderKey(200, BUCKET_PROPERTY, 1, 200), id);
+    assertEquals("update 2", getValue(event));
+
+    assertEquals(true, merger.next(id, event));
+    assertEquals(OrcRecordUpdater.INSERT_OPERATION,
+      OrcRecordUpdater.getOperation(event));
+    assertEquals(new ReaderKey(200, BUCKET_PROPERTY, 2, 200), id);
+    assertEquals("update 3", getValue(event));
+
+    assertEquals(false, merger.next(id, event));
+    merger.close();
+
+    //now run as if it's a major Compaction so we collapse events
+    //here there is only 1 "split" since we only have data for 1 bucket
+    baseReader = OrcFile.createReader(basePath,
+      OrcFile.readerOptions(conf));
+    merger =
+      new OrcRawRecordMerger(conf, true, baseReader, false, BUCKET,
+        createMaximalTxnList(), new Reader.Options(),
+        AcidUtils.getPaths(directory.getCurrentDirectories()), new OrcRawRecordMerger.Options()
+        .isCompacting(true).isMajorCompaction(true));
+    assertEquals(null, merger.getMinKey());
+    assertEquals(null, merger.getMaxKey());
 
     assertEquals(true, merger.next(id, event));
     assertEquals(OrcRecordUpdater.DELETE_OPERATION,
       OrcRecordUpdater.getOperation(event));
-    assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 2, 200), id);
+    assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 0, 200), id);
     assertNull(OrcRecordUpdater.getRow(event));
 
     assertEquals(true, merger.next(id, event));
     assertEquals(OrcRecordUpdater.INSERT_OPERATION,
       OrcRecordUpdater.getOperation(event));
-    assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 2, 0), id);
-    assertEquals("third", getValue(event));
+    assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 1, 0), id);
+    assertEquals("second", getValue(event));
 
     assertEquals(true, merger.next(id, event));
     assertEquals(OrcRecordUpdater.DELETE_OPERATION,
       OrcRecordUpdater.getOperation(event));
-    assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 3, 200), id);
+    assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 2, 200), id);
     assertNull(OrcRecordUpdater.getRow(event));
 
     assertEquals(true, merger.next(id, event));
-    assertEquals(OrcRecordUpdater.INSERT_OPERATION,
+    assertEquals(OrcRecordUpdater.DELETE_OPERATION,
       OrcRecordUpdater.getOperation(event));
-    assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 3, 0), id);
-    assertEquals("fourth", getValue(event));
+    assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 3, 200), id);
+    assertNull(OrcRecordUpdater.getRow(event));
 
     assertEquals(true, merger.next(id, event));
     assertEquals(OrcRecordUpdater.INSERT_OPERATION,
@@ -904,12 +947,6 @@ public class TestOrcRawRecordMerger {
     assertNull(OrcRecordUpdater.getRow(event));
 
     assertEquals(true, merger.next(id, event));
-    assertEquals(OrcRecordUpdater.INSERT_OPERATION,
-      OrcRecordUpdater.getOperation(event));
-    assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 7, 0), id);
-    assertEquals("eighth", getValue(event));
-
-    assertEquals(true, merger.next(id, event));
     assertEquals(OrcRecordUpdater.DELETE_OPERATION,
       OrcRecordUpdater.getOperation(event));
     assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 8, 200), id);
@@ -918,12 +955,6 @@ public class TestOrcRawRecordMerger {
     assertEquals(true, merger.next(id, event));
     assertEquals(OrcRecordUpdater.INSERT_OPERATION,
       OrcRecordUpdater.getOperation(event));
-    assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 8, 0), id);
-    assertEquals("ninth", getValue(event));
-
-    assertEquals(true, merger.next(id, event));
-    assertEquals(OrcRecordUpdater.INSERT_OPERATION,
-      OrcRecordUpdater.getOperation(event));
     assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 9, 0), id);
     assertEquals("tenth", getValue(event));
 
@@ -949,7 +980,6 @@ public class TestOrcRawRecordMerger {
     assertEquals(false, merger.next(id, event));
     merger.close();
 
-
     // try ignoring the 200 transaction and make sure it works still
     ValidTxnList txns = new ValidReadTxnList("2000:200:200");
     //again 1st split is for base/

http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/ql/src/test/queries/clientnegative/create_not_acid.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientnegative/create_not_acid.q b/ql/src/test/queries/clientnegative/create_not_acid.q
index 8d6c9ac..8ed13fb 100644
--- a/ql/src/test/queries/clientnegative/create_not_acid.q
+++ b/ql/src/test/queries/clientnegative/create_not_acid.q
@@ -2,5 +2,5 @@ set hive.support.concurrency=true;
 set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
 
 
-create table acid_notbucketed(a int, b varchar(128)) stored as orc TBLPROPERTIES ('transactional'='true');
+create table acid_notbucketed(a int, b varchar(128)) TBLPROPERTIES ('transactional'='true');
 

http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/ql/src/test/queries/clientpositive/acid_no_buckets.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/acid_no_buckets.q b/ql/src/test/queries/clientpositive/acid_no_buckets.q
new file mode 100644
index 0000000..c2f713e
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/acid_no_buckets.q
@@ -0,0 +1,210 @@
+--this has 4 groups of tests
+--Acid tables w/o bucketing
+--the tests with bucketing (make sure we get the same results)
+--same tests with and w/o vectorization
+
+set hive.mapred.mode=nonstrict;
+set hive.support.concurrency=true;
+set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
+set hive.exec.dynamic.partition.mode=nonstrict;
+set hive.vectorized.execution.enabled=false;
+set hive.explain.user=false;
+set hive.merge.cardinality.check=true;
+
+drop table if exists srcpart_acid;
+CREATE TABLE srcpart_acid (key STRING, value STRING) PARTITIONED BY (ds STRING, hr STRING) stored as ORC TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default');
+insert into srcpart_acid PARTITION (ds, hr) select * from srcpart;
+
+--2 rows for 413, 1 row for 43, 2 for 213, 1 for 44 in kv1.txt (in each partition)
+select ds, hr, key, value from srcpart_acid where cast(key as integer) in(413,43) and hr='11' order by ds, hr, cast(key as integer);
+
+analyze table srcpart_acid PARTITION(ds, hr) compute statistics;
+analyze table srcpart_acid PARTITION(ds, hr) compute statistics for columns;
+explain update srcpart_acid set value = concat(value, 'updated') where cast(key as integer) in(413,43) and hr='11';
+update srcpart_acid set value = concat(value, 'updated') where cast(key as integer) in(413,43) and hr='11';
+select ds, hr, key, value from srcpart_acid where value like '%updated' order by ds, hr, cast(key as integer);
+
+insert into srcpart_acid PARTITION (ds='2008-04-08', hr=='11') values ('1001','val1001'),('1002','val1002'),('1003','val1003');
+select ds, hr, key, value from srcpart_acid where cast(key as integer) > 1000 order by ds, hr, cast(key as integer);
+
+analyze table srcpart_acid PARTITION(ds, hr) compute statistics;
+analyze table srcpart_acid PARTITION(ds, hr) compute statistics for columns;
+explain delete from srcpart_acid where key in( '1001', '213', '43');
+--delete some rows from initial load, some that were updated and some that were inserted
+delete from srcpart_acid where key in( '1001', '213', '43');
+
+--make sure we deleted everything that should've been deleted
+select count(*) from srcpart_acid where key in( '1001', '213', '43');
+--make sure nothing extra was deleted  (2000 + 3 (insert) - 4 - 1 - 8 = 1990)
+select count(*) from srcpart_acid;
+
+--todo: should really have a way to run compactor here....
+
+--update should match 1 rows in 1 partition
+--delete should drop everything from 1 partition
+--insert should do nothing
+merge into srcpart_acid t using (select distinct ds, hr, key, value from srcpart_acid) s
+on s.ds=t.ds and s.hr=t.hr and s.key=t.key and s.value=t.value
+when matched and s.ds='2008-04-08' and s.hr=='11' and s.key='44' then update set value=concat(s.value,'updated by merge')
+when matched and s.ds='2008-04-08' and s.hr=='12' then delete
+when not matched then insert values('this','should','not','be there');
+
+--check results
+--should be 0
+select count(*) from srcpart_acid where ds='2008-04-08' and hr=='12';
+--should be 1 rows
+select ds, hr, key, value from srcpart_acid where value like '%updated by merge';
+--should be 0
+select count(*) from srcpart_acid where ds = 'this' and hr = 'should' and key = 'not' and value = 'be there';
+drop table if exists srcpart_acid;
+
+
+drop table if exists srcpart_acidb;
+CREATE TABLE srcpart_acidb (key STRING, value STRING) PARTITIONED BY (ds STRING, hr STRING) CLUSTERED BY(key) INTO 2 BUCKETS stored as ORC TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default');
+insert into srcpart_acidb PARTITION (ds, hr) select * from srcpart;
+
+--2 rows for 413, 1 row for 43, 2 for 213, 2 for 12 in kv1.txt (in each partition)
+select ds, hr, key, value from srcpart_acidb where cast(key as integer) in(413,43) and hr='11' order by ds, hr, cast(key as integer);
+
+analyze table srcpart_acidb PARTITION(ds, hr) compute statistics;
+analyze table srcpart_acidb PARTITION(ds, hr) compute statistics for columns;
+explain update srcpart_acidb set value = concat(value, 'updated') where cast(key as integer) in(413,43) and hr='11';
+update srcpart_acidb set value = concat(value, 'updated') where cast(key as integer) in(413,43) and hr='11';
+select ds, hr, key, value from srcpart_acidb where value like '%updated' order by ds, hr, cast(key as integer);
+
+insert into srcpart_acidb PARTITION (ds='2008-04-08', hr=='11') values ('1001','val1001'),('1002','val1002'),('1003','val1003');
+select ds, hr, key, value from srcpart_acidb where cast(key as integer) > 1000 order by ds, hr, cast(key as integer);
+
+analyze table srcpart_acidb PARTITION(ds, hr) compute statistics;
+analyze table srcpart_acidb PARTITION(ds, hr) compute statistics for columns;
+explain delete from srcpart_acidb where key in( '1001', '213', '43');
+--delete some rows from initial load, some that were updated and some that were inserted
+delete from srcpart_acidb where key in( '1001', '213', '43');
+
+--make sure we deleted everything that should've been deleted
+select count(*) from srcpart_acidb where key in( '1001', '213', '43');
+--make sure nothing extra was deleted  (2000 + 3 (insert) - 4 - 1 - 8 = 1990)
+select count(*) from srcpart_acidb;
+
+
+--todo: should really have a way to run compactor here....
+
+--update should match 1 rows in 1 partition
+--delete should drop everything from 1 partition
+--insert should do nothing
+merge into srcpart_acidb t using (select distinct ds, hr, key, value from srcpart_acidb) s
+on s.ds=t.ds and s.hr=t.hr and s.key=t.key and s.value=t.value
+when matched and s.ds='2008-04-08' and s.hr=='11' and s.key='44' then update set value=concat(s.value,'updated by merge')
+when matched and s.ds='2008-04-08' and s.hr=='12' then delete
+when not matched then insert values('this','should','not','be there');
+
+--check results
+--should be 0
+select count(*) from srcpart_acidb where ds='2008-04-08' and hr=='12';
+--should be 1 rows
+select ds, hr, key, value from srcpart_acidb where value like '%updated by merge';
+--should be 0
+select count(*) from srcpart_acidb where ds = 'this' and hr = 'should' and key = 'not' and value = 'be there';
+drop table if exists srcpart_acidb;
+
+
+
+--now same thing but vectorized
+set hive.vectorized.execution.enabled=true;
+
+drop table if exists srcpart_acidv;
+CREATE TABLE srcpart_acidv (key STRING, value STRING) PARTITIONED BY (ds STRING, hr STRING) stored as ORC TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default');
+insert into srcpart_acidv PARTITION (ds, hr) select * from srcpart;
+
+--2 rows for 413, 1 row for 43, 2 for 213, 2 for 12 in kv1.txt (in each partition)
+select ds, hr, key, value from srcpart_acidv where cast(key as integer) in(413,43) and hr='11' order by ds, hr, cast(key as integer);
+
+analyze table srcpart_acidv PARTITION(ds, hr) compute statistics;
+analyze table srcpart_acidv PARTITION(ds, hr) compute statistics for columns;
+explain update srcpart_acidv set value = concat(value, 'updated') where cast(key as integer) in(413,43) and hr='11';
+update srcpart_acidv set value = concat(value, 'updated') where cast(key as integer) in(413,43) and hr='11';
+select ds, hr, key, value from srcpart_acidv where value like '%updated' order by ds, hr, cast(key as integer);
+
+insert into srcpart_acidv PARTITION (ds='2008-04-08', hr=='11') values ('1001','val1001'),('1002','val1002'),('1003','val1003');
+select ds, hr, key, value from srcpart_acidv where cast(key as integer) > 1000 order by ds, hr, cast(key as integer);
+
+analyze table srcpart_acidv PARTITION(ds, hr) compute statistics;
+analyze table srcpart_acidv PARTITION(ds, hr) compute statistics for columns;
+explain delete from srcpart_acidv where key in( '1001', '213', '43');
+--delete some rows from initial load, some that were updated and some that were inserted
+delete from srcpart_acidv where key in( '1001', '213', '43');
+
+--make sure we deleted everything that should've been deleted
+select count(*) from srcpart_acidv where key in( '1001', '213', '43');
+--make sure nothing extra was deleted  (2000 + 3 - 4 - 1 - 8 = 1990)
+select count(*) from srcpart_acidv;
+
+--todo: should really have a way to run compactor here....
+
+--update should match 1 rows in 1 partition
+--delete should drop everything from 1 partition
+--insert should do nothing
+merge into srcpart_acidv t using (select distinct ds, hr, key, value from srcpart_acidv) s
+on s.ds=t.ds and s.hr=t.hr and s.key=t.key and s.value=t.value
+when matched and s.ds='2008-04-08' and s.hr=='11' and s.key='44' then update set value=concat(s.value,'updated by merge')
+when matched and s.ds='2008-04-08' and s.hr=='12' then delete
+when not matched then insert values('this','should','not','be there');
+
+--check results
+--should be 0
+select count(*) from srcpart_acidv where ds='2008-04-08' and hr=='12';
+--should be 1 rows
+select ds, hr, key, value from srcpart_acidv where value like '%updated by merge';
+--should be 0
+select count(*) from srcpart_acidv where ds = 'this' and hr = 'should' and key = 'not' and value = 'be there';
+drop table if exists srcpart_acidv;
+
+
+
+drop table if exists srcpart_acidvb;
+CREATE TABLE srcpart_acidvb (key STRING, value STRING) PARTITIONED BY (ds STRING, hr STRING) CLUSTERED BY(key) INTO 2 BUCKETS stored as ORC TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default');
+insert into srcpart_acidvb PARTITION (ds, hr) select * from srcpart;
+
+--2 rows for 413, 1 row for 43, 2 for 213, 2 for 12 in kv1.txt (in each partition)
+select ds, hr, key, value from srcpart_acidvb where cast(key as integer) in(413,43) and hr='11' order by ds, hr, cast(key as integer);
+
+analyze table srcpart_acidvb PARTITION(ds, hr) compute statistics;
+analyze table srcpart_acidvb PARTITION(ds, hr) compute statistics for columns;
+explain update srcpart_acidvb set value = concat(value, 'updated') where cast(key as integer) in(413,43) and hr='11';
+update srcpart_acidvb set value = concat(value, 'updated') where cast(key as integer) in(413,43) and hr='11';
+select ds, hr, key, value from srcpart_acidvb where value like '%updated' order by ds, hr, cast(key as integer);
+
+insert into srcpart_acidvb PARTITION (ds='2008-04-08', hr=='11') values ('1001','val1001'),('1002','val1002'),('1003','val1003');
+select ds, hr, key, value from srcpart_acidvb where cast(key as integer) > 1000 order by ds, hr, cast(key as integer);
+
+analyze table srcpart_acidvb PARTITION(ds, hr) compute statistics;
+analyze table srcpart_acidvb PARTITION(ds, hr) compute statistics for columns;
+explain delete from srcpart_acidvb where key in( '1001', '213', '43');
+--delete some rows from initial load, some that were updated and some that were inserted
+delete from srcpart_acidvb where key in( '1001', '213', '43');
+
+--make sure we deleted everything that should've been deleted
+select count(*) from srcpart_acidvb where key in( '1001', '213', '43');
+--make sure nothing extra was deleted  (2000 + 3 (insert) - 4 - 1 - 8 = 1990)
+select count(*) from srcpart_acidvb;
+
+
+--todo: should really have a way to run compactor here....
+
+--update should match 1 rows in 1 partition
+--delete should drop everything from 1 partition
+--insert should do nothing
+merge into srcpart_acidvb t using (select distinct ds, hr, key, value from srcpart_acidvb) s
+on s.ds=t.ds and s.hr=t.hr and s.key=t.key and s.value=t.value
+when matched and s.ds='2008-04-08' and s.hr=='11' and s.key='44' then update set value=concat(s.value,'updated by merge')
+when matched and s.ds='2008-04-08' and s.hr=='12' then delete
+when not matched then insert values('this','should','not','be there');
+
+--check results
+--should be 0
+select count(*) from srcpart_acidvb where ds='2008-04-08' and hr=='12';
+--should be 1 rows
+select ds, hr, key, value from srcpart_acidvb where value like '%updated by merge';
+--should be 0
+select count(*) from srcpart_acidvb where ds = 'this' and hr = 'should' and key = 'not' and value = 'be there';
+drop table if exists srcpart_acidvb;

http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/ql/src/test/results/clientnegative/create_not_acid.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientnegative/create_not_acid.q.out b/ql/src/test/results/clientnegative/create_not_acid.q.out
index bb8f6c9..4e775e5 100644
--- a/ql/src/test/results/clientnegative/create_not_acid.q.out
+++ b/ql/src/test/results/clientnegative/create_not_acid.q.out
@@ -1,5 +1,5 @@
-PREHOOK: query: create table acid_notbucketed(a int, b varchar(128)) stored as orc TBLPROPERTIES ('transactional'='true')
+PREHOOK: query: create table acid_notbucketed(a int, b varchar(128)) TBLPROPERTIES ('transactional'='true')
 PREHOOK: type: CREATETABLE
 PREHOOK: Output: database:default
 PREHOOK: Output: default@acid_notbucketed
-FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. MetaException(message:The table must be bucketed and stored using an ACID compliant format (such as ORC))
+FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. MetaException(message:The table must be stored using an ACID compliant format (such as ORC))

http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/ql/src/test/results/clientnegative/delete_non_acid_table.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientnegative/delete_non_acid_table.q.out b/ql/src/test/results/clientnegative/delete_non_acid_table.q.out
index a9b884a..19fd5fb 100644
--- a/ql/src/test/results/clientnegative/delete_non_acid_table.q.out
+++ b/ql/src/test/results/clientnegative/delete_non_acid_table.q.out
@@ -34,4 +34,4 @@ POSTHOOK: Input: default@not_an_acid_table2
 -1070883071	0ruyd6Y50JpdGRf6HqD
 -1070551679	iUR3Q
 -1069736047	k17Am8uPHWk02cEf1jet
-FAILED: SemanticException [Error 10297]: Attempt to do update or delete on table default.not_an_acid_table2 that does not use an AcidOutputFormat or is not bucketed
+FAILED: SemanticException [Error 10297]: Attempt to do update or delete on table default.not_an_acid_table2 that is not transactional

http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/ql/src/test/results/clientnegative/update_non_acid_table.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientnegative/update_non_acid_table.q.out b/ql/src/test/results/clientnegative/update_non_acid_table.q.out
index 381b0db..02946fc 100644
--- a/ql/src/test/results/clientnegative/update_non_acid_table.q.out
+++ b/ql/src/test/results/clientnegative/update_non_acid_table.q.out
@@ -34,4 +34,4 @@ POSTHOOK: Input: default@not_an_acid_table
 -1070883071	0ruyd6Y50JpdGRf6HqD
 -1070551679	iUR3Q
 -1069736047	k17Am8uPHWk02cEf1jet
-FAILED: SemanticException [Error 10297]: Attempt to do update or delete on table default.not_an_acid_table that does not use an AcidOutputFormat or is not bucketed
+FAILED: SemanticException [Error 10297]: Attempt to do update or delete on table default.not_an_acid_table that is not transactional


[3/3] hive git commit: HIVE-17205 - add functional support for unbucketed tables (Eugene Koifman, reviewed by Wei Zheng)

Posted by ek...@apache.org.
HIVE-17205 - add functional support for unbucketed tables (Eugene Koifman, reviewed by Wei Zheng)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/6be50b76
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/6be50b76
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/6be50b76

Branch: refs/heads/master
Commit: 6be50b76be5956b3c52ed6024fd7b4a3dee65fb6
Parents: 262d8f9
Author: Eugene Koifman <ek...@hortonworks.com>
Authored: Fri Aug 25 20:14:57 2017 -0700
Committer: Eugene Koifman <ek...@hortonworks.com>
Committed: Fri Aug 25 20:15:26 2017 -0700

----------------------------------------------------------------------
 .../streaming/AbstractRecordWriter.java         |   44 +-
 .../hive/hcatalog/streaming/HiveEndPoint.java   |    4 +-
 .../apache/hive/hcatalog/streaming/package.html |   21 +-
 .../hive/hcatalog/streaming/TestStreaming.java  |   86 +-
 .../hive/metastore/TestHiveMetaStore.java       |    8 +-
 .../apache/hadoop/hive/ql/TestAcidOnTez.java    |  367 +++-
 .../hive/ql/TestAcidOnTezWithSplitUpdate.java   |   28 -
 .../test/resources/testconfiguration.properties |    4 +-
 .../TransactionalValidationListener.java        |   42 +-
 .../org/apache/hadoop/hive/ql/ErrorMsg.java     |    4 +-
 .../hadoop/hive/ql/exec/FileSinkOperator.java   |   22 +-
 .../hadoop/hive/ql/io/orc/OrcInputFormat.java   |   60 +-
 .../hive/ql/io/orc/OrcRawRecordMerger.java      |  225 +-
 .../hadoop/hive/ql/io/orc/OrcRecordUpdater.java |   14 +-
 .../apache/hadoop/hive/ql/io/orc/OrcSplit.java  |   10 +-
 .../io/orc/VectorizedOrcAcidRowBatchReader.java |   26 +-
 .../optimizer/SortedDynPartitionOptimizer.java  |    1 +
 .../hadoop/hive/ql/parse/SemanticAnalyzer.java  |    6 -
 .../hive/ql/txn/compactor/CompactorMR.java      |    3 +-
 .../apache/hadoop/hive/ql/TestTxnCommands.java  |  108 +-
 .../apache/hadoop/hive/ql/TestTxnCommands2.java |   12 +-
 .../hadoop/hive/ql/TestTxnCommandsBase.java     |  162 ++
 .../apache/hadoop/hive/ql/TestTxnNoBuckets.java |  297 +++
 .../hive/ql/io/orc/TestInputOutputFormat.java   |   28 +-
 .../hive/ql/io/orc/TestOrcRawRecordMerger.java  |   86 +-
 .../queries/clientnegative/create_not_acid.q    |    2 +-
 .../queries/clientpositive/acid_no_buckets.q    |  210 ++
 .../clientnegative/create_not_acid.q.out        |    4 +-
 .../clientnegative/delete_non_acid_table.q.out  |    2 +-
 .../clientnegative/update_non_acid_table.q.out  |    2 +-
 .../clientpositive/llap/acid_no_buckets.q.out   | 1976 ++++++++++++++++++
 31 files changed, 3534 insertions(+), 330 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java
index e409e75..4ec10ad 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java
@@ -46,6 +46,7 @@ import java.io.IOException;
 
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Properties;
@@ -54,19 +55,23 @@ import java.util.Properties;
 public abstract class AbstractRecordWriter implements RecordWriter {
   static final private Logger LOG = LoggerFactory.getLogger(AbstractRecordWriter.class.getName());
 
-  final HiveConf conf;
-  final HiveEndPoint endPoint;
+  private final HiveConf conf;
+  private final HiveEndPoint endPoint;
   final Table tbl;
 
-  final IMetaStoreClient msClient;
-  protected final List<Integer> bucketIds;
-  ArrayList<RecordUpdater> updaters = null;
+  private final IMetaStoreClient msClient;
+  final List<Integer> bucketIds;
+  private ArrayList<RecordUpdater> updaters = null;
 
-  public final int totalBuckets;
+  private final int totalBuckets;
+  /**
+   * Indicates whether target table is bucketed
+   */
+  private final boolean isBucketed;
 
   private final Path partitionPath;
 
-  final AcidOutputFormat<?,?> outf;
+  private final AcidOutputFormat<?,?> outf;
   private Object[] bucketFieldData; // Pre-allocated in constructor. Updated on each write.
   private Long curBatchMinTxnId;
   private Long curBatchMaxTxnId;
@@ -109,16 +114,22 @@ public abstract class AbstractRecordWriter implements RecordWriter {
         this.tbl = twp.tbl;
         this.partitionPath = twp.partitionPath;
       }
-      this.totalBuckets = tbl.getSd().getNumBuckets();
-      if (totalBuckets <= 0) {
-        throw new StreamingException("Cannot stream to table that has not been bucketed : "
-          + endPoint);
+      this.isBucketed = tbl.getSd().getNumBuckets() > 0;
+      /**
+       *  For unbucketed tables we have exactly 1 RecordUpdater for each AbstractRecordWriter which
+       *  ends up writing to a file bucket_000000
+       * See also {@link #getBucket(Object)}
+       */
+      this.totalBuckets = isBucketed ? tbl.getSd().getNumBuckets() : 1;
+      if(isBucketed) {
+        this.bucketIds = getBucketColIDs(tbl.getSd().getBucketCols(), tbl.getSd().getCols());
+        this.bucketFieldData = new Object[bucketIds.size()];
+      }
+      else {
+        bucketIds = Collections.emptyList();
       }
-      this.bucketIds = getBucketColIDs(tbl.getSd().getBucketCols(), tbl.getSd().getCols());
-      this.bucketFieldData = new Object[bucketIds.size()];
       String outFormatName = this.tbl.getSd().getOutputFormat();
       outf = (AcidOutputFormat<?, ?>) ReflectionUtils.newInstance(JavaUtils.loadClass(outFormatName), conf);
-      bucketFieldData = new Object[bucketIds.size()];
     } catch(InterruptedException e) {
       throw new StreamingException(endPoint2.toString(), e);
     } catch (MetaException | NoSuchObjectException e) {
@@ -169,6 +180,9 @@ public abstract class AbstractRecordWriter implements RecordWriter {
 
   // returns the bucket number to which the record belongs to
   protected int getBucket(Object row) throws SerializationError {
+    if(!isBucketed) {
+      return 0;
+    }
     ObjectInspector[] inspectors = getBucketObjectInspectors();
     Object[] bucketFields = getBucketFields(row);
     return ObjectInspectorUtils.getBucketNumber(bucketFields, inspectors, totalBuckets);
@@ -204,7 +218,7 @@ public abstract class AbstractRecordWriter implements RecordWriter {
     curBatchMaxTxnId = maxTxnID;
     updaters = new ArrayList<RecordUpdater>(totalBuckets);
     for (int bucket = 0; bucket < totalBuckets; bucket++) {
-      updaters.add(bucket, null);
+      updaters.add(bucket, null);//so that get(i) returns null rather than throwing IndexOutOfBoundsException
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
index 81f6155..28c98bd 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
@@ -20,6 +20,8 @@ package org.apache.hive.hcatalog.streaming;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hive.metastore.api.DataOperationType;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.cli.CliSessionState;
@@ -338,7 +340,7 @@ public class HiveEndPoint {
       // 1 - check if TBLPROPERTIES ('transactional'='true') is set on table
       Map<String, String> params = t.getParameters();
       if (params != null) {
-        String transactionalProp = params.get("transactional");
+        String transactionalProp = params.get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL);
         if (transactionalProp == null || !transactionalProp.equalsIgnoreCase("true")) {
           LOG.error("'transactional' property is not set on Table " + endPoint);
           throw new InvalidTable(endPoint.database, endPoint.table, "\'transactional\' property" +

http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/package.html
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/package.html b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/package.html
index ed4d307..a879b97 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/package.html
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/package.html
@@ -30,7 +30,7 @@ partition. Once data is committed it becomes immediately visible to
 all Hive queries initiated subsequently.</p>
 
 <p>
-This API is intended for streaming clients such as Flume and Storm,
+This API is intended for streaming clients such as NiFi, Flume and Storm,
 which continuously generate data. Streaming support is built on top of
 ACID based insert/update support in Hive.</p>
 
@@ -56,10 +56,7 @@ A few things are currently required to use streaming.
 <ol>
   <li> Currently, only ORC storage format is supported. So 
     '<b>stored as orc</b>' must be specified during table creation.</li>
-  <li> The hive table must be bucketed, but not sorted. So something like 
-    '<b>clustered by (<i>colName</i>) into <i>10</i> buckets</b>' must 
-    be specified during table creation. The number of buckets
-    is ideally the same as the number of streaming writers.</li>
+  <li> The hive table may be bucketed but must not be sorted. </li>
   <li> User of the client streaming process must have the necessary 
     permissions to write to the table or partition and create partitions in
     the table.</li>
@@ -67,7 +64,6 @@ A few things are currently required to use streaming.
     <ol>
       <li><b>hive.input.format =
              org.apache.hadoop.hive.ql.io.HiveInputFormat</b></li>
-      <li><b>hive.vectorized.execution.enabled = false</b></li>
     </ol></li>
   The above client settings are a temporary requirement and the intention is to
   drop the need for them in the near future.
@@ -165,8 +161,21 @@ additional implementations of the <b>RecordWriter</b> interface.
 - Delimited text input.</li>
 <li> <a href="StrictJsonWriter.html"><b>StrictJsonWriter</b></a> 
 - JSON text input.</li>
+  <li> <a href="StrictRegexWriter.html"><b>StrictRegexWriter</b></a>
+    - text input with regex.</li>
 </ul></p>
 
+<h2>Performance, Concurrency, Etc.</h2>
+<p>
+  Each StreamingConnection is writing data at the rate the underlying
+  FileSystem can accept it.  If that is not sufficient, multiple StreamingConnection objects can
+  be created concurrently.
+</p>
+<p>
+  Each StreamingConnection can have at most 1 outstanding TransactionBatch and each TransactionBatch
+  may have at most 2 threads operating on it.
+  See <a href="TransactionBatch.html"><b>TransactionBatch</b></a>
+</p>
 </body>
 
 </html>

http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
index f3ef92b..49520ef 100644
--- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
@@ -33,6 +33,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -52,6 +53,7 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse;
 import org.apache.hadoop.hive.metastore.api.LockState;
 import org.apache.hadoop.hive.metastore.api.LockType;
+import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.ShowLocksRequest;
@@ -65,6 +67,7 @@ import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
 import org.apache.hadoop.hive.ql.CommandNeedRetryException;
 import org.apache.hadoop.hive.ql.Driver;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.io.BucketCodec;
 import org.apache.hadoop.hive.ql.io.IOConstants;
 import org.apache.hadoop.hive.ql.io.orc.OrcFile;
 import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
@@ -74,6 +77,7 @@ import org.apache.hadoop.hive.ql.io.orc.RecordReader;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.txn.AcidHouseKeeperService;
+import org.apache.hadoop.hive.ql.txn.compactor.Worker;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
@@ -193,7 +197,6 @@ public class TestStreaming {
 
     conf = new HiveConf(this.getClass());
     conf.set("fs.raw.impl", RawFileSystem.class.getName());
-    conf.set("hive.enforce.bucketing", "true");
     conf
     .setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
         "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory");
@@ -339,6 +342,83 @@ public class TestStreaming {
     }
   }
 
+  /**
+   * Test that streaming can write to unbucketed table.
+   */
+  @Test
+  public void testNoBuckets() throws Exception {
+    queryTable(driver, "drop table if exists default.streamingnobuckets");
+    //todo: why does it need transactional_properties?
+    queryTable(driver, "create table default.streamingnobuckets (a string, b string) stored as orc TBLPROPERTIES('transactional'='true', 'transactional_properties'='default')");
+    queryTable(driver, "insert into default.streamingnobuckets values('foo','bar')");
+    List<String> rs = queryTable(driver, "select * from default.streamingnobuckets");
+    Assert.assertEquals(1, rs.size());
+    Assert.assertEquals("foo\tbar", rs.get(0));
+    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, "default", "streamingnobuckets", null);
+    String[] colNames1 = new String[] { "a", "b" };
+    StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName());
+    DelimitedInputWriter wr = new DelimitedInputWriter(colNames1,",",  endPt, connection);
+
+    TransactionBatch txnBatch =  connection.fetchTransactionBatch(2, wr);
+    txnBatch.beginNextTransaction();
+    txnBatch.write("a1,b2".getBytes());
+    txnBatch.write("a3,b4".getBytes());
+    txnBatch.commit();
+    txnBatch.beginNextTransaction();
+    txnBatch.write("a5,b6".getBytes());
+    txnBatch.write("a7,b8".getBytes());
+    txnBatch.commit();
+    txnBatch.close();
+
+    Assert.assertEquals("", 0, BucketCodec.determineVersion(536870912).decodeWriterId(536870912));
+    rs = queryTable(driver,"select ROW__ID, a, b, INPUT__FILE__NAME from default.streamingnobuckets order by ROW__ID");
+
+    Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"transactionid\":16,\"bucketid\":536870912,\"rowid\":0}\tfoo\tbar"));
+    Assert.assertTrue(rs.get(0), rs.get(0).endsWith("streamingnobuckets/delta_0000016_0000016_0000/bucket_00000"));
+    Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"transactionid\":18,\"bucketid\":536870912,\"rowid\":0}\ta1\tb2"));
+    Assert.assertTrue(rs.get(1), rs.get(1).endsWith("streamingnobuckets/delta_0000018_0000019/bucket_00000"));
+    Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"transactionid\":18,\"bucketid\":536870912,\"rowid\":1}\ta3\tb4"));
+    Assert.assertTrue(rs.get(2), rs.get(2).endsWith("streamingnobuckets/delta_0000018_0000019/bucket_00000"));
+    Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"transactionid\":19,\"bucketid\":536870912,\"rowid\":0}\ta5\tb6"));
+    Assert.assertTrue(rs.get(3), rs.get(3).endsWith("streamingnobuckets/delta_0000018_0000019/bucket_00000"));
+    Assert.assertTrue(rs.get(4), rs.get(4).startsWith("{\"transactionid\":19,\"bucketid\":536870912,\"rowid\":1}\ta7\tb8"));
+    Assert.assertTrue(rs.get(4), rs.get(4).endsWith("streamingnobuckets/delta_0000018_0000019/bucket_00000"));
+
+    queryTable(driver, "update default.streamingnobuckets set a=0, b=0 where a='a7'");
+    queryTable(driver, "delete from default.streamingnobuckets where a='a1'");
+    rs = queryTable(driver, "select a, b from default.streamingnobuckets order by a, b");
+    int row = 0;
+    Assert.assertEquals("at row=" + row, "0\t0", rs.get(row++));
+    Assert.assertEquals("at row=" + row, "a3\tb4", rs.get(row++));
+    Assert.assertEquals("at row=" + row, "a5\tb6", rs.get(row++));
+    Assert.assertEquals("at row=" + row, "foo\tbar", rs.get(row++));
+
+    queryTable(driver, "alter table default.streamingnobuckets compact 'major'");
+    runWorker(conf);
+    rs = queryTable(driver,"select ROW__ID, a, b, INPUT__FILE__NAME from default.streamingnobuckets order by ROW__ID");
+
+    Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"transactionid\":16,\"bucketid\":536870912,\"rowid\":0}\tfoo\tbar"));
+    Assert.assertTrue(rs.get(0), rs.get(0).endsWith("streamingnobuckets/base_0000022/bucket_00000"));
+    Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"transactionid\":18,\"bucketid\":536870912,\"rowid\":1}\ta3\tb4"));
+    Assert.assertTrue(rs.get(1), rs.get(1).endsWith("streamingnobuckets/base_0000022/bucket_00000"));
+    Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"transactionid\":19,\"bucketid\":536870912,\"rowid\":0}\ta5\tb6"));
+    Assert.assertTrue(rs.get(2), rs.get(2).endsWith("streamingnobuckets/base_0000022/bucket_00000"));
+    Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"transactionid\":21,\"bucketid\":536870912,\"rowid\":0}\t0\t0"));
+    Assert.assertTrue(rs.get(3), rs.get(3).endsWith("streamingnobuckets/base_0000022/bucket_00000"));
+  }
+
+  /**
+   * This is a clone of TestTxnCommands2.runWorker().
+   */
+  public static void runWorker(HiveConf hiveConf) throws MetaException {
+    AtomicBoolean stop = new AtomicBoolean(true);
+    Worker t = new Worker();
+    t.setThreadId((int) t.getId());
+    t.setHiveConf(hiveConf);
+    AtomicBoolean looped = new AtomicBoolean();
+    t.init(stop, looped);
+    t.run();
+  }
 
   // stream data into streaming table with N buckets, then copy the data into another bucketed table
   // check if bucketing in both was done in the same way
@@ -453,8 +533,8 @@ public class TestStreaming {
   }
 
   /**
-   * @deprecated use {@link #checkDataWritten2(Path, long, long, int, String, String...)} - there is
-   * little value in using InputFormat directly
+   * @deprecated use {@link #checkDataWritten2(Path, long, long, int, String, boolean, String...)} -
+   * there is little value in using InputFormat directly
    */
   private void checkDataWritten(Path partitionPath, long minTxn, long maxTxn, int buckets, int numExpectedFiles,
                                 String... records) throws Exception {

http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStore.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStore.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStore.java
index 8bd23cc..50e5274 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStore.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStore.java
@@ -3018,7 +3018,7 @@ public abstract class TestHiveMetaStore extends TestCase {
       Table t = createTable(dbName, tblName, owner, params, null, sd, 0);
       Assert.assertTrue("Expected exception", false);
     } catch (MetaException e) {
-      Assert.assertEquals("The table must be bucketed and stored using an ACID compliant format (such as ORC)", e.getMessage());
+      Assert.assertEquals("The table must be stored using an ACID compliant format (such as ORC)", e.getMessage());
     }
 
     // Fail - "transactional" is set to true, and the table is bucketed, but doesn't use ORC
@@ -3031,7 +3031,7 @@ public abstract class TestHiveMetaStore extends TestCase {
       Table t = createTable(dbName, tblName, owner, params, null, sd, 0);
       Assert.assertTrue("Expected exception", false);
     } catch (MetaException e) {
-      Assert.assertEquals("The table must be bucketed and stored using an ACID compliant format (such as ORC)", e.getMessage());
+      Assert.assertEquals("The table must be stored using an ACID compliant format (such as ORC)", e.getMessage());
     }
 
     // Succeed - "transactional" is set to true, and the table is bucketed, and uses ORC
@@ -3064,13 +3064,14 @@ public abstract class TestHiveMetaStore extends TestCase {
       tblName += "1";
       params.clear();
       sd.unsetBucketCols();
+      sd.setInputFormat("org.apache.hadoop.mapred.FileInputFormat");
       t = createTable(dbName, tblName, owner, params, null, sd, 0);
       params.put("transactional", "true");
       t.setParameters(params);
       client.alter_table(dbName, tblName, t);
       Assert.assertTrue("Expected exception", false);
     } catch (MetaException e) {
-      Assert.assertEquals("The table must be bucketed and stored using an ACID compliant format (such as ORC)", e.getMessage());
+      Assert.assertEquals("The table must be stored using an ACID compliant format (such as ORC)", e.getMessage());
     }
 
     // Succeed - trying to set "transactional" to "true", and satisfies bucketing and Input/OutputFormat requirement
@@ -3078,6 +3079,7 @@ public abstract class TestHiveMetaStore extends TestCase {
     params.clear();
     sd.setNumBuckets(1);
     sd.setBucketCols(bucketCols);
+    sd.setInputFormat("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat");
     t = createTable(dbName, tblName, owner, params, null, sd, 0);
     params.put("transactional", "true");
     t.setParameters(params);

http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java
index 2bf9871..d0b5cf6 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java
@@ -22,7 +22,10 @@ import java.io.File;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
@@ -30,23 +33,26 @@ import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
 import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+import org.apache.hadoop.hive.ql.io.BucketCodec;
 import org.apache.hadoop.hive.ql.io.HiveInputFormat;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
 import org.apache.hadoop.hive.ql.session.SessionState;
-import org.apache.hadoop.hive.ql.txn.compactor.Cleaner;
-import org.apache.hadoop.hive.ql.txn.compactor.Worker;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * This class resides in itests to facilitate running query using Tez engine, since the jars are
  * fully loaded here, which is not the case if it stays in ql.
  */
 public class TestAcidOnTez {
+  static final private Logger LOG = LoggerFactory.getLogger(TestAcidOnTez.class);
   private static final String TEST_DATA_DIR = new File(System.getProperty("java.io.tmpdir") +
       File.separator + TestAcidOnTez.class.getCanonicalName()
       + "-" + System.currentTimeMillis()
@@ -61,8 +67,10 @@ public class TestAcidOnTez {
   private static enum Table {
     ACIDTBL("acidTbl"),
     ACIDTBLPART("acidTblPart"),
+    ACIDNOBUCKET("acidNoBucket"),
     NONACIDORCTBL("nonAcidOrcTbl"),
-    NONACIDPART("nonAcidPart");
+    NONACIDPART("nonAcidPart"),
+    NONACIDNONBUCKET("nonAcidNonBucket");
 
     private final String name;
     @Override
@@ -159,6 +167,359 @@ public class TestAcidOnTez {
     testJoin("tez", "MapJoin");
   }
 
+  /**
+   * Tests non acid to acid conversion where starting table has non-standard layout, i.e.
+   * where "original" files are not immediate children of the partition dir
+   */
+  @Test
+  public void testNonStandardConversion01() throws Exception {
+    HiveConf confForTez = new HiveConf(hiveConf); // make a clone of existing hive conf
+    setupTez(confForTez);
+    //CTAS with non-ACID target table
+    runStatementOnDriver("create table " + Table.NONACIDNONBUCKET + " stored as ORC TBLPROPERTIES('transactional'='false') as " +
+      "select a, b from " + Table.ACIDTBL + " where a <= 5 union all select a, b from " + Table.NONACIDORCTBL + " where a >= 5", confForTez);
+
+    List<String> rs = runStatementOnDriver("select a, b, INPUT__FILE__NAME from " + Table.NONACIDNONBUCKET + " order by a, b, INPUT__FILE__NAME");
+    String expected0[][] = {
+      {"1\t2", "/1/000000_0"},
+      {"3\t4", "/1/000000_0"},
+      {"5\t6", "/1/000000_0"},
+      {"5\t6", "/2/000000_0"},
+      {"7\t8", "/2/000000_0"},
+      {"9\t10", "/2/000000_0"},
+    };
+    Assert.assertEquals("Unexpected row count after ctas", expected0.length, rs.size());
+    //verify data and layout
+    for(int i = 0; i < expected0.length; i++) {
+      Assert.assertTrue("Actual line " + i + " bc: " + rs.get(i), rs.get(i).startsWith(expected0[i][0]));
+      Assert.assertTrue("Actual line(file) " + i + " bc: " + rs.get(i), rs.get(i).endsWith(expected0[i][1]));
+    }
+    //make the table ACID
+    runStatementOnDriver("alter table " + Table.NONACIDNONBUCKET + " SET TBLPROPERTIES ('transactional'='true')");
+
+    rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.NONACIDNONBUCKET + " order by ROW__ID");
+    LOG.warn("after ctas:");
+    for (String s : rs) {
+      LOG.warn(s);
+    }
+    Assert.assertEquals(0, BucketCodec.determineVersion(536870912).decodeWriterId(536870912));
+    /*
+    * Expected result: 0th entry is the RecordIdentifier + data; 1st entry is the file before compaction */
+    String expected[][] = {
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "/1/000000_0"},
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "/1/000000_0"},
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":2}\t5\t6", "/1/000000_0"},
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":3}\t9\t10", "/2/000000_0"},
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":4}\t7\t8", "/2/000000_0"},
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":5}\t5\t6", "/2/000000_0"},
+    };
+    Assert.assertEquals("Unexpected row count after ctas", expected.length, rs.size());
+    //verify data and layout
+    for(int i = 0; i < expected.length; i++) {
+      Assert.assertTrue("Actual line " + i + " bc: " + rs.get(i), rs.get(i).startsWith(expected[i][0]));
+      Assert.assertTrue("Actual line(file) " + i + " bc: " + rs.get(i), rs.get(i).endsWith(expected[i][1]));
+    }
+    //perform some Update/Delete
+    runStatementOnDriver("update " + Table.NONACIDNONBUCKET + " set a = 70, b  = 80 where a = 7");
+    runStatementOnDriver("delete from " + Table.NONACIDNONBUCKET + " where a = 5");
+    rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.NONACIDNONBUCKET + " order by ROW__ID");
+    LOG.warn("after update/delete:");
+    for (String s : rs) {
+      LOG.warn(s);
+    }
+    String[][] expected2 = {
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "/1/000000_0"},
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "/1/000000_0"},
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":3}\t9\t10", "/2/000000_0"},
+      {"{\"transactionid\":21,\"bucketid\":536870912,\"rowid\":0}\t70\t80", "delta_0000021_0000021_0000/bucket_00000"}
+    };
+    Assert.assertEquals("Unexpected row count after update", expected2.length, rs.size());
+    //verify data and layout
+    for(int i = 0; i < expected2.length; i++) {
+      Assert.assertTrue("Actual line " + i + " bc: " + rs.get(i), rs.get(i).startsWith(expected2[i][0]));
+      Assert.assertTrue("Actual line(file) " + i + " bc: " + rs.get(i), rs.get(i).endsWith(expected2[i][1]));
+    }
+    //now make sure delete deltas are present
+    FileSystem fs = FileSystem.get(hiveConf);
+    FileStatus[] status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+      (Table.NONACIDNONBUCKET).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+    String[] expectedDelDelta = {"delete_delta_0000021_0000021_0000", "delete_delta_0000022_0000022_0000"};
+    for(FileStatus stat : status) {
+      for(int i = 0; i < expectedDelDelta.length; i++) {
+        if(expectedDelDelta[i] != null && stat.getPath().toString().endsWith(expectedDelDelta[i])) {
+          expectedDelDelta[i] = null;
+        }
+      }
+    }
+    for(int i = 0; i < expectedDelDelta.length; i++) {
+      Assert.assertNull("at " + i + " " + expectedDelDelta[i] + " not found on disk", expectedDelDelta[i]);
+    }
+    //run Minor compaction
+    runStatementOnDriver("alter table " + Table.NONACIDNONBUCKET + " compact 'minor'");
+    TestTxnCommands2.runWorker(hiveConf);
+    rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.NONACIDNONBUCKET + " order by ROW__ID");
+    LOG.warn("after compact minor:");
+    for (String s : rs) {
+      LOG.warn(s);
+    }
+    Assert.assertEquals("Unexpected row count after update", expected2.length, rs.size());
+    //verify the data is the same
+    for(int i = 0; i < expected2.length; i++) {
+      Assert.assertTrue("Actual line " + i + " bc: " + rs.get(i), rs.get(i).startsWith(expected2[i][0]));
+      //todo: since HIVE-16669 is not done, Minor compact compacts insert delta as well - it should not
+      //Assert.assertTrue("Actual line(file) " + i + " bc: " + rs.get(i), rs.get(i).endsWith(expected2[i][1]));
+    }
+    //check we have right delete delta files after minor compaction
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+      (Table.NONACIDNONBUCKET).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+    String[] expectedDelDelta2 = {"delete_delta_0000021_0000021_0000", "delete_delta_0000022_0000022_0000", "delete_delta_0000021_0000022"};
+    for(FileStatus stat : status) {
+      for(int i = 0; i < expectedDelDelta2.length; i++) {
+        if(expectedDelDelta2[i] != null && stat.getPath().toString().endsWith(expectedDelDelta2[i])) {
+          expectedDelDelta2[i] = null;
+          break;
+        }
+      }
+    }
+    for(int i = 0; i < expectedDelDelta2.length; i++) {
+      Assert.assertNull("at " + i + " " + expectedDelDelta2[i] + " not found on disk", expectedDelDelta2[i]);
+    }
+    //run Major compaction
+    runStatementOnDriver("alter table " + Table.NONACIDNONBUCKET + " compact 'major'");
+    TestTxnCommands2.runWorker(hiveConf);
+    rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.NONACIDNONBUCKET + " order by ROW__ID");
+    LOG.warn("after compact major:");
+    for (String s : rs) {
+      LOG.warn(s);
+    }
+    Assert.assertEquals("Unexpected row count after major compact", expected2.length, rs.size());
+    for(int i = 0; i < expected2.length; i++) {
+      Assert.assertTrue("Actual line " + i + " bc: " + rs.get(i), rs.get(i).startsWith(expected2[i][0]));
+      //everything is now in base/
+      Assert.assertTrue("Actual line(file) " + i + " bc: " + rs.get(i), rs.get(i).endsWith("base_0000022/bucket_00000"));
+    }
+  }
+  /**
+   * Tests non acid to acid conversion where starting table has non-standard layout, i.e.
+   * where "original" files are not immediate children of the partition dir - partitioned table
+   *
+   * How to do this?  CTAS is the only way to create data files which are not immediate children
+   * of the partition dir.  CTAS/Union/Tez doesn't support partition tables.  The only way is to copy
+   * data files in directly.
+   */
+  @Ignore("HIVE-17214")
+  @Test
+  public void testNonStandardConversion02() throws Exception {
+    HiveConf confForTez = new HiveConf(hiveConf); // make a clone of existing hive conf
+    confForTez.setBoolean("mapred.input.dir.recursive", true);
+    setupTez(confForTez);
+    runStatementOnDriver("create table " + Table.NONACIDNONBUCKET + " stored as ORC " +
+      "TBLPROPERTIES('transactional'='false') as " +
+      "select a, b from " + Table.ACIDTBL + " where a <= 3 union all " +
+      "select a, b from " + Table.NONACIDORCTBL + " where a >= 7 " +
+      "union all select a, b from " + Table.ACIDTBL + " where a = 5", confForTez);
+
+    List<String> rs = runStatementOnDriver("select a, b, INPUT__FILE__NAME from " +
+      Table.NONACIDNONBUCKET + " order by a, b");
+    String expected0[][] = {
+      {"1\t2", "/1/000000_0"},
+      {"3\t4", "/1/000000_0"},
+      {"5\t6", "/3/000000_0"},
+      {"7\t8", "/2/000000_0"},
+      {"9\t10", "/2/000000_0"},
+    };
+    Assert.assertEquals("Unexpected row count after ctas", expected0.length, rs.size());
+    //verify data and layout
+    for(int i = 0; i < expected0.length; i++) {
+      Assert.assertTrue("Actual line " + i + " bc: " + rs.get(i), rs.get(i).startsWith(expected0[i][0]));
+      Assert.assertTrue("Actual line(file) " + i + " bc: " + rs.get(i), rs.get(i).endsWith(expected0[i][1]));
+    }
+    FileSystem fs = FileSystem.get(hiveConf);
+    FileStatus[] status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+      (Table.NONACIDNONBUCKET).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+    //ensure there is partition dir
+    runStatementOnDriver("insert into " + Table.NONACIDPART + " partition (p=1) values (100,110)");
+    //creates more files in that partition
+    for(FileStatus stat : status) {
+      int limit = 5;
+      Path p = stat.getPath();//dirs 1/, 2/, 3/
+      Path to = new Path(TEST_WAREHOUSE_DIR + "/" +  Table.NONACIDPART+ "/p=1/" + p.getName());
+      while(limit-- > 0 && !fs.rename(p, to)) {
+        Thread.sleep(200);
+      }
+      if(limit <= 0) {
+        throw new IllegalStateException("Could not rename " + p + " to " + to);
+      }
+    }
+    /*
+    This is what we expect on disk
+    ekoifman:warehouse ekoifman$ tree nonacidpart/
+    nonacidpart/
+    └── p=1
+        ├── 000000_0
+        ├── 1
+        │   └── 000000_0
+        ├── 2
+        │   └── 000000_0
+        └── 3
+            └── 000000_0
+
+4 directories, 4 files
+    **/
+    //make the table ACID
+    runStatementOnDriver("alter table " + Table.NONACIDPART + " SET TBLPROPERTIES ('transactional'='true')");
+    rs = runStatementOnDriver("select ROW__ID, a, b, p, INPUT__FILE__NAME from " + Table.NONACIDPART + " order by ROW__ID");
+    LOG.warn("after acid conversion:");
+    for (String s : rs) {
+      LOG.warn(s);
+    }
+    String[][] expected = {
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t100\t110\t1", "nonacidpart/p=1/000000_0"},
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":1}\t1\t2\t1", "nonacidpart/p=1/1/000000_0"},
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":2}\t3\t4\t1", "nonacidpart/p=1/1/000000_0"},
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":3}\t9\t10\t1", "nonacidpart/p=1/2/000000_0"},
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":4}\t7\t8\t1", "nonacidpart/p=1/2/000000_0"},
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":5}\t5\t6\t1", "nonacidpart/p=1/3/000000_0"}
+    };
+    Assert.assertEquals("Wrong row count", expected.length, rs.size());
+    //verify data and layout
+    for(int i = 0; i < expected.length; i++) {
+      Assert.assertTrue("Actual line " + i + " bc: " + rs.get(i), rs.get(i).startsWith(expected[i][0]));
+      Assert.assertTrue("Actual line(file) " + i + " bc: " + rs.get(i), rs.get(i).endsWith(expected[i][1]));
+    }
+
+    //run Major compaction
+    runStatementOnDriver("alter table " + Table.NONACIDPART + " partition (p=1) compact 'major'");
+    TestTxnCommands2.runWorker(hiveConf);
+    rs = runStatementOnDriver("select ROW__ID, a, b, p, INPUT__FILE__NAME from " + Table.NONACIDPART + " order by ROW__ID");
+    LOG.warn("after major compaction:");
+    for (String s : rs) {
+      LOG.warn(s);
+    }
+    //verify data and layout
+    for(int i = 0; i < expected.length; i++) {
+      Assert.assertTrue("Actual line " + i + " ac: " + rs.get(i), rs.get(i).startsWith(expected[i][0]));
+      Assert.assertTrue("Actual line(file) " + i + " ac: " +
+        rs.get(i), rs.get(i).endsWith("nonacidpart/p=1/base_-9223372036854775808/bucket_00000"));
+    }
+
+  }
+  /**
+   * CTAS + Tez + Union creates a non-standard layout in table dir
+   * Each leg of the union places data into a subdir of the table/partition.  Subdirs are named 1/, 2/, etc
+   * The way this currently works is that CTAS creates an Acid table but the insert statement writes
+   * the data in non-acid layout.  Then on read, it's treated like a non-acid to acid conversion.
+   * Longer term CTAS should create acid layout from the get-go.
+   */
+  @Test
+  public void testCtasTezUnion() throws Exception {
+    HiveConf confForTez = new HiveConf(hiveConf); // make a clone of existing hive conf
+    setupTez(confForTez);
+    //CTAS with ACID target table
+    runStatementOnDriver("create table " + Table.ACIDNOBUCKET + " stored as ORC TBLPROPERTIES('transactional'='true') as " +
+      "select a, b from " + Table.ACIDTBL + " where a <= 5 union all select a, b from " + Table.NONACIDORCTBL + " where a >= 5", confForTez);
+    List<String> rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.ACIDNOBUCKET + " order by ROW__ID");
+    LOG.warn("after ctas:");
+    for (String s : rs) {
+      LOG.warn(s);
+    }
+    Assert.assertEquals(0, BucketCodec.determineVersion(536870912).decodeWriterId(536870912));
+    /*
+    * Expected result: 0th entry is the RecordIdentifier + data; 1st entry is the file before compaction */
+    String expected[][] = {
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "/1/000000_0"},
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "/1/000000_0"},
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":2}\t5\t6", "/1/000000_0"},
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":3}\t9\t10", "/2/000000_0"},
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":4}\t7\t8", "/2/000000_0"},
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":5}\t5\t6", "/2/000000_0"},
+    };
+    Assert.assertEquals("Unexpected row count after ctas", expected.length, rs.size());
+    //verify data and layout
+    for(int i = 0; i < expected.length; i++) {
+      Assert.assertTrue("Actual line " + i + " bc: " + rs.get(i), rs.get(i).startsWith(expected[i][0]));
+      Assert.assertTrue("Actual line(file) " + i + " bc: " + rs.get(i), rs.get(i).endsWith(expected[i][1]));
+    }
+    //perform some Update/Delete
+    runStatementOnDriver("update " + Table.ACIDNOBUCKET + " set a = 70, b  = 80 where a = 7");
+    runStatementOnDriver("delete from " + Table.ACIDNOBUCKET + " where a = 5");
+    rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.ACIDNOBUCKET + " order by ROW__ID");
+    LOG.warn("after update/delete:");
+    for (String s : rs) {
+      LOG.warn(s);
+    }
+    String[][] expected2 = {
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "/1/000000_0"},
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "/1/000000_0"},
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":3}\t9\t10", "/2/000000_0"},
+      {"{\"transactionid\":19,\"bucketid\":536870912,\"rowid\":0}\t70\t80", "delta_0000019_0000019_0000/bucket_00000"}
+    };
+    Assert.assertEquals("Unexpected row count after update", expected2.length, rs.size());
+    //verify data and layout
+    for(int i = 0; i < expected2.length; i++) {
+      Assert.assertTrue("Actual line " + i + " bc: " + rs.get(i), rs.get(i).startsWith(expected2[i][0]));
+      Assert.assertTrue("Actual line(file) " + i + " bc: " + rs.get(i), rs.get(i).endsWith(expected2[i][1]));
+    }
+    //now make sure delete deltas are present
+    FileSystem fs = FileSystem.get(hiveConf);
+    FileStatus[] status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+      (Table.ACIDNOBUCKET).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+    String[] expectedDelDelta = {"delete_delta_0000019_0000019_0000", "delete_delta_0000020_0000020_0000"};
+    for(FileStatus stat : status) {
+      for(int i = 0; i < expectedDelDelta.length; i++) {
+        if(expectedDelDelta[i] != null && stat.getPath().toString().endsWith(expectedDelDelta[i])) {
+          expectedDelDelta[i] = null;
+        }
+      }
+    }
+    for(int i = 0; i < expectedDelDelta.length; i++) {
+      Assert.assertNull("at " + i + " " + expectedDelDelta[i] + " not found on disk", expectedDelDelta[i]);
+    }
+    //run Minor compaction
+    runStatementOnDriver("alter table " + Table.ACIDNOBUCKET + " compact 'minor'");
+    TestTxnCommands2.runWorker(hiveConf);
+    rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.ACIDNOBUCKET + " order by ROW__ID");
+    LOG.warn("after compact minor:");
+    for (String s : rs) {
+      LOG.warn(s);
+    }
+    Assert.assertEquals("Unexpected row count after update", expected2.length, rs.size());
+    //verify the data is the same
+    for(int i = 0; i < expected2.length; i++) {
+      Assert.assertTrue("Actual line " + i + " bc: " + rs.get(i), rs.get(i).startsWith(expected2[i][0]));
+      //todo: since HIVE-16669 is not done, Minor compact compacts insert delta as well - it should not
+      //Assert.assertTrue("Actual line(file) " + i + " bc: " + rs.get(i), rs.get(i).endsWith(expected2[i][1]));
+    }
+    //check we have right delete delta files after minor compaction
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+      (Table.ACIDNOBUCKET).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+    String[] expectedDelDelta2 = { "delete_delta_0000019_0000019_0000", "delete_delta_0000020_0000020_0000", "delete_delta_0000019_0000020"};
+    for(FileStatus stat : status) {
+      for(int i = 0; i < expectedDelDelta2.length; i++) {
+        if(expectedDelDelta2[i] != null && stat.getPath().toString().endsWith(expectedDelDelta2[i])) {
+          expectedDelDelta2[i] = null;
+          break;
+        }
+      }
+    }
+    for(int i = 0; i < expectedDelDelta2.length; i++) {
+      Assert.assertNull("at " + i + " " + expectedDelDelta2[i] + " not found on disk", expectedDelDelta2[i]);
+    }
+    //run Major compaction
+    runStatementOnDriver("alter table " + Table.ACIDNOBUCKET + " compact 'major'");
+    TestTxnCommands2.runWorker(hiveConf);
+    rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.ACIDNOBUCKET + " order by ROW__ID");
+    LOG.warn("after compact major:");
+    for (String s : rs) {
+      LOG.warn(s);
+    }
+    Assert.assertEquals("Unexpected row count after major compact", expected2.length, rs.size());
+    for(int i = 0; i < expected2.length; i++) {
+      Assert.assertTrue("Actual line " + i + " bc: " + rs.get(i), rs.get(i).startsWith(expected2[i][0]));
+      //everything is now in base/
+      Assert.assertTrue("Actual line(file) " + i + " bc: " + rs.get(i), rs.get(i).endsWith("base_0000020/bucket_00000"));
+    }
+  }
   // Ideally test like this should be a qfile test. However, the explain output from qfile is always
   // slightly different depending on where the test is run, specifically due to file size estimation
   private void testJoin(String engine, String joinType) throws Exception {

http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTezWithSplitUpdate.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTezWithSplitUpdate.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTezWithSplitUpdate.java
deleted file mode 100644
index 3dacf08..0000000
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTezWithSplitUpdate.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql;
-
-/**
- * Same as parent class but covers Acid 2.0 tables
- */
-public class TestAcidOnTezWithSplitUpdate extends TestAcidOnTez {
-  @Override
-  String getTblProperties() {
-    return "TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default')";
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/itests/src/test/resources/testconfiguration.properties
----------------------------------------------------------------------
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index 37a3757..fa6a2aa 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -456,7 +456,9 @@ minillap.query.files=acid_bucket_pruning.q,\
   llap_stats.q,\
   multi_count_distinct_null.q
 
-minillaplocal.query.files=acid_globallimit.q,\
+minillaplocal.query.files=\
+  acid_no_buckets.q, \
+  acid_globallimit.q,\
   acid_vectorization_missing_cols.q,\
   alter_merge_stats_orc.q,\
   auto_join30.q,\

http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java b/metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java
index 023d703..3a3d184 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java
@@ -98,14 +98,26 @@ public final class TransactionalValidationListener extends MetaStorePreEventList
         // that will use it down below.
       }
     }
+    Table oldTable = context.getOldTable();
+    String oldTransactionalValue = null;
+    String oldTransactionalPropertiesValue = null;
+    for (String key : oldTable.getParameters().keySet()) {
+      if (hive_metastoreConstants.TABLE_IS_TRANSACTIONAL.equalsIgnoreCase(key)) {
+        oldTransactionalValue = oldTable.getParameters().get(key);
+      }
+      if (hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES.equalsIgnoreCase(key)) {
+        oldTransactionalPropertiesValue = oldTable.getParameters().get(key);
+      }
+    }
+
     if (transactionalValuePresent) {
       //normalize prop name
       parameters.put(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, transactionalValue);
     }
-    if ("true".equalsIgnoreCase(transactionalValue)) {
+    if ("true".equalsIgnoreCase(transactionalValue) && !"true".equalsIgnoreCase(oldTransactionalValue)) {
+      //only need to check conformance if alter table enabled acid
       if (!conformToAcid(newTable)) {
-        throw new MetaException("The table must be bucketed and stored using an ACID compliant" +
-            " format (such as ORC)");
+        throw new MetaException("The table must be stored using an ACID compliant format (such as ORC)");
       }
 
       if (newTable.getTableType().equals(TableType.EXTERNAL_TABLE.toString())) {
@@ -115,17 +127,6 @@ public final class TransactionalValidationListener extends MetaStorePreEventList
       hasValidTransactionalValue = true;
     }
 
-    Table oldTable = context.getOldTable();
-    String oldTransactionalValue = null;
-    String oldTransactionalPropertiesValue = null;
-    for (String key : oldTable.getParameters().keySet()) {
-      if (hive_metastoreConstants.TABLE_IS_TRANSACTIONAL.equalsIgnoreCase(key)) {
-        oldTransactionalValue = oldTable.getParameters().get(key);
-      }
-      if (hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES.equalsIgnoreCase(key)) {
-        oldTransactionalPropertiesValue = oldTable.getParameters().get(key);
-      }
-    }
 
 
     if (oldTransactionalValue == null ? transactionalValue == null
@@ -195,8 +196,7 @@ public final class TransactionalValidationListener extends MetaStorePreEventList
 
     if ("true".equalsIgnoreCase(transactionalValue)) {
       if (!conformToAcid(newTable)) {
-        throw new MetaException("The table must be bucketed and stored using an ACID compliant" +
-            " format (such as ORC)");
+        throw new MetaException("The table must be stored using an ACID compliant format (such as ORC)");
       }
 
       if (newTable.getTableType().equals(TableType.EXTERNAL_TABLE.toString())) {
@@ -214,14 +214,12 @@ public final class TransactionalValidationListener extends MetaStorePreEventList
     throw new MetaException("'transactional' property of TBLPROPERTIES may only have value 'true'");
   }
 
-  // Check if table is bucketed and InputFormatClass/OutputFormatClass should implement
-  // AcidInputFormat/AcidOutputFormat
+  /**
+   * Check that InputFormatClass/OutputFormatClass implement
+   * AcidInputFormat/AcidOutputFormat
+   */
   private boolean conformToAcid(Table table) throws MetaException {
     StorageDescriptor sd = table.getSd();
-    if (sd.getBucketColsSize() < 1) {
-      return false;
-    }
-
     try {
       Class inputFormatClass = Class.forName(sd.getInputFormat());
       Class outputFormatClass = Class.forName(sd.getOutputFormat());

http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
index 9c9d4e7..b3ef916 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
@@ -418,8 +418,8 @@ public enum ErrorMsg {
       " does not support these operations."),
   VALUES_TABLE_CONSTRUCTOR_NOT_SUPPORTED(10296,
       "Values clause with table constructor not yet supported"),
-  ACID_OP_ON_NONACID_TABLE(10297, "Attempt to do update or delete on table {0} that does not use " +
-      "an AcidOutputFormat or is not bucketed", true),
+  ACID_OP_ON_NONACID_TABLE(10297, "Attempt to do update or delete on table {0} that is " +
+    "not transactional", true),
   ACID_NO_SORTED_BUCKETS(10298, "ACID insert, update, delete not supported on tables that are " +
       "sorted, table {0}", true),
   ALTER_TABLE_TYPE_PARTIAL_PARTITION_SPEC_NO_SUPPORTED(10299,

http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index 25ad1e9..bc265eb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConfUtil;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
@@ -36,6 +37,7 @@ import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
 import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
 import org.apache.hadoop.hive.ql.io.HivePartitioner;
+import org.apache.hadoop.hive.ql.io.RecordIdentifier;
 import org.apache.hadoop.hive.ql.io.RecordUpdater;
 import org.apache.hadoop.hive.ql.io.StatsProvidingRecordWriter;
 import org.apache.hadoop.hive.ql.io.StreamingOutputFormat;
@@ -285,6 +287,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
   private transient int numFiles;
   protected transient boolean multiFileSpray;
   protected transient final Map<Integer, Integer> bucketMap = new HashMap<Integer, Integer>();
+  private transient boolean isBucketed = false;
 
   private transient ObjectInspector[] partitionObjectInspectors;
   protected transient HivePartitioner<HiveKey, Object> prtner;
@@ -345,6 +348,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
       isNativeTable = !conf.getTableInfo().isNonNative();
       isTemporary = conf.isTemporary();
       multiFileSpray = conf.isMultiFileSpray();
+      this.isBucketed = hconf.getInt(hive_metastoreConstants.BUCKET_COUNT, 0) > 0;
       totalFiles = conf.getTotalFiles();
       numFiles = conf.getNumFiles();
       dpCtx = conf.getDynPartCtx();
@@ -791,9 +795,23 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
           * Hive.copyFiles() will make one of them bucket_N_copy_M in the final location.  The
           * reset of acid (read path) doesn't know how to handle copy_N files except for 'original'
           * files (HIVE-16177)*/
+          int writerId = -1;
+          if(!isBucketed) {
+            assert !multiFileSpray;
+            assert writerOffset == 0;
+            /**For un-bucketed tables, Deletes with ROW__IDs with different 'bucketNum' values can
+            * be written to the same bucketN file.
+            * N in this case is writerId and there is no relationship
+            * between the file name and any property of the data in it.  Inserts will be written
+            * to bucketN file such that all {@link RecordIdentifier#getBucketProperty()} indeed
+            * contain writerId=N.
+            * Since taskId is unique (at least per statementId and thus
+            * per [delete_]delta_x_y_stmtId/) there will not be any copy_N files.*/
+            writerId = Integer.parseInt(Utilities.getTaskIdFromFilename(taskId));
+          }
           fpaths.updaters[writerOffset] = HiveFileFormatUtils.getAcidRecordUpdater(
-            jc, conf.getTableInfo(), bucketNum, conf, fpaths.outPaths[writerOffset],
-            rowInspector, reporter, 0);
+            jc, conf.getTableInfo(), writerId >= 0 ? writerId : bucketNum, conf,
+            fpaths.outPaths[writerOffset], rowInspector, reporter, 0);
           if (LOG.isDebugEnabled()) {
             LOG.debug("Created updater for bucket number " + bucketNum + " using file " +
               fpaths.outPaths[writerOffset]);

http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index 751fca8..69a9f9f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -964,6 +964,9 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     private final boolean allowSyntheticFileIds;
     private final boolean isDefaultFs;
 
+    /**
+     * @param dir - root of partition dir
+     */
     public BISplitStrategy(Context context, FileSystem fs, Path dir,
         List<HdfsFileStatusWithId> fileStatuses, boolean isOriginal, List<DeltaMetaData> deltas,
         boolean[] covered, boolean allowSyntheticFileIds, boolean isDefaultFs) {
@@ -996,7 +999,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
             }
             OrcSplit orcSplit = new OrcSplit(fileStatus.getPath(), fileKey, entry.getKey(),
                 entry.getValue().getLength(), entry.getValue().getHosts(), null, isOriginal, true,
-                deltas, -1, logicalLen);
+                deltas, -1, logicalLen, dir);
             splits.add(orcSplit);
           }
         }
@@ -1017,18 +1020,16 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
    * ACID split strategy is used when there is no base directory (when transactions are enabled).
    */
   static class ACIDSplitStrategy implements SplitStrategy<OrcSplit> {
-    private Path dir;
+    Path dir;
     private List<DeltaMetaData> deltas;
-    private boolean[] covered;
-    private int numBuckets;
     private AcidOperationalProperties acidOperationalProperties;
-
+    /**
+     * @param dir root of partition dir
+     */
     ACIDSplitStrategy(Path dir, int numBuckets, List<DeltaMetaData> deltas, boolean[] covered,
         AcidOperationalProperties acidOperationalProperties) {
       this.dir = dir;
-      this.numBuckets = numBuckets;
       this.deltas = deltas;
-      this.covered = covered;
       this.acidOperationalProperties = acidOperationalProperties;
     }
 
@@ -1234,6 +1235,8 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     private final UserGroupInformation ugi;
     private final boolean allowSyntheticFileIds;
     private SchemaEvolution evolution;
+    //this is the root of the partition in which the 'file' is located
+    private final Path rootDir;
 
     public SplitGenerator(SplitInfo splitInfo, UserGroupInformation ugi,
         boolean allowSyntheticFileIds, boolean isDefaultFs) throws IOException {
@@ -1250,6 +1253,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
       this.isOriginal = splitInfo.isOriginal;
       this.deltas = splitInfo.deltas;
       this.hasBase = splitInfo.hasBase;
+      this.rootDir = splitInfo.dir;
       this.projColsUncompressedSize = -1;
       this.deltaSplits = splitInfo.getSplits();
       this.allowSyntheticFileIds = allowSyntheticFileIds;
@@ -1361,7 +1365,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
         fileKey = new SyntheticFileId(file);
       }
       return new OrcSplit(file.getPath(), fileKey, offset, length, hosts,
-          orcTail, isOriginal, hasBase, deltas, scaledProjSize, fileLen);
+          orcTail, isOriginal, hasBase, deltas, scaledProjSize, fileLen, rootDir);
     }
 
     private static final class OffsetAndLength { // Java cruft; pair of long.
@@ -1641,7 +1645,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
       pathFutures.add(ecs.submit(fileGenerator));
     }
 
-    boolean isTransactionalTableScan =
+    boolean isTransactionalTableScan =//this never seems to be set correctly
         HiveConf.getBoolVar(conf, ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN);
     boolean isSchemaEvolution = HiveConf.getBoolVar(conf, ConfVars.HIVE_SCHEMA_EVOLUTION);
     TypeDescription readerSchema =
@@ -1932,16 +1936,17 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
 
     final OrcSplit split = (OrcSplit) inputSplit;
     final Path path = split.getPath();
-
     Path root;
     if (split.hasBase()) {
       if (split.isOriginal()) {
-        root = path.getParent();
+        root = split.getRootDir();
       } else {
         root = path.getParent().getParent();
+        assert root.equals(split.getRootDir()) : "root mismatch: baseDir=" + split.getRootDir() +
+          " path.p.p=" + root;
       }
-    } else {//here path is a delta/ but above it's a partition/
-      root = path;
+    } else {
+      throw new IllegalStateException("Split w/o base: " + path);
     }
 
     // Retrieve the acidOperationalProperties for the table, initialized in HiveInputFormat.
@@ -2037,21 +2042,6 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
       }
     };
   }
-  private static Path findOriginalBucket(FileSystem fs,
-                                 Path directory,
-                                 int bucket) throws IOException {
-    for(FileStatus stat: fs.listStatus(directory)) {
-      if(stat.getLen() <= 0) {
-        continue;
-      }
-      AcidOutputFormat.Options bucketInfo =
-        AcidUtils.parseBaseOrDeltaBucketFilename(stat.getPath(), fs.getConf());
-      if(bucketInfo.getBucketId() == bucket) {
-        return stat.getPath();
-      }
-    }
-    throw new IllegalArgumentException("Can't find bucket " + bucket + " in " + directory);
-  }
 
   static Reader.Options createOptionsForReader(Configuration conf) {
     /**
@@ -2275,20 +2265,22 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
                                            ) throws IOException {
     Reader reader = null;
     boolean isOriginal = false;
-    if (baseDirectory != null) {
-      Path bucketFile;
+    if (baseDirectory != null) {//this is NULL for minor compaction
+      Path bucketFile = null;
       if (baseDirectory.getName().startsWith(AcidUtils.BASE_PREFIX)) {
         bucketFile = AcidUtils.createBucketFile(baseDirectory, bucket);
       } else {
+        /**we don't know which file to start reading -
+         * {@link OrcRawRecordMerger.OriginalReaderPairToCompact} does*/
         isOriginal = true;
-        bucketFile = findOriginalBucket(baseDirectory.getFileSystem(conf),
-            baseDirectory, bucket);
       }
-      reader = OrcFile.createReader(bucketFile, OrcFile.readerOptions(conf));
+      if(bucketFile != null) {
+        reader = OrcFile.createReader(bucketFile, OrcFile.readerOptions(conf));
+      }
     }
     OrcRawRecordMerger.Options mergerOptions = new OrcRawRecordMerger.Options()
       .isCompacting(true)
-      .rootPath(baseDirectory);
+      .rootPath(baseDirectory).isMajorCompaction(baseDirectory != null);
     return new OrcRawRecordMerger(conf, collapseEvents, reader, isOriginal,
         bucket, validTxnList, new Reader.Options(), deltaDirectory, mergerOptions);
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
index 97c4e3d..cbbb4c4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
@@ -22,6 +22,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
 import org.apache.hadoop.hive.ql.io.BucketCodec;
@@ -29,7 +31,6 @@ import org.apache.hadoop.hive.shims.HadoopShims;
 import org.apache.orc.OrcUtils;
 import org.apache.orc.StripeInformation;
 import org.apache.orc.TypeDescription;
-import org.apache.orc.impl.AcidStats;
 import org.apache.orc.impl.OrcAcidUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -68,7 +69,6 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
   private final RecordIdentifier maxKey;
   // an extra value so that we can return it while reading ahead
   private OrcStruct extraValue;
-
   /**
    * A RecordIdentifier extended with the current transaction id. This is the
    * key of our merge sort with the originalTransaction, bucket, and rowId
@@ -294,9 +294,15 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
    * Running multiple Insert statements on the same partition (of non acid table) creates files
    * like so: 00000_0, 00000_0_copy1, 00000_0_copy2, etc.  So the OriginalReaderPair must treat all
    * of these files as part of a single logical bucket file.
+   *
+   * Also, for unbucketed (non acid) tables, there are no guarantees where data files may be placed.
+   * For example, CTAS+Tez+Union creates subdirs 1/, 2/, etc for each leg of the Union.  Thus the
+   * data file need not be an immediate child of partition dir.  All files for a given writerId are
+   * treated as one logical unit to assign {@link RecordIdentifier}s to them consistently.
    * 
    * For Compaction, where each split includes the whole bucket, this means reading over all the
    * files in order to assign ROW__ID.rowid in one sequence for the entire logical bucket.
+   * For unbucketed tables, a Compaction split is all files written by a given writerId.
    *
    * For a read after the table is marked transactional but before it's rewritten into a base/
    * by compaction, each of the original files may be split into many pieces.  For each split we
@@ -305,7 +311,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
    * split of the original file and used to filter rows from all the deltas.  The ROW__ID.rowid for
    * the rows of the 'original' file of course, must be assigned from the beginning of logical
    * bucket.  The last split of the logical bucket, i.e. the split that has the end of last file,
-   * should include all insert events from deltas.
+   * should include all insert events from deltas (last sentence is obsolete for Acid 2: HIVE-17320)
    */
   private static abstract class OriginalReaderPair implements ReaderPair {
     OrcStruct nextRecord;
@@ -407,18 +413,18 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
       RecordIdentifier newMaxKey = maxKey;
       recordReader = reader.rowsOptions(options);
       /**
-       * Logically each bucket consists of 0000_0, 0000_0_copy_1... 0000_0_copyN. etc  We don't
-       * know N a priori so if this is true, then the current split is from 0000_0_copyN file.
+       * Logically each bucket consists of 0000_0, 0000_0_copy_1... 0000_0_copy_N. etc  We don't
+       * know N a priori so if this is true, then the current split is from 0000_0_copy_N file.
        * It's needed to correctly set maxKey.  In particular, set maxKey==null if this split
        * is the tail of the last file for this logical bucket to include all deltas written after
-       * non-acid to acid table conversion.
+       * non-acid to acid table conversion (todo: HIVE-17320).
+       * Also, see comments at {@link OriginalReaderPair} about unbucketed tables.
        */
-      boolean isLastFileForThisBucket = false;
+      boolean isLastFileForThisBucket = true;
       boolean haveSeenCurrentFile = false;
       long rowIdOffsetTmp = 0;
-      if (mergerOptions.getCopyIndex() > 0) {
+      {
         //the split is from something other than the 1st file of the logical bucket - compute offset
-
         AcidUtils.Directory directoryState = AcidUtils.getAcidState(mergerOptions.getRootPath(),
           conf, validTxnList, false, true);
         for (HadoopShims.HdfsFileStatusWithId f : directoryState.getOriginalFiles()) {
@@ -467,23 +473,6 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
             maxKey.setRowId(maxKey.getRowId() + rowIdOffset);
           }
         }
-      } else {
-        rowIdOffset = 0;
-        isLastFileForThisBucket = true;
-        AcidUtils.Directory directoryState = AcidUtils.getAcidState(mergerOptions.getRootPath(),
-          conf, validTxnList, false, true);
-        int numFilesInBucket = 0;
-        for (HadoopShims.HdfsFileStatusWithId f : directoryState.getOriginalFiles()) {
-          AcidOutputFormat.Options bucketOptions =
-            AcidUtils.parseBaseOrDeltaBucketFilename(f.getFileStatus().getPath(), conf);
-          if (bucketOptions.getBucketId() == bucketId) {
-            numFilesInBucket++;
-            if (numFilesInBucket > 1) {
-              isLastFileForThisBucket = false;
-              break;
-            }
-          }
-        }
       }
       if (!isLastFileForThisBucket && maxKey == null) {
           /*
@@ -651,6 +640,12 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
   }
   /**
    * Find the key range for original bucket files.
+   * For unbucketed tables the insert event data is still written to bucket_N file except that
+   * N is just a writer ID - it still matches {@link RecordIdentifier#getBucketProperty()}.  For
+   * 'original' files (unbucketed) the same applies.  A file 000000_0 encodes a taskId/writerId and
+   * at read time we synthesize {@link RecordIdentifier#getBucketProperty()} to match the file name
+   * and so the same bucketProperty is used here to create minKey/maxKey, i.e. these keys are valid
+   * to filter data from delete_delta files even for unbucketed tables.
    * @param reader the reader
    * @param bucket the bucket number we are reading
    * @param options the options for reading with
@@ -740,7 +735,6 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
    */
   static Reader.Options createEventOptions(Reader.Options options) {
     Reader.Options result = options.clone();
-    //result.range(options.getOffset(), Long.MAX_VALUE);WTF?
     result.include(options.getInclude());
 
     // slide the column names down by 6 for the name array
@@ -755,11 +749,17 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
     return result;
   }
 
+  /**
+   * {@link OrcRawRecordMerger} Acid reader is used slightly differently in various contexts.
+   * This makes the "context" explicit.
+   */
   static class Options {
     private int copyIndex = 0;
     private boolean isCompacting = false;
     private Path bucketPath;
     private Path rootPath;
+    private boolean isMajorCompaction = false;
+    private boolean isDeleteReader = false;
     Options copyIndex(int copyIndex) {
       assert copyIndex >= 0;
       this.copyIndex = copyIndex;
@@ -767,6 +767,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
     }
     Options isCompacting(boolean isCompacting) {
       this.isCompacting = isCompacting;
+      assert !isDeleteReader;
       return this;
     }
     Options bucketPath(Path bucketPath) {
@@ -777,6 +778,16 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
       this.rootPath = rootPath;
       return this;
     }
+    Options isMajorCompaction(boolean isMajor) {
+      this.isMajorCompaction = isMajor;
+      assert !isDeleteReader;
+      return this;
+    }
+    Options isDeleteReader(boolean isDeleteReader) {
+      this.isDeleteReader = isDeleteReader;
+      assert !isCompacting;
+      return this;
+    }
     /**
      * 0 means it's the original file, without {@link Utilities#COPY_KEYWORD} suffix
      */
@@ -788,7 +799,6 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
     }
     /**
      * Full path to the data file
-     * @return
      */
     Path getBucketPath() {
       return bucketPath;
@@ -797,6 +807,22 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
      * Partition folder (Table folder if not partitioned)
      */
     Path getRootPath()  { return rootPath; }
+    /**
+     * @return true if major compaction, false otherwise (minor compaction or not compacting)
+     */
+    boolean isMajorCompaction() {
+      return isMajorCompaction && isCompacting;
+    }
+    boolean isMinorCompaction() {
+      return !isMajorCompaction && isCompacting;
+    }
+    /**
+     * true if this is only processing delete deltas to load in-memory table for
+     * vectorized reader
+     */
+    boolean isDeleteReader() {
+      return isDeleteReader;
+    }
   }
   /**
    * Create a reader that merge sorts the ACID events together.
@@ -820,6 +846,39 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
     this.offset = options.getOffset();
     this.length = options.getLength();
     this.validTxnList = validTxnList;
+    /**
+     * @since Hive 3.0
+     * With split update (HIVE-14035) we have base/, delta/ and delete_delta/ - the latter only
+     * has Delete events and the others only have Insert events.  Thus {@link #baseReader} is
+     * a split of a file in base/ or delta/.
+     *
+     * For Compaction, each split (for now) is a logical bucket, i.e. all files from base/ + delta(s)/
+     * for a given bucket ID and delete_delta(s)/
+     *
+     * For bucketed tables, the data files are named bucket_N and all rows in this file are such
+     * that {@link org.apache.hadoop.hive.ql.io.BucketCodec#decodeWriterId(int)} of
+     * {@link RecordIdentifier#getBucketProperty()} is N.  This is currently true for all types of
+     * files but may not be true for delete_delta/ files in the future.
+     *
+     * For un-bucketed tables, the system is designed so that it works when there is no relationship
+     * between delete_delta file name (bucket_N) and the value of {@link RecordIdentifier#getBucketProperty()}.
+     * (Later this may be optimized to take advantage of situations where it is known that
+     * bucket_N matches bucketProperty().)  This implies that for a given {@link #baseReader} all
+     * files in delete_delta/ have to be opened ({@link ReaderPair} created).  Insert events are
+     * still written such that N in file name (writerId) matches what's in bucketProperty().
+     *
+     * Compactor for un-bucketed tables works exactly the same as for bucketed ones though it
+     * should be optimized (see HIVE-17206).  In particular, each split is a set of files
+     * created by a writer with the same writerId, i.e. all bucket_N files across base/ &
+     * delta/ for the same N. Unlike bucketed tables, there is no relationship between
+     * any values in user columns to file name.
+     * The maximum N is determined by the number of writers the system chose for the "largest"
+     * write into a given partition.
+     *
+     * In both cases, Compactor should be changed so that Minor compaction is run very often and
+     * only compacts delete_delta/.  Major compaction can do what it does now.
+     */
+    boolean isBucketed = conf.getInt(hive_metastoreConstants.BUCKET_COUNT, 0) > 0;
 
     TypeDescription typeDescr =
         OrcInputFormat.getDesiredRowTypeDescr(conf, true, Integer.MAX_VALUE);
@@ -829,16 +888,26 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
 
     // modify the options to reflect the event instead of the base row
     Reader.Options eventOptions = createEventOptions(options);
-    if (reader == null) {
+    if((mergerOptions.isCompacting() && mergerOptions.isMinorCompaction()) ||
+      mergerOptions.isDeleteReader()) {
+      //for minor compaction, there is no progress report and we don't filter deltas
       baseReader = null;
       minKey = maxKey = null;
+      assert reader == null : "unexpected input reader during minor compaction: " +
+        mergerOptions.getRootPath();
     } else {
       KeyInterval keyInterval;
-      // find the min/max based on the offset and length (and more for 'original')
-      if (isOriginal) {
-        keyInterval = discoverOriginalKeyBounds(reader, bucket, options, conf);
+      if (mergerOptions.isCompacting()) {
+        assert mergerOptions.isMajorCompaction();
+        //compaction doesn't filter deltas but *may* have a reader for 'base'
+        keyInterval = new KeyInterval(null, null);
       } else {
-        keyInterval = discoverKeyBounds(reader, options);
+        // find the min/max based on the offset and length (and more for 'original')
+        if (isOriginal) {
+          keyInterval = discoverOriginalKeyBounds(reader, bucket, options, conf);
+        } else {
+          keyInterval = discoverKeyBounds(reader, options);
+        }
       }
       LOG.info("min key = " + keyInterval.getMinKey() + ", max key = " + keyInterval.getMaxKey());
       // use the min/max instead of the byte range
@@ -849,8 +918,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
         if(mergerOptions.isCompacting()) {
           pair = new OriginalReaderPairToCompact(key, bucket, options, mergerOptions,
             conf, validTxnList);
-        }
-        else {
+        } else {
           pair = new OriginalReaderPairToRead(key, reader, bucket, keyInterval.getMinKey(),
             keyInterval.getMaxKey(), options, mergerOptions, conf, validTxnList);
         }
@@ -868,35 +936,31 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
       baseReader = pair.getRecordReader();
     }
 
-    // we always want to read all of the deltas
-    eventOptions.range(0, Long.MAX_VALUE);
     if (deltaDirectory != null) {
+      /*whatever SARG may be applicable to base, it's not applicable to delete_delta since it has no
+      * user columns
+      * HIVE-17320: we should compute a SARG to push down min/max key to delete_delta*/
+      Reader.Options deltaEventOptions = eventOptions.clone()
+        .searchArgument(null, null).range(0, Long.MAX_VALUE);
       for(Path delta: deltaDirectory) {
         if(!mergerOptions.isCompacting() && !AcidUtils.isDeleteDelta(delta)) {
           //all inserts should be in baseReader for normal read so this should always be delete delta if not compacting
           throw new IllegalStateException(delta + " is not delete delta and is not compacting.");
         }
         ReaderKey key = new ReaderKey();
-        Path deltaFile = AcidUtils.createBucketFile(delta, bucket);
         AcidUtils.ParsedDelta deltaDir = AcidUtils.parsedDelta(delta);
-        FileSystem fs = deltaFile.getFileSystem(conf);
-        long length = OrcAcidUtils.getLastFlushLength(fs, deltaFile);
-        if (length != -1 && fs.exists(deltaFile)) {
-          Reader deltaReader = OrcFile.createReader(deltaFile,
-              OrcFile.readerOptions(conf).maxLength(length));
-          Reader.Options deltaEventOptions = null;
-          if(eventOptions.getSearchArgument() != null) {
-            // Turn off the sarg before pushing it to delta.  We never want to push a sarg to a delta as
-            // it can produce wrong results (if the latest valid version of the record is filtered out by
-            // the sarg) or ArrayOutOfBounds errors (when the sarg is applied to a delete record)
-            // unless the delta only has insert events
-            AcidStats acidStats = OrcAcidUtils.parseAcidStats(deltaReader);
-            if(acidStats.deletes > 0 || acidStats.updates > 0) {
-              deltaEventOptions = eventOptions.clone().searchArgument(null, null);
-            }
+        for (Path deltaFile : getDeltaFiles(delta, bucket, conf, mergerOptions, isBucketed)) {
+          FileSystem fs = deltaFile.getFileSystem(conf);
+          if(!fs.exists(deltaFile)) {
+            continue;
           }
+          /* side files are only created by streaming ingest.  If this is a compaction, we may
+          * have an insert delta/ here with side files left behind because the original writer died.*/
+          long length = AcidUtils.getLogicalLength(fs, fs.getFileStatus(deltaFile));
+          assert length >= 0;
+          Reader deltaReader = OrcFile.createReader(deltaFile, OrcFile.readerOptions(conf).maxLength(length));
           ReaderPairAcid deltaPair = new ReaderPairAcid(key, deltaReader, minKey, maxKey,
-            deltaEventOptions != null ? deltaEventOptions : eventOptions, deltaDir.getStatementId());
+            deltaEventOptions, deltaDir.getStatementId());
           if (deltaPair.nextRecord() != null) {
             readers.put(key, deltaPair);
           }
@@ -921,6 +985,59 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
     }
   }
 
+  /**
+   * Determines the files in a given delta/ for which a {@link ReaderPairAcid} must be created.
+   * For unbucketed tables {@code bucket} can be thought of as a write tranche.
+   */
+  static Path[] getDeltaFiles(Path deltaDirectory, int bucket, Configuration conf,
+                              Options mergerOptions, boolean isBucketed) throws IOException {
+    if(isBucketed) {
+      /**
+       * for bucketed tables (for now) we always trust that the N in bucketN file name means that
+       * all records have {@link RecordIdentifier#getBucketProperty()} encoding bucketId = N.  This
+       * means that a delete event in bucketN can only modify an insert in another bucketN file for
+       * the same N. (Down the road we may trust it only in certain delta dirs)
+       *
+       * Compactor takes all types of deltas for a given bucket.  For regular read, any file that
+       * contains (only) insert events is treated as base and only
+       * delete_delta/ are treated as deltas.
+       */
+        assert (!mergerOptions.isCompacting &&
+          deltaDirectory.getName().startsWith(AcidUtils.DELETE_DELTA_PREFIX)
+        ) || mergerOptions.isCompacting : "Unexpected delta: " + deltaDirectory;
+      Path deltaFile = AcidUtils.createBucketFile(deltaDirectory, bucket);
+      return new Path[]{deltaFile};
+    }
+    /**
+     * For unbucketed tables insert events are also stored in bucketN files but here N is
+     * the writer ID.  We can trust that N matches info in {@link RecordIdentifier#getBucketProperty()}
+     * for delta_x_y but it's not required since we can't trust N for delete_delta_x_x/bucketN.
+     * Thus we always have to take all files in a delete_delta.
+     * For regular read, any file that has (only) insert events is treated as base so
+     * {@code deltaDirectory} can only be delete_delta and so we take all files in it.
+     * For compacting, every split contains base/bN + delta(s)/bN + delete_delta(s){all buckets} for
+     * a given N.
+     */
+    if(deltaDirectory.getName().startsWith(AcidUtils.DELETE_DELTA_PREFIX)) {
+      //it's not wrong to take all delete events for bucketed tables but it's more efficient
+      //to only take those that belong to the 'bucket' assuming we trust the file name
+      //un-bucketed table - get all files
+      FileSystem fs = deltaDirectory.getFileSystem(conf);
+      FileStatus[] dataFiles = fs.listStatus(deltaDirectory, AcidUtils.bucketFileFilter);
+      Path[] deltaFiles = new Path[dataFiles.length];
+      int i = 0;
+      for (FileStatus stat : dataFiles) {
+        deltaFiles[i++] = stat.getPath();
+      }//todo: need a test where we actually have more than 1 file
+      return deltaFiles;
+    }
+    //if here it must be delta_x_y - insert events only, so we must be compacting
+    assert mergerOptions.isCompacting() : "Expected to be called as part of compaction";
+    Path deltaFile = AcidUtils.createBucketFile(deltaDirectory, bucket);
+    return new Path[] {deltaFile};
+
+  }
+  
   @VisibleForTesting
   RecordIdentifier getMinKey() {
     return minKey;

http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
index 429960b..1e19a91 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
@@ -243,7 +243,8 @@ public class OrcRecordUpdater implements RecordUpdater {
     }
     if (options.getMinimumTransactionId() != options.getMaximumTransactionId()
         && !options.isWritingBase()){
-      flushLengths = fs.create(OrcAcidUtils.getSideFile(this.path), true, 8,
+      //throw if file already exists as that should never happen
+      flushLengths = fs.create(OrcAcidUtils.getSideFile(this.path), false, 8,
           options.getReporter());
       flushLengths.writeLong(0);
       OrcInputFormat.SHIMS.hflush(flushLengths);
@@ -349,6 +350,12 @@ public class OrcRecordUpdater implements RecordUpdater {
       return newInspector;
     }
   }
+
+  /**
+   * The INSERT event always uses the {@link #bucket} that this {@link RecordUpdater} was created
+   * with.  Thus, even for unbucketed tables, the N in the bucket_N file name matches the
+   * writerId/bucketId, even for a late split.
+   */
   private void addSimpleEvent(int operation, long currentTransaction, long rowId, Object row)
       throws IOException {
     this.operation.set(operation);
@@ -394,6 +401,11 @@ public class OrcRecordUpdater implements RecordUpdater {
     Integer currentBucket = null;
 
     if (operation == DELETE_OPERATION || operation == UPDATE_OPERATION) {
+      /**
+       * make sure bucketProperty in the delete event is from the {@code row} rather than whatever
+       * {@link #bucket} is.  For bucketed tables, the 2 must agree on bucketId encoded in it,
+       * not necessarily on the whole value.  For unbucketed tables there is no relationship.
+       */
       currentBucket = setBucket(bucketInspector.get(
         recIdInspector.getStructFieldData(rowValue, bucketField)), operation);
       // Initialize a deleteEventWriter if not yet done. (Lazy initialization)