Posted to commits@hive.apache.org by ek...@apache.org on 2017/08/26 03:20:37 UTC
[1/3] hive git commit: HIVE-17205 - add functional support for unbucketed tables (Eugene Koifman, reviewed by Wei Zheng)
Repository: hive
Updated Branches:
refs/heads/master 262d8f992 -> 6be50b76b
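
For context on the diff below: the new acid_no_buckets.q.out golden file
exercises ACID DML (INSERT, UPDATE, DELETE, MERGE) against a partitioned,
transactional ORC table declared without any CLUSTERED BY clause, i.e. the
"unbucketed tables" of the subject line. A minimal sketch of the pattern
under test, with table and column names taken from the q file itself (an
illustration, not the full test):

  CREATE TABLE srcpart_acid (key STRING, value STRING)
  PARTITIONED BY (ds STRING, hr STRING)
  STORED AS ORC
  TBLPROPERTIES ('transactional'='true',
                 'transactional_properties'='default');

  -- No CLUSTERED BY ... INTO n BUCKETS clause is required; UPDATE,
  -- DELETE and MERGE remain legal and, as the EXPLAIN plans below show,
  -- the affected rows are sorted on the hidden ROW__ID column
  -- (struct<transactionid:bigint,bucketid:int,rowid:bigint>) before the
  -- ACID File Output Operator writes them back.
  UPDATE srcpart_acid SET value = concat(value, 'updated')
  WHERE cast(key AS integer) IN (413, 43) AND hr = '11';

  DELETE FROM srcpart_acid WHERE key IN ('1001', '213', '43');

The file also runs the same statements against a bucketed twin,
srcpart_acidb (CLUSTERED BY(key) INTO 2 BUCKETS), so the two plan shapes
can be compared; the visible difference is the extra "Map-reduce
partition columns: UDFToInteger(_col0)" line in the bucketed plans.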
http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/ql/src/test/results/clientpositive/llap/acid_no_buckets.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/acid_no_buckets.q.out b/ql/src/test/results/clientpositive/llap/acid_no_buckets.q.out
new file mode 100644
index 0000000..34dd487
--- /dev/null
+++ b/ql/src/test/results/clientpositive/llap/acid_no_buckets.q.out
@@ -0,0 +1,1976 @@
+PREHOOK: query: drop table if exists srcpart_acid
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table if exists srcpart_acid
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: CREATE TABLE srcpart_acid (key STRING, value STRING) PARTITIONED BY (ds STRING, hr STRING) stored as ORC TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@srcpart_acid
+POSTHOOK: query: CREATE TABLE srcpart_acid (key STRING, value STRING) PARTITIONED BY (ds STRING, hr STRING) stored as ORC TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@srcpart_acid
+PREHOOK: query: insert into srcpart_acid PARTITION (ds, hr) select * from srcpart
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart
+PREHOOK: Input: default@srcpart@ds=2008-04-08/hr=11
+PREHOOK: Input: default@srcpart@ds=2008-04-08/hr=12
+PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=11
+PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=12
+PREHOOK: Output: default@srcpart_acid
+POSTHOOK: query: insert into srcpart_acid PARTITION (ds, hr) select * from srcpart
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart
+POSTHOOK: Input: default@srcpart@ds=2008-04-08/hr=11
+POSTHOOK: Input: default@srcpart@ds=2008-04-08/hr=12
+POSTHOOK: Input: default@srcpart@ds=2008-04-09/hr=11
+POSTHOOK: Input: default@srcpart@ds=2008-04-09/hr=12
+POSTHOOK: Output: default@srcpart_acid@ds=2008-04-08/hr=11
+POSTHOOK: Output: default@srcpart_acid@ds=2008-04-08/hr=12
+POSTHOOK: Output: default@srcpart_acid@ds=2008-04-09/hr=11
+POSTHOOK: Output: default@srcpart_acid@ds=2008-04-09/hr=12
+POSTHOOK: Lineage: srcpart_acid PARTITION(ds=2008-04-08,hr=11).key SIMPLE [(srcpart)srcpart.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: srcpart_acid PARTITION(ds=2008-04-08,hr=11).value SIMPLE [(srcpart)srcpart.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: srcpart_acid PARTITION(ds=2008-04-08,hr=12).key SIMPLE [(srcpart)srcpart.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: srcpart_acid PARTITION(ds=2008-04-08,hr=12).value SIMPLE [(srcpart)srcpart.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: srcpart_acid PARTITION(ds=2008-04-09,hr=11).key SIMPLE [(srcpart)srcpart.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: srcpart_acid PARTITION(ds=2008-04-09,hr=11).value SIMPLE [(srcpart)srcpart.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: srcpart_acid PARTITION(ds=2008-04-09,hr=12).key SIMPLE [(srcpart)srcpart.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: srcpart_acid PARTITION(ds=2008-04-09,hr=12).value SIMPLE [(srcpart)srcpart.FieldSchema(name:value, type:string, comment:default), ]
+PREHOOK: query: select ds, hr, key, value from srcpart_acid where cast(key as integer) in(413,43) and hr='11' order by ds, hr, cast(key as integer)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart_acid
+PREHOOK: Input: default@srcpart_acid@ds=2008-04-08/hr=11
+PREHOOK: Input: default@srcpart_acid@ds=2008-04-09/hr=11
+#### A masked pattern was here ####
+POSTHOOK: query: select ds, hr, key, value from srcpart_acid where cast(key as integer) in(413,43) and hr='11' order by ds, hr, cast(key as integer)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart_acid
+POSTHOOK: Input: default@srcpart_acid@ds=2008-04-08/hr=11
+POSTHOOK: Input: default@srcpart_acid@ds=2008-04-09/hr=11
+#### A masked pattern was here ####
+2008-04-08 11 43 val_43
+2008-04-08 11 413 val_413
+2008-04-08 11 413 val_413
+2008-04-09 11 43 val_43
+2008-04-09 11 413 val_413
+2008-04-09 11 413 val_413
+PREHOOK: query: analyze table srcpart_acid PARTITION(ds, hr) compute statistics
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart_acid
+PREHOOK: Output: default@srcpart_acid
+PREHOOK: Output: default@srcpart_acid@ds=2008-04-08/hr=11
+PREHOOK: Output: default@srcpart_acid@ds=2008-04-08/hr=12
+PREHOOK: Output: default@srcpart_acid@ds=2008-04-09/hr=11
+PREHOOK: Output: default@srcpart_acid@ds=2008-04-09/hr=12
+POSTHOOK: query: analyze table srcpart_acid PARTITION(ds, hr) compute statistics
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart_acid
+POSTHOOK: Output: default@srcpart_acid
+POSTHOOK: Output: default@srcpart_acid@ds=2008-04-08/hr=11
+POSTHOOK: Output: default@srcpart_acid@ds=2008-04-08/hr=12
+POSTHOOK: Output: default@srcpart_acid@ds=2008-04-09/hr=11
+POSTHOOK: Output: default@srcpart_acid@ds=2008-04-09/hr=12
+PREHOOK: query: analyze table srcpart_acid PARTITION(ds, hr) compute statistics for columns
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart_acid
+PREHOOK: Input: default@srcpart_acid@ds=2008-04-08/hr=11
+PREHOOK: Input: default@srcpart_acid@ds=2008-04-08/hr=12
+PREHOOK: Input: default@srcpart_acid@ds=2008-04-09/hr=11
+PREHOOK: Input: default@srcpart_acid@ds=2008-04-09/hr=12
+PREHOOK: Output: default@srcpart_acid
+PREHOOK: Output: default@srcpart_acid@ds=2008-04-08/hr=11
+PREHOOK: Output: default@srcpart_acid@ds=2008-04-08/hr=12
+PREHOOK: Output: default@srcpart_acid@ds=2008-04-09/hr=11
+PREHOOK: Output: default@srcpart_acid@ds=2008-04-09/hr=12
+#### A masked pattern was here ####
+POSTHOOK: query: analyze table srcpart_acid PARTITION(ds, hr) compute statistics for columns
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart_acid
+POSTHOOK: Input: default@srcpart_acid@ds=2008-04-08/hr=11
+POSTHOOK: Input: default@srcpart_acid@ds=2008-04-08/hr=12
+POSTHOOK: Input: default@srcpart_acid@ds=2008-04-09/hr=11
+POSTHOOK: Input: default@srcpart_acid@ds=2008-04-09/hr=12
+POSTHOOK: Output: default@srcpart_acid
+POSTHOOK: Output: default@srcpart_acid@ds=2008-04-08/hr=11
+POSTHOOK: Output: default@srcpart_acid@ds=2008-04-08/hr=12
+POSTHOOK: Output: default@srcpart_acid@ds=2008-04-09/hr=11
+POSTHOOK: Output: default@srcpart_acid@ds=2008-04-09/hr=12
+#### A masked pattern was here ####
+PREHOOK: query: explain update srcpart_acid set value = concat(value, 'updated') where cast(key as integer) in(413,43) and hr='11'
+PREHOOK: type: QUERY
+POSTHOOK: query: explain update srcpart_acid set value = concat(value, 'updated') where cast(key as integer) in(413,43) and hr='11'
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+ Stage-1 is a root stage
+ Stage-2 depends on stages: Stage-1
+ Stage-0 depends on stages: Stage-2
+ Stage-3 depends on stages: Stage-0
+
+STAGE PLANS:
+ Stage: Stage-1
+ Tez
+#### A masked pattern was here ####
+ Edges:
+ Reducer 2 <- Map 1 (SIMPLE_EDGE)
+#### A masked pattern was here ####
+ Vertices:
+ Map 1
+ Map Operator Tree:
+ TableScan
+ alias: srcpart_acid
+ Statistics: Num rows: 1000 Data size: 362000 Basic stats: COMPLETE Column stats: PARTIAL
+ Filter Operator
+ predicate: (UDFToInteger(key)) IN (413, 43) (type: boolean)
+ Statistics: Num rows: 500 Data size: 181000 Basic stats: COMPLETE Column stats: PARTIAL
+ Select Operator
+ expressions: ROW__ID (type: struct<transactionid:bigint,bucketid:int,rowid:bigint>), key (type: string), concat(value, 'updated') (type: string), ds (type: string)
+ outputColumnNames: _col0, _col1, _col2, _col3
+ Statistics: Num rows: 500 Data size: 308500 Basic stats: COMPLETE Column stats: PARTIAL
+ Reduce Output Operator
+ key expressions: _col0 (type: struct<transactionid:bigint,bucketid:int,rowid:bigint>)
+ sort order: +
+ Statistics: Num rows: 500 Data size: 308500 Basic stats: COMPLETE Column stats: PARTIAL
+ value expressions: _col1 (type: string), _col2 (type: string), _col3 (type: string)
+ Execution mode: llap
+ LLAP IO: may be used (ACID table)
+ Reducer 2
+ Execution mode: llap
+ Reduce Operator Tree:
+ Select Operator
+ expressions: KEY.reducesinkkey0 (type: struct<transactionid:bigint,bucketid:int,rowid:bigint>), VALUE._col0 (type: string), VALUE._col1 (type: string), VALUE._col2 (type: string), '11' (type: string)
+ outputColumnNames: _col0, _col1, _col2, _col3, _col4
+ Statistics: Num rows: 500 Data size: 308500 Basic stats: COMPLETE Column stats: PARTIAL
+ File Output Operator
+ compressed: false
+ Statistics: Num rows: 500 Data size: 308500 Basic stats: COMPLETE Column stats: PARTIAL
+ table:
+ input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
+ output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
+ serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde
+ name: default.srcpart_acid
+ Write Type: UPDATE
+
+ Stage: Stage-2
+ Dependency Collection
+
+ Stage: Stage-0
+ Move Operator
+ tables:
+ partition:
+ ds
+ hr
+ replace: false
+ table:
+ input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
+ output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
+ serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde
+ name: default.srcpart_acid
+
+ Stage: Stage-3
+ Stats-Aggr Operator
+
+PREHOOK: query: update srcpart_acid set value = concat(value, 'updated') where cast(key as integer) in(413,43) and hr='11'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart_acid
+PREHOOK: Input: default@srcpart_acid@ds=2008-04-08/hr=11
+PREHOOK: Input: default@srcpart_acid@ds=2008-04-09/hr=11
+PREHOOK: Output: default@srcpart_acid@ds=2008-04-08/hr=11
+PREHOOK: Output: default@srcpart_acid@ds=2008-04-09/hr=11
+POSTHOOK: query: update srcpart_acid set value = concat(value, 'updated') where cast(key as integer) in(413,43) and hr='11'
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart_acid
+POSTHOOK: Input: default@srcpart_acid@ds=2008-04-08/hr=11
+POSTHOOK: Input: default@srcpart_acid@ds=2008-04-09/hr=11
+POSTHOOK: Output: default@srcpart_acid@ds=2008-04-08/hr=11
+POSTHOOK: Output: default@srcpart_acid@ds=2008-04-09/hr=11
+PREHOOK: query: select ds, hr, key, value from srcpart_acid where value like '%updated' order by ds, hr, cast(key as integer)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart_acid
+PREHOOK: Input: default@srcpart_acid@ds=2008-04-08/hr=11
+PREHOOK: Input: default@srcpart_acid@ds=2008-04-08/hr=12
+PREHOOK: Input: default@srcpart_acid@ds=2008-04-09/hr=11
+PREHOOK: Input: default@srcpart_acid@ds=2008-04-09/hr=12
+#### A masked pattern was here ####
+POSTHOOK: query: select ds, hr, key, value from srcpart_acid where value like '%updated' order by ds, hr, cast(key as integer)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart_acid
+POSTHOOK: Input: default@srcpart_acid@ds=2008-04-08/hr=11
+POSTHOOK: Input: default@srcpart_acid@ds=2008-04-08/hr=12
+POSTHOOK: Input: default@srcpart_acid@ds=2008-04-09/hr=11
+POSTHOOK: Input: default@srcpart_acid@ds=2008-04-09/hr=12
+#### A masked pattern was here ####
+2008-04-08 11 43 val_43updated
+2008-04-08 11 413 val_413updated
+2008-04-08 11 413 val_413updated
+2008-04-09 11 43 val_43updated
+2008-04-09 11 413 val_413updated
+2008-04-09 11 413 val_413updated
+PREHOOK: query: insert into srcpart_acid PARTITION (ds='2008-04-08', hr=='11') values ('1001','val1001'),('1002','val1002'),('1003','val1003')
+PREHOOK: type: QUERY
+PREHOOK: Output: default@srcpart_acid@ds=2008-04-08/hr=11
+POSTHOOK: query: insert into srcpart_acid PARTITION (ds='2008-04-08', hr=='11') values ('1001','val1001'),('1002','val1002'),('1003','val1003')
+POSTHOOK: type: QUERY
+POSTHOOK: Output: default@srcpart_acid@ds=2008-04-08/hr=11
+POSTHOOK: Lineage: srcpart_acid PARTITION(ds=2008-04-08,hr=11).key SIMPLE [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col1, type:string, comment:), ]
+POSTHOOK: Lineage: srcpart_acid PARTITION(ds=2008-04-08,hr=11).value SIMPLE [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col2, type:string, comment:), ]
+PREHOOK: query: select ds, hr, key, value from srcpart_acid where cast(key as integer) > 1000 order by ds, hr, cast(key as integer)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart_acid
+PREHOOK: Input: default@srcpart_acid@ds=2008-04-08/hr=11
+PREHOOK: Input: default@srcpart_acid@ds=2008-04-08/hr=12
+PREHOOK: Input: default@srcpart_acid@ds=2008-04-09/hr=11
+PREHOOK: Input: default@srcpart_acid@ds=2008-04-09/hr=12
+#### A masked pattern was here ####
+POSTHOOK: query: select ds, hr, key, value from srcpart_acid where cast(key as integer) > 1000 order by ds, hr, cast(key as integer)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart_acid
+POSTHOOK: Input: default@srcpart_acid@ds=2008-04-08/hr=11
+POSTHOOK: Input: default@srcpart_acid@ds=2008-04-08/hr=12
+POSTHOOK: Input: default@srcpart_acid@ds=2008-04-09/hr=11
+POSTHOOK: Input: default@srcpart_acid@ds=2008-04-09/hr=12
+#### A masked pattern was here ####
+2008-04-08 11 1001 val1001
+2008-04-08 11 1002 val1002
+2008-04-08 11 1003 val1003
+PREHOOK: query: analyze table srcpart_acid PARTITION(ds, hr) compute statistics
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart_acid
+PREHOOK: Output: default@srcpart_acid
+PREHOOK: Output: default@srcpart_acid@ds=2008-04-08/hr=11
+PREHOOK: Output: default@srcpart_acid@ds=2008-04-08/hr=12
+PREHOOK: Output: default@srcpart_acid@ds=2008-04-09/hr=11
+PREHOOK: Output: default@srcpart_acid@ds=2008-04-09/hr=12
+POSTHOOK: query: analyze table srcpart_acid PARTITION(ds, hr) compute statistics
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart_acid
+POSTHOOK: Output: default@srcpart_acid
+POSTHOOK: Output: default@srcpart_acid@ds=2008-04-08/hr=11
+POSTHOOK: Output: default@srcpart_acid@ds=2008-04-08/hr=12
+POSTHOOK: Output: default@srcpart_acid@ds=2008-04-09/hr=11
+POSTHOOK: Output: default@srcpart_acid@ds=2008-04-09/hr=12
+PREHOOK: query: analyze table srcpart_acid PARTITION(ds, hr) compute statistics for columns
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart_acid
+PREHOOK: Input: default@srcpart_acid@ds=2008-04-08/hr=11
+PREHOOK: Input: default@srcpart_acid@ds=2008-04-08/hr=12
+PREHOOK: Input: default@srcpart_acid@ds=2008-04-09/hr=11
+PREHOOK: Input: default@srcpart_acid@ds=2008-04-09/hr=12
+PREHOOK: Output: default@srcpart_acid
+PREHOOK: Output: default@srcpart_acid@ds=2008-04-08/hr=11
+PREHOOK: Output: default@srcpart_acid@ds=2008-04-08/hr=12
+PREHOOK: Output: default@srcpart_acid@ds=2008-04-09/hr=11
+PREHOOK: Output: default@srcpart_acid@ds=2008-04-09/hr=12
+#### A masked pattern was here ####
+POSTHOOK: query: analyze table srcpart_acid PARTITION(ds, hr) compute statistics for columns
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart_acid
+POSTHOOK: Input: default@srcpart_acid@ds=2008-04-08/hr=11
+POSTHOOK: Input: default@srcpart_acid@ds=2008-04-08/hr=12
+POSTHOOK: Input: default@srcpart_acid@ds=2008-04-09/hr=11
+POSTHOOK: Input: default@srcpart_acid@ds=2008-04-09/hr=12
+POSTHOOK: Output: default@srcpart_acid
+POSTHOOK: Output: default@srcpart_acid@ds=2008-04-08/hr=11
+POSTHOOK: Output: default@srcpart_acid@ds=2008-04-08/hr=12
+POSTHOOK: Output: default@srcpart_acid@ds=2008-04-09/hr=11
+POSTHOOK: Output: default@srcpart_acid@ds=2008-04-09/hr=12
+#### A masked pattern was here ####
+PREHOOK: query: explain delete from srcpart_acid where key in( '1001', '213', '43')
+PREHOOK: type: QUERY
+POSTHOOK: query: explain delete from srcpart_acid where key in( '1001', '213', '43')
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+ Stage-1 is a root stage
+ Stage-2 depends on stages: Stage-1
+ Stage-0 depends on stages: Stage-2
+ Stage-3 depends on stages: Stage-0
+
+STAGE PLANS:
+ Stage: Stage-1
+ Tez
+#### A masked pattern was here ####
+ Edges:
+ Reducer 2 <- Map 1 (SIMPLE_EDGE)
+#### A masked pattern was here ####
+ Vertices:
+ Map 1
+ Map Operator Tree:
+ TableScan
+ alias: srcpart_acid
+ Statistics: Num rows: 2015 Data size: 916825 Basic stats: COMPLETE Column stats: PARTIAL
+ Filter Operator
+ predicate: (key) IN ('1001', '213', '43') (type: boolean)
+ Statistics: Num rows: 20 Data size: 9100 Basic stats: COMPLETE Column stats: PARTIAL
+ Select Operator
+ expressions: ROW__ID (type: struct<transactionid:bigint,bucketid:int,rowid:bigint>), ds (type: string), hr (type: string)
+ outputColumnNames: _col0, _col1, _col2
+ Statistics: Num rows: 20 Data size: 8880 Basic stats: COMPLETE Column stats: PARTIAL
+ Reduce Output Operator
+ key expressions: _col0 (type: struct<transactionid:bigint,bucketid:int,rowid:bigint>)
+ sort order: +
+ Statistics: Num rows: 20 Data size: 8880 Basic stats: COMPLETE Column stats: PARTIAL
+ value expressions: _col1 (type: string), _col2 (type: string)
+ Execution mode: llap
+ LLAP IO: may be used (ACID table)
+ Reducer 2
+ Execution mode: llap
+ Reduce Operator Tree:
+ Select Operator
+ expressions: KEY.reducesinkkey0 (type: struct<transactionid:bigint,bucketid:int,rowid:bigint>), VALUE._col0 (type: string), VALUE._col1 (type: string)
+ outputColumnNames: _col0, _col1, _col2
+ Statistics: Num rows: 20 Data size: 8880 Basic stats: COMPLETE Column stats: PARTIAL
+ File Output Operator
+ compressed: false
+ Statistics: Num rows: 20 Data size: 8880 Basic stats: COMPLETE Column stats: PARTIAL
+ table:
+ input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
+ output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
+ serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde
+ name: default.srcpart_acid
+ Write Type: DELETE
+
+ Stage: Stage-2
+ Dependency Collection
+
+ Stage: Stage-0
+ Move Operator
+ tables:
+ partition:
+ ds
+ hr
+ replace: false
+ table:
+ input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
+ output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
+ serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde
+ name: default.srcpart_acid
+
+ Stage: Stage-3
+ Stats-Aggr Operator
+
+PREHOOK: query: delete from srcpart_acid where key in( '1001', '213', '43')
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart_acid
+PREHOOK: Input: default@srcpart_acid@ds=2008-04-08/hr=11
+PREHOOK: Input: default@srcpart_acid@ds=2008-04-08/hr=12
+PREHOOK: Input: default@srcpart_acid@ds=2008-04-09/hr=11
+PREHOOK: Input: default@srcpart_acid@ds=2008-04-09/hr=12
+PREHOOK: Output: default@srcpart_acid@ds=2008-04-08/hr=11
+PREHOOK: Output: default@srcpart_acid@ds=2008-04-08/hr=12
+PREHOOK: Output: default@srcpart_acid@ds=2008-04-09/hr=11
+PREHOOK: Output: default@srcpart_acid@ds=2008-04-09/hr=12
+POSTHOOK: query: delete from srcpart_acid where key in( '1001', '213', '43')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart_acid
+POSTHOOK: Input: default@srcpart_acid@ds=2008-04-08/hr=11
+POSTHOOK: Input: default@srcpart_acid@ds=2008-04-08/hr=12
+POSTHOOK: Input: default@srcpart_acid@ds=2008-04-09/hr=11
+POSTHOOK: Input: default@srcpart_acid@ds=2008-04-09/hr=12
+POSTHOOK: Output: default@srcpart_acid@ds=2008-04-08/hr=11
+POSTHOOK: Output: default@srcpart_acid@ds=2008-04-08/hr=12
+POSTHOOK: Output: default@srcpart_acid@ds=2008-04-09/hr=11
+POSTHOOK: Output: default@srcpart_acid@ds=2008-04-09/hr=12
+PREHOOK: query: select count(*) from srcpart_acid where key in( '1001', '213', '43')
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart_acid
+PREHOOK: Input: default@srcpart_acid@ds=2008-04-08/hr=11
+PREHOOK: Input: default@srcpart_acid@ds=2008-04-08/hr=12
+PREHOOK: Input: default@srcpart_acid@ds=2008-04-09/hr=11
+PREHOOK: Input: default@srcpart_acid@ds=2008-04-09/hr=12
+#### A masked pattern was here ####
+POSTHOOK: query: select count(*) from srcpart_acid where key in( '1001', '213', '43')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart_acid
+POSTHOOK: Input: default@srcpart_acid@ds=2008-04-08/hr=11
+POSTHOOK: Input: default@srcpart_acid@ds=2008-04-08/hr=12
+POSTHOOK: Input: default@srcpart_acid@ds=2008-04-09/hr=11
+POSTHOOK: Input: default@srcpart_acid@ds=2008-04-09/hr=12
+#### A masked pattern was here ####
+0
+PREHOOK: query: select count(*) from srcpart_acid
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart_acid
+PREHOOK: Input: default@srcpart_acid@ds=2008-04-08/hr=11
+PREHOOK: Input: default@srcpart_acid@ds=2008-04-08/hr=12
+PREHOOK: Input: default@srcpart_acid@ds=2008-04-09/hr=11
+PREHOOK: Input: default@srcpart_acid@ds=2008-04-09/hr=12
+#### A masked pattern was here ####
+POSTHOOK: query: select count(*) from srcpart_acid
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart_acid
+POSTHOOK: Input: default@srcpart_acid@ds=2008-04-08/hr=11
+POSTHOOK: Input: default@srcpart_acid@ds=2008-04-08/hr=12
+POSTHOOK: Input: default@srcpart_acid@ds=2008-04-09/hr=11
+POSTHOOK: Input: default@srcpart_acid@ds=2008-04-09/hr=12
+#### A masked pattern was here ####
+1990
+PREHOOK: query: merge into srcpart_acid t using (select distinct ds, hr, key, value from srcpart_acid) s
+on s.ds=t.ds and s.hr=t.hr and s.key=t.key and s.value=t.value
+when matched and s.ds='2008-04-08' and s.hr=='11' and s.key='44' then update set value=concat(s.value,'updated by merge')
+when matched and s.ds='2008-04-08' and s.hr=='12' then delete
+when not matched then insert values('this','should','not','be there')
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart_acid
+PREHOOK: Input: default@srcpart_acid@ds=2008-04-08/hr=11
+PREHOOK: Input: default@srcpart_acid@ds=2008-04-08/hr=12
+PREHOOK: Input: default@srcpart_acid@ds=2008-04-09/hr=11
+PREHOOK: Input: default@srcpart_acid@ds=2008-04-09/hr=12
+PREHOOK: Output: default@merge_tmp_table
+PREHOOK: Output: default@srcpart_acid
+PREHOOK: Output: default@srcpart_acid@ds=2008-04-08/hr=11
+PREHOOK: Output: default@srcpart_acid@ds=2008-04-08/hr=11
+PREHOOK: Output: default@srcpart_acid@ds=2008-04-08/hr=12
+PREHOOK: Output: default@srcpart_acid@ds=2008-04-08/hr=12
+PREHOOK: Output: default@srcpart_acid@ds=2008-04-09/hr=11
+PREHOOK: Output: default@srcpart_acid@ds=2008-04-09/hr=11
+PREHOOK: Output: default@srcpart_acid@ds=2008-04-09/hr=12
+PREHOOK: Output: default@srcpart_acid@ds=2008-04-09/hr=12
+POSTHOOK: query: merge into srcpart_acid t using (select distinct ds, hr, key, value from srcpart_acid) s
+on s.ds=t.ds and s.hr=t.hr and s.key=t.key and s.value=t.value
+when matched and s.ds='2008-04-08' and s.hr=='11' and s.key='44' then update set value=concat(s.value,'updated by merge')
+when matched and s.ds='2008-04-08' and s.hr=='12' then delete
+when not matched then insert values('this','should','not','be there')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart_acid
+POSTHOOK: Input: default@srcpart_acid@ds=2008-04-08/hr=11
+POSTHOOK: Input: default@srcpart_acid@ds=2008-04-08/hr=12
+POSTHOOK: Input: default@srcpart_acid@ds=2008-04-09/hr=11
+POSTHOOK: Input: default@srcpart_acid@ds=2008-04-09/hr=12
+POSTHOOK: Output: default@merge_tmp_table
+POSTHOOK: Output: default@srcpart_acid@ds=2008-04-08/hr=11
+POSTHOOK: Output: default@srcpart_acid@ds=2008-04-08/hr=11
+POSTHOOK: Output: default@srcpart_acid@ds=2008-04-08/hr=12
+POSTHOOK: Output: default@srcpart_acid@ds=2008-04-08/hr=12
+POSTHOOK: Output: default@srcpart_acid@ds=2008-04-09/hr=11
+POSTHOOK: Output: default@srcpart_acid@ds=2008-04-09/hr=11
+POSTHOOK: Output: default@srcpart_acid@ds=2008-04-09/hr=12
+POSTHOOK: Output: default@srcpart_acid@ds=2008-04-09/hr=12
+POSTHOOK: Lineage: merge_tmp_table.val EXPRESSION [(srcpart_acid)t.FieldSchema(name:ROW__ID, type:struct<transactionId:bigint,bucketId:int,rowId:bigint>, comment:), (srcpart_acid)t.FieldSchema(name:ds, type:string, comment:null), (srcpart_acid)t.FieldSchema(name:hr, type:string, comment:null), ]
+PREHOOK: query: select count(*) from srcpart_acid where ds='2008-04-08' and hr=='12'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart_acid
+PREHOOK: Input: default@srcpart_acid@ds=2008-04-08/hr=12
+#### A masked pattern was here ####
+POSTHOOK: query: select count(*) from srcpart_acid where ds='2008-04-08' and hr=='12'
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart_acid
+POSTHOOK: Input: default@srcpart_acid@ds=2008-04-08/hr=12
+#### A masked pattern was here ####
+0
+PREHOOK: query: select ds, hr, key, value from srcpart_acid where value like '%updated by merge'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart_acid
+PREHOOK: Input: default@srcpart_acid@ds=2008-04-08/hr=11
+PREHOOK: Input: default@srcpart_acid@ds=2008-04-08/hr=12
+PREHOOK: Input: default@srcpart_acid@ds=2008-04-09/hr=11
+PREHOOK: Input: default@srcpart_acid@ds=2008-04-09/hr=12
+#### A masked pattern was here ####
+POSTHOOK: query: select ds, hr, key, value from srcpart_acid where value like '%updated by merge'
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart_acid
+POSTHOOK: Input: default@srcpart_acid@ds=2008-04-08/hr=11
+POSTHOOK: Input: default@srcpart_acid@ds=2008-04-08/hr=12
+POSTHOOK: Input: default@srcpart_acid@ds=2008-04-09/hr=11
+POSTHOOK: Input: default@srcpart_acid@ds=2008-04-09/hr=12
+#### A masked pattern was here ####
+2008-04-08 11 44 val_44updated by merge
+PREHOOK: query: select count(*) from srcpart_acid where ds = 'this' and hr = 'should' and key = 'not' and value = 'be there'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart_acid
+#### A masked pattern was here ####
+POSTHOOK: query: select count(*) from srcpart_acid where ds = 'this' and hr = 'should' and key = 'not' and value = 'be there'
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart_acid
+#### A masked pattern was here ####
+0
+PREHOOK: query: drop table if exists srcpart_acid
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@srcpart_acid
+PREHOOK: Output: default@srcpart_acid
+POSTHOOK: query: drop table if exists srcpart_acid
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@srcpart_acid
+POSTHOOK: Output: default@srcpart_acid
+PREHOOK: query: drop table if exists srcpart_acidb
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table if exists srcpart_acidb
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: CREATE TABLE srcpart_acidb (key STRING, value STRING) PARTITIONED BY (ds STRING, hr STRING) CLUSTERED BY(key) INTO 2 BUCKETS stored as ORC TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@srcpart_acidb
+POSTHOOK: query: CREATE TABLE srcpart_acidb (key STRING, value STRING) PARTITIONED BY (ds STRING, hr STRING) CLUSTERED BY(key) INTO 2 BUCKETS stored as ORC TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@srcpart_acidb
+PREHOOK: query: insert into srcpart_acidb PARTITION (ds, hr) select * from srcpart
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart
+PREHOOK: Input: default@srcpart@ds=2008-04-08/hr=11
+PREHOOK: Input: default@srcpart@ds=2008-04-08/hr=12
+PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=11
+PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=12
+PREHOOK: Output: default@srcpart_acidb
+POSTHOOK: query: insert into srcpart_acidb PARTITION (ds, hr) select * from srcpart
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart
+POSTHOOK: Input: default@srcpart@ds=2008-04-08/hr=11
+POSTHOOK: Input: default@srcpart@ds=2008-04-08/hr=12
+POSTHOOK: Input: default@srcpart@ds=2008-04-09/hr=11
+POSTHOOK: Input: default@srcpart@ds=2008-04-09/hr=12
+POSTHOOK: Output: default@srcpart_acidb@ds=2008-04-08/hr=11
+POSTHOOK: Output: default@srcpart_acidb@ds=2008-04-08/hr=12
+POSTHOOK: Output: default@srcpart_acidb@ds=2008-04-09/hr=11
+POSTHOOK: Output: default@srcpart_acidb@ds=2008-04-09/hr=12
+POSTHOOK: Lineage: srcpart_acidb PARTITION(ds=2008-04-08,hr=11).key SIMPLE [(srcpart)srcpart.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: srcpart_acidb PARTITION(ds=2008-04-08,hr=11).value SIMPLE [(srcpart)srcpart.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: srcpart_acidb PARTITION(ds=2008-04-08,hr=12).key SIMPLE [(srcpart)srcpart.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: srcpart_acidb PARTITION(ds=2008-04-08,hr=12).value SIMPLE [(srcpart)srcpart.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: srcpart_acidb PARTITION(ds=2008-04-09,hr=11).key SIMPLE [(srcpart)srcpart.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: srcpart_acidb PARTITION(ds=2008-04-09,hr=11).value SIMPLE [(srcpart)srcpart.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: srcpart_acidb PARTITION(ds=2008-04-09,hr=12).key SIMPLE [(srcpart)srcpart.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: srcpart_acidb PARTITION(ds=2008-04-09,hr=12).value SIMPLE [(srcpart)srcpart.FieldSchema(name:value, type:string, comment:default), ]
+PREHOOK: query: select ds, hr, key, value from srcpart_acidb where cast(key as integer) in(413,43) and hr='11' order by ds, hr, cast(key as integer)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart_acidb
+PREHOOK: Input: default@srcpart_acidb@ds=2008-04-08/hr=11
+PREHOOK: Input: default@srcpart_acidb@ds=2008-04-09/hr=11
+#### A masked pattern was here ####
+POSTHOOK: query: select ds, hr, key, value from srcpart_acidb where cast(key as integer) in(413,43) and hr='11' order by ds, hr, cast(key as integer)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart_acidb
+POSTHOOK: Input: default@srcpart_acidb@ds=2008-04-08/hr=11
+POSTHOOK: Input: default@srcpart_acidb@ds=2008-04-09/hr=11
+#### A masked pattern was here ####
+2008-04-08 11 43 val_43
+2008-04-08 11 413 val_413
+2008-04-08 11 413 val_413
+2008-04-09 11 43 val_43
+2008-04-09 11 413 val_413
+2008-04-09 11 413 val_413
+PREHOOK: query: analyze table srcpart_acidb PARTITION(ds, hr) compute statistics
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart_acidb
+PREHOOK: Output: default@srcpart_acidb
+PREHOOK: Output: default@srcpart_acidb@ds=2008-04-08/hr=11
+PREHOOK: Output: default@srcpart_acidb@ds=2008-04-08/hr=12
+PREHOOK: Output: default@srcpart_acidb@ds=2008-04-09/hr=11
+PREHOOK: Output: default@srcpart_acidb@ds=2008-04-09/hr=12
+POSTHOOK: query: analyze table srcpart_acidb PARTITION(ds, hr) compute statistics
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart_acidb
+POSTHOOK: Output: default@srcpart_acidb
+POSTHOOK: Output: default@srcpart_acidb@ds=2008-04-08/hr=11
+POSTHOOK: Output: default@srcpart_acidb@ds=2008-04-08/hr=12
+POSTHOOK: Output: default@srcpart_acidb@ds=2008-04-09/hr=11
+POSTHOOK: Output: default@srcpart_acidb@ds=2008-04-09/hr=12
+PREHOOK: query: analyze table srcpart_acidb PARTITION(ds, hr) compute statistics for columns
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart_acidb
+PREHOOK: Input: default@srcpart_acidb@ds=2008-04-08/hr=11
+PREHOOK: Input: default@srcpart_acidb@ds=2008-04-08/hr=12
+PREHOOK: Input: default@srcpart_acidb@ds=2008-04-09/hr=11
+PREHOOK: Input: default@srcpart_acidb@ds=2008-04-09/hr=12
+PREHOOK: Output: default@srcpart_acidb
+PREHOOK: Output: default@srcpart_acidb@ds=2008-04-08/hr=11
+PREHOOK: Output: default@srcpart_acidb@ds=2008-04-08/hr=12
+PREHOOK: Output: default@srcpart_acidb@ds=2008-04-09/hr=11
+PREHOOK: Output: default@srcpart_acidb@ds=2008-04-09/hr=12
+#### A masked pattern was here ####
+POSTHOOK: query: analyze table srcpart_acidb PARTITION(ds, hr) compute statistics for columns
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart_acidb
+POSTHOOK: Input: default@srcpart_acidb@ds=2008-04-08/hr=11
+POSTHOOK: Input: default@srcpart_acidb@ds=2008-04-08/hr=12
+POSTHOOK: Input: default@srcpart_acidb@ds=2008-04-09/hr=11
+POSTHOOK: Input: default@srcpart_acidb@ds=2008-04-09/hr=12
+POSTHOOK: Output: default@srcpart_acidb
+POSTHOOK: Output: default@srcpart_acidb@ds=2008-04-08/hr=11
+POSTHOOK: Output: default@srcpart_acidb@ds=2008-04-08/hr=12
+POSTHOOK: Output: default@srcpart_acidb@ds=2008-04-09/hr=11
+POSTHOOK: Output: default@srcpart_acidb@ds=2008-04-09/hr=12
+#### A masked pattern was here ####
+PREHOOK: query: explain update srcpart_acidb set value = concat(value, 'updated') where cast(key as integer) in(413,43) and hr='11'
+PREHOOK: type: QUERY
+POSTHOOK: query: explain update srcpart_acidb set value = concat(value, 'updated') where cast(key as integer) in(413,43) and hr='11'
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+ Stage-1 is a root stage
+ Stage-2 depends on stages: Stage-1
+ Stage-0 depends on stages: Stage-2
+ Stage-3 depends on stages: Stage-0
+
+STAGE PLANS:
+ Stage: Stage-1
+ Tez
+#### A masked pattern was here ####
+ Edges:
+ Reducer 2 <- Map 1 (SIMPLE_EDGE)
+#### A masked pattern was here ####
+ Vertices:
+ Map 1
+ Map Operator Tree:
+ TableScan
+ alias: srcpart_acidb
+ Statistics: Num rows: 1000 Data size: 362000 Basic stats: COMPLETE Column stats: PARTIAL
+ Filter Operator
+ predicate: (UDFToInteger(key)) IN (413, 43) (type: boolean)
+ Statistics: Num rows: 500 Data size: 181000 Basic stats: COMPLETE Column stats: PARTIAL
+ Select Operator
+ expressions: ROW__ID (type: struct<transactionid:bigint,bucketid:int,rowid:bigint>), key (type: string), concat(value, 'updated') (type: string), ds (type: string)
+ outputColumnNames: _col0, _col1, _col2, _col3
+ Statistics: Num rows: 500 Data size: 308500 Basic stats: COMPLETE Column stats: PARTIAL
+ Reduce Output Operator
+ key expressions: _col0 (type: struct<transactionid:bigint,bucketid:int,rowid:bigint>)
+ sort order: +
+ Map-reduce partition columns: UDFToInteger(_col0) (type: int)
+ Statistics: Num rows: 500 Data size: 308500 Basic stats: COMPLETE Column stats: PARTIAL
+ value expressions: _col1 (type: string), _col2 (type: string), _col3 (type: string)
+ Execution mode: llap
+ LLAP IO: may be used (ACID table)
+ Reducer 2
+ Execution mode: llap
+ Reduce Operator Tree:
+ Select Operator
+ expressions: KEY.reducesinkkey0 (type: struct<transactionid:bigint,bucketid:int,rowid:bigint>), VALUE._col0 (type: string), VALUE._col1 (type: string), VALUE._col2 (type: string), '11' (type: string)
+ outputColumnNames: _col0, _col1, _col2, _col3, _col4
+ Statistics: Num rows: 500 Data size: 308500 Basic stats: COMPLETE Column stats: PARTIAL
+ File Output Operator
+ compressed: false
+ Statistics: Num rows: 500 Data size: 308500 Basic stats: COMPLETE Column stats: PARTIAL
+ table:
+ input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
+ output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
+ serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde
+ name: default.srcpart_acidb
+ Write Type: UPDATE
+
+ Stage: Stage-2
+ Dependency Collection
+
+ Stage: Stage-0
+ Move Operator
+ tables:
+ partition:
+ ds
+ hr
+ replace: false
+ table:
+ input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
+ output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
+ serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde
+ name: default.srcpart_acidb
+
+ Stage: Stage-3
+ Stats-Aggr Operator
+
+PREHOOK: query: update srcpart_acidb set value = concat(value, 'updated') where cast(key as integer) in(413,43) and hr='11'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart_acidb
+PREHOOK: Input: default@srcpart_acidb@ds=2008-04-08/hr=11
+PREHOOK: Input: default@srcpart_acidb@ds=2008-04-09/hr=11
+PREHOOK: Output: default@srcpart_acidb@ds=2008-04-08/hr=11
+PREHOOK: Output: default@srcpart_acidb@ds=2008-04-09/hr=11
+POSTHOOK: query: update srcpart_acidb set value = concat(value, 'updated') where cast(key as integer) in(413,43) and hr='11'
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart_acidb
+POSTHOOK: Input: default@srcpart_acidb@ds=2008-04-08/hr=11
+POSTHOOK: Input: default@srcpart_acidb@ds=2008-04-09/hr=11
+POSTHOOK: Output: default@srcpart_acidb@ds=2008-04-08/hr=11
+POSTHOOK: Output: default@srcpart_acidb@ds=2008-04-09/hr=11
+PREHOOK: query: select ds, hr, key, value from srcpart_acidb where value like '%updated' order by ds, hr, cast(key as integer)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart_acidb
+PREHOOK: Input: default@srcpart_acidb@ds=2008-04-08/hr=11
+PREHOOK: Input: default@srcpart_acidb@ds=2008-04-08/hr=12
+PREHOOK: Input: default@srcpart_acidb@ds=2008-04-09/hr=11
+PREHOOK: Input: default@srcpart_acidb@ds=2008-04-09/hr=12
+#### A masked pattern was here ####
+POSTHOOK: query: select ds, hr, key, value from srcpart_acidb where value like '%updated' order by ds, hr, cast(key as integer)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart_acidb
+POSTHOOK: Input: default@srcpart_acidb@ds=2008-04-08/hr=11
+POSTHOOK: Input: default@srcpart_acidb@ds=2008-04-08/hr=12
+POSTHOOK: Input: default@srcpart_acidb@ds=2008-04-09/hr=11
+POSTHOOK: Input: default@srcpart_acidb@ds=2008-04-09/hr=12
+#### A masked pattern was here ####
+2008-04-08 11 43 val_43updated
+2008-04-08 11 413 val_413updated
+2008-04-08 11 413 val_413updated
+2008-04-09 11 43 val_43updated
+2008-04-09 11 413 val_413updated
+2008-04-09 11 413 val_413updated
+PREHOOK: query: insert into srcpart_acidb PARTITION (ds='2008-04-08', hr=='11') values ('1001','val1001'),('1002','val1002'),('1003','val1003')
+PREHOOK: type: QUERY
+PREHOOK: Output: default@srcpart_acidb@ds=2008-04-08/hr=11
+POSTHOOK: query: insert into srcpart_acidb PARTITION (ds='2008-04-08', hr=='11') values ('1001','val1001'),('1002','val1002'),('1003','val1003')
+POSTHOOK: type: QUERY
+POSTHOOK: Output: default@srcpart_acidb@ds=2008-04-08/hr=11
+POSTHOOK: Lineage: srcpart_acidb PARTITION(ds=2008-04-08,hr=11).key SIMPLE [(values__tmp__table__2)values__tmp__table__2.FieldSchema(name:tmp_values_col1, type:string, comment:), ]
+POSTHOOK: Lineage: srcpart_acidb PARTITION(ds=2008-04-08,hr=11).value SIMPLE [(values__tmp__table__2)values__tmp__table__2.FieldSchema(name:tmp_values_col2, type:string, comment:), ]
+PREHOOK: query: select ds, hr, key, value from srcpart_acidb where cast(key as integer) > 1000 order by ds, hr, cast(key as integer)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart_acidb
+PREHOOK: Input: default@srcpart_acidb@ds=2008-04-08/hr=11
+PREHOOK: Input: default@srcpart_acidb@ds=2008-04-08/hr=12
+PREHOOK: Input: default@srcpart_acidb@ds=2008-04-09/hr=11
+PREHOOK: Input: default@srcpart_acidb@ds=2008-04-09/hr=12
+#### A masked pattern was here ####
+POSTHOOK: query: select ds, hr, key, value from srcpart_acidb where cast(key as integer) > 1000 order by ds, hr, cast(key as integer)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart_acidb
+POSTHOOK: Input: default@srcpart_acidb@ds=2008-04-08/hr=11
+POSTHOOK: Input: default@srcpart_acidb@ds=2008-04-08/hr=12
+POSTHOOK: Input: default@srcpart_acidb@ds=2008-04-09/hr=11
+POSTHOOK: Input: default@srcpart_acidb@ds=2008-04-09/hr=12
+#### A masked pattern was here ####
+2008-04-08 11 1001 val1001
+2008-04-08 11 1002 val1002
+2008-04-08 11 1003 val1003
+PREHOOK: query: analyze table srcpart_acidb PARTITION(ds, hr) compute statistics
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart_acidb
+PREHOOK: Output: default@srcpart_acidb
+PREHOOK: Output: default@srcpart_acidb@ds=2008-04-08/hr=11
+PREHOOK: Output: default@srcpart_acidb@ds=2008-04-08/hr=12
+PREHOOK: Output: default@srcpart_acidb@ds=2008-04-09/hr=11
+PREHOOK: Output: default@srcpart_acidb@ds=2008-04-09/hr=12
+POSTHOOK: query: analyze table srcpart_acidb PARTITION(ds, hr) compute statistics
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart_acidb
+POSTHOOK: Output: default@srcpart_acidb
+POSTHOOK: Output: default@srcpart_acidb@ds=2008-04-08/hr=11
+POSTHOOK: Output: default@srcpart_acidb@ds=2008-04-08/hr=12
+POSTHOOK: Output: default@srcpart_acidb@ds=2008-04-09/hr=11
+POSTHOOK: Output: default@srcpart_acidb@ds=2008-04-09/hr=12
+PREHOOK: query: analyze table srcpart_acidb PARTITION(ds, hr) compute statistics for columns
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart_acidb
+PREHOOK: Input: default@srcpart_acidb@ds=2008-04-08/hr=11
+PREHOOK: Input: default@srcpart_acidb@ds=2008-04-08/hr=12
+PREHOOK: Input: default@srcpart_acidb@ds=2008-04-09/hr=11
+PREHOOK: Input: default@srcpart_acidb@ds=2008-04-09/hr=12
+PREHOOK: Output: default@srcpart_acidb
+PREHOOK: Output: default@srcpart_acidb@ds=2008-04-08/hr=11
+PREHOOK: Output: default@srcpart_acidb@ds=2008-04-08/hr=12
+PREHOOK: Output: default@srcpart_acidb@ds=2008-04-09/hr=11
+PREHOOK: Output: default@srcpart_acidb@ds=2008-04-09/hr=12
+#### A masked pattern was here ####
+POSTHOOK: query: analyze table srcpart_acidb PARTITION(ds, hr) compute statistics for columns
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart_acidb
+POSTHOOK: Input: default@srcpart_acidb@ds=2008-04-08/hr=11
+POSTHOOK: Input: default@srcpart_acidb@ds=2008-04-08/hr=12
+POSTHOOK: Input: default@srcpart_acidb@ds=2008-04-09/hr=11
+POSTHOOK: Input: default@srcpart_acidb@ds=2008-04-09/hr=12
+POSTHOOK: Output: default@srcpart_acidb
+POSTHOOK: Output: default@srcpart_acidb@ds=2008-04-08/hr=11
+POSTHOOK: Output: default@srcpart_acidb@ds=2008-04-08/hr=12
+POSTHOOK: Output: default@srcpart_acidb@ds=2008-04-09/hr=11
+POSTHOOK: Output: default@srcpart_acidb@ds=2008-04-09/hr=12
+#### A masked pattern was here ####
+PREHOOK: query: explain delete from srcpart_acidb where key in( '1001', '213', '43')
+PREHOOK: type: QUERY
+POSTHOOK: query: explain delete from srcpart_acidb where key in( '1001', '213', '43')
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+ Stage-1 is a root stage
+ Stage-2 depends on stages: Stage-1
+ Stage-0 depends on stages: Stage-2
+ Stage-3 depends on stages: Stage-0
+
+STAGE PLANS:
+ Stage: Stage-1
+ Tez
+#### A masked pattern was here ####
+ Edges:
+ Reducer 2 <- Map 1 (SIMPLE_EDGE)
+#### A masked pattern was here ####
+ Vertices:
+ Map 1
+ Map Operator Tree:
+ TableScan
+ alias: srcpart_acidb
+ Statistics: Num rows: 2015 Data size: 916825 Basic stats: COMPLETE Column stats: PARTIAL
+ Filter Operator
+ predicate: (key) IN ('1001', '213', '43') (type: boolean)
+ Statistics: Num rows: 20 Data size: 9100 Basic stats: COMPLETE Column stats: PARTIAL
+ Select Operator
+ expressions: ROW__ID (type: struct<transactionid:bigint,bucketid:int,rowid:bigint>), ds (type: string), hr (type: string)
+ outputColumnNames: _col0, _col1, _col2
+ Statistics: Num rows: 20 Data size: 8880 Basic stats: COMPLETE Column stats: PARTIAL
+ Reduce Output Operator
+ key expressions: _col0 (type: struct<transactionid:bigint,bucketid:int,rowid:bigint>)
+ sort order: +
+ Map-reduce partition columns: UDFToInteger(_col0) (type: int)
+ Statistics: Num rows: 20 Data size: 8880 Basic stats: COMPLETE Column stats: PARTIAL
+ value expressions: _col1 (type: string), _col2 (type: string)
+ Execution mode: llap
+ LLAP IO: may be used (ACID table)
+ Reducer 2
+ Execution mode: llap
+ Reduce Operator Tree:
+ Select Operator
+ expressions: KEY.reducesinkkey0 (type: struct<transactionid:bigint,bucketid:int,rowid:bigint>), VALUE._col0 (type: string), VALUE._col1 (type: string)
+ outputColumnNames: _col0, _col1, _col2
+ Statistics: Num rows: 20 Data size: 8880 Basic stats: COMPLETE Column stats: PARTIAL
+ File Output Operator
+ compressed: false
+ Statistics: Num rows: 20 Data size: 8880 Basic stats: COMPLETE Column stats: PARTIAL
+ table:
+ input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
+ output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
+ serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde
+ name: default.srcpart_acidb
+ Write Type: DELETE
+
+ Stage: Stage-2
+ Dependency Collection
+
+ Stage: Stage-0
+ Move Operator
+ tables:
+ partition:
+ ds
+ hr
+ replace: false
+ table:
+ input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
+ output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
+ serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde
+ name: default.srcpart_acidb
+
+ Stage: Stage-3
+ Stats-Aggr Operator
+
+PREHOOK: query: delete from srcpart_acidb where key in( '1001', '213', '43')
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart_acidb
+PREHOOK: Input: default@srcpart_acidb@ds=2008-04-08/hr=11
+PREHOOK: Input: default@srcpart_acidb@ds=2008-04-08/hr=12
+PREHOOK: Input: default@srcpart_acidb@ds=2008-04-09/hr=11
+PREHOOK: Input: default@srcpart_acidb@ds=2008-04-09/hr=12
+PREHOOK: Output: default@srcpart_acidb@ds=2008-04-08/hr=11
+PREHOOK: Output: default@srcpart_acidb@ds=2008-04-08/hr=12
+PREHOOK: Output: default@srcpart_acidb@ds=2008-04-09/hr=11
+PREHOOK: Output: default@srcpart_acidb@ds=2008-04-09/hr=12
+POSTHOOK: query: delete from srcpart_acidb where key in( '1001', '213', '43')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart_acidb
+POSTHOOK: Input: default@srcpart_acidb@ds=2008-04-08/hr=11
+POSTHOOK: Input: default@srcpart_acidb@ds=2008-04-08/hr=12
+POSTHOOK: Input: default@srcpart_acidb@ds=2008-04-09/hr=11
+POSTHOOK: Input: default@srcpart_acidb@ds=2008-04-09/hr=12
+POSTHOOK: Output: default@srcpart_acidb@ds=2008-04-08/hr=11
+POSTHOOK: Output: default@srcpart_acidb@ds=2008-04-08/hr=12
+POSTHOOK: Output: default@srcpart_acidb@ds=2008-04-09/hr=11
+POSTHOOK: Output: default@srcpart_acidb@ds=2008-04-09/hr=12
+PREHOOK: query: select count(*) from srcpart_acidb where key in( '1001', '213', '43')
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart_acidb
+PREHOOK: Input: default@srcpart_acidb@ds=2008-04-08/hr=11
+PREHOOK: Input: default@srcpart_acidb@ds=2008-04-08/hr=12
+PREHOOK: Input: default@srcpart_acidb@ds=2008-04-09/hr=11
+PREHOOK: Input: default@srcpart_acidb@ds=2008-04-09/hr=12
+#### A masked pattern was here ####
+POSTHOOK: query: select count(*) from srcpart_acidb where key in( '1001', '213', '43')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart_acidb
+POSTHOOK: Input: default@srcpart_acidb@ds=2008-04-08/hr=11
+POSTHOOK: Input: default@srcpart_acidb@ds=2008-04-08/hr=12
+POSTHOOK: Input: default@srcpart_acidb@ds=2008-04-09/hr=11
+POSTHOOK: Input: default@srcpart_acidb@ds=2008-04-09/hr=12
+#### A masked pattern was here ####
+0
+PREHOOK: query: select count(*) from srcpart_acidb
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart_acidb
+PREHOOK: Input: default@srcpart_acidb@ds=2008-04-08/hr=11
+PREHOOK: Input: default@srcpart_acidb@ds=2008-04-08/hr=12
+PREHOOK: Input: default@srcpart_acidb@ds=2008-04-09/hr=11
+PREHOOK: Input: default@srcpart_acidb@ds=2008-04-09/hr=12
+#### A masked pattern was here ####
+POSTHOOK: query: select count(*) from srcpart_acidb
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart_acidb
+POSTHOOK: Input: default@srcpart_acidb@ds=2008-04-08/hr=11
+POSTHOOK: Input: default@srcpart_acidb@ds=2008-04-08/hr=12
+POSTHOOK: Input: default@srcpart_acidb@ds=2008-04-09/hr=11
+POSTHOOK: Input: default@srcpart_acidb@ds=2008-04-09/hr=12
+#### A masked pattern was here ####
+1990
+PREHOOK: query: merge into srcpart_acidb t using (select distinct ds, hr, key, value from srcpart_acidb) s
+on s.ds=t.ds and s.hr=t.hr and s.key=t.key and s.value=t.value
+when matched and s.ds='2008-04-08' and s.hr=='11' and s.key='44' then update set value=concat(s.value,'updated by merge')
+when matched and s.ds='2008-04-08' and s.hr=='12' then delete
+when not matched then insert values('this','should','not','be there')
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart_acidb
+PREHOOK: Input: default@srcpart_acidb@ds=2008-04-08/hr=11
+PREHOOK: Input: default@srcpart_acidb@ds=2008-04-08/hr=12
+PREHOOK: Input: default@srcpart_acidb@ds=2008-04-09/hr=11
+PREHOOK: Input: default@srcpart_acidb@ds=2008-04-09/hr=12
+PREHOOK: Output: default@merge_tmp_table
+PREHOOK: Output: default@srcpart_acidb
+PREHOOK: Output: default@srcpart_acidb@ds=2008-04-08/hr=11
+PREHOOK: Output: default@srcpart_acidb@ds=2008-04-08/hr=11
+PREHOOK: Output: default@srcpart_acidb@ds=2008-04-08/hr=12
+PREHOOK: Output: default@srcpart_acidb@ds=2008-04-08/hr=12
+PREHOOK: Output: default@srcpart_acidb@ds=2008-04-09/hr=11
+PREHOOK: Output: default@srcpart_acidb@ds=2008-04-09/hr=11
+PREHOOK: Output: default@srcpart_acidb@ds=2008-04-09/hr=12
+PREHOOK: Output: default@srcpart_acidb@ds=2008-04-09/hr=12
+POSTHOOK: query: merge into srcpart_acidb t using (select distinct ds, hr, key, value from srcpart_acidb) s
+on s.ds=t.ds and s.hr=t.hr and s.key=t.key and s.value=t.value
+when matched and s.ds='2008-04-08' and s.hr=='11' and s.key='44' then update set value=concat(s.value,'updated by merge')
+when matched and s.ds='2008-04-08' and s.hr=='12' then delete
+when not matched then insert values('this','should','not','be there')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart_acidb
+POSTHOOK: Input: default@srcpart_acidb@ds=2008-04-08/hr=11
+POSTHOOK: Input: default@srcpart_acidb@ds=2008-04-08/hr=12
+POSTHOOK: Input: default@srcpart_acidb@ds=2008-04-09/hr=11
+POSTHOOK: Input: default@srcpart_acidb@ds=2008-04-09/hr=12
+POSTHOOK: Output: default@merge_tmp_table
+POSTHOOK: Output: default@srcpart_acidb@ds=2008-04-08/hr=11
+POSTHOOK: Output: default@srcpart_acidb@ds=2008-04-08/hr=11
+POSTHOOK: Output: default@srcpart_acidb@ds=2008-04-08/hr=12
+POSTHOOK: Output: default@srcpart_acidb@ds=2008-04-08/hr=12
+POSTHOOK: Output: default@srcpart_acidb@ds=2008-04-09/hr=11
+POSTHOOK: Output: default@srcpart_acidb@ds=2008-04-09/hr=11
+POSTHOOK: Output: default@srcpart_acidb@ds=2008-04-09/hr=12
+POSTHOOK: Output: default@srcpart_acidb@ds=2008-04-09/hr=12
+POSTHOOK: Lineage: merge_tmp_table.val EXPRESSION [(srcpart_acidb)t.FieldSchema(name:ROW__ID, type:struct<transactionId:bigint,bucketId:int,rowId:bigint>, comment:), (srcpart_acidb)t.FieldSchema(name:ds, type:string, comment:null), (srcpart_acidb)t.FieldSchema(name:hr, type:string, comment:null), ]
+PREHOOK: query: select count(*) from srcpart_acidb where ds='2008-04-08' and hr=='12'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart_acidb
+PREHOOK: Input: default@srcpart_acidb@ds=2008-04-08/hr=12
+#### A masked pattern was here ####
+POSTHOOK: query: select count(*) from srcpart_acidb where ds='2008-04-08' and hr=='12'
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart_acidb
+POSTHOOK: Input: default@srcpart_acidb@ds=2008-04-08/hr=12
+#### A masked pattern was here ####
+0
+PREHOOK: query: select ds, hr, key, value from srcpart_acidb where value like '%updated by merge'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart_acidb
+PREHOOK: Input: default@srcpart_acidb@ds=2008-04-08/hr=11
+PREHOOK: Input: default@srcpart_acidb@ds=2008-04-08/hr=12
+PREHOOK: Input: default@srcpart_acidb@ds=2008-04-09/hr=11
+PREHOOK: Input: default@srcpart_acidb@ds=2008-04-09/hr=12
+#### A masked pattern was here ####
+POSTHOOK: query: select ds, hr, key, value from srcpart_acidb where value like '%updated by merge'
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart_acidb
+POSTHOOK: Input: default@srcpart_acidb@ds=2008-04-08/hr=11
+POSTHOOK: Input: default@srcpart_acidb@ds=2008-04-08/hr=12
+POSTHOOK: Input: default@srcpart_acidb@ds=2008-04-09/hr=11
+POSTHOOK: Input: default@srcpart_acidb@ds=2008-04-09/hr=12
+#### A masked pattern was here ####
+2008-04-08 11 44 val_44updated by merge
+PREHOOK: query: select count(*) from srcpart_acidb where ds = 'this' and hr = 'should' and key = 'not' and value = 'be there'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart_acidb
+#### A masked pattern was here ####
+POSTHOOK: query: select count(*) from srcpart_acidb where ds = 'this' and hr = 'should' and key = 'not' and value = 'be there'
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart_acidb
+#### A masked pattern was here ####
+0
+PREHOOK: query: drop table if exists srcpart_acidb
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@srcpart_acidb
+PREHOOK: Output: default@srcpart_acidb
+POSTHOOK: query: drop table if exists srcpart_acidb
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@srcpart_acidb
+POSTHOOK: Output: default@srcpart_acidb
+PREHOOK: query: drop table if exists srcpart_acidv
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table if exists srcpart_acidv
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: CREATE TABLE srcpart_acidv (key STRING, value STRING) PARTITIONED BY (ds STRING, hr STRING) stored as ORC TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@srcpart_acidv
+POSTHOOK: query: CREATE TABLE srcpart_acidv (key STRING, value STRING) PARTITIONED BY (ds STRING, hr STRING) stored as ORC TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@srcpart_acidv
+PREHOOK: query: insert into srcpart_acidv PARTITION (ds, hr) select * from srcpart
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart
+PREHOOK: Input: default@srcpart@ds=2008-04-08/hr=11
+PREHOOK: Input: default@srcpart@ds=2008-04-08/hr=12
+PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=11
+PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=12
+PREHOOK: Output: default@srcpart_acidv
+POSTHOOK: query: insert into srcpart_acidv PARTITION (ds, hr) select * from srcpart
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart
+POSTHOOK: Input: default@srcpart@ds=2008-04-08/hr=11
+POSTHOOK: Input: default@srcpart@ds=2008-04-08/hr=12
+POSTHOOK: Input: default@srcpart@ds=2008-04-09/hr=11
+POSTHOOK: Input: default@srcpart@ds=2008-04-09/hr=12
+POSTHOOK: Output: default@srcpart_acidv@ds=2008-04-08/hr=11
+POSTHOOK: Output: default@srcpart_acidv@ds=2008-04-08/hr=12
+POSTHOOK: Output: default@srcpart_acidv@ds=2008-04-09/hr=11
+POSTHOOK: Output: default@srcpart_acidv@ds=2008-04-09/hr=12
+POSTHOOK: Lineage: srcpart_acidv PARTITION(ds=2008-04-08,hr=11).key SIMPLE [(srcpart)srcpart.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: srcpart_acidv PARTITION(ds=2008-04-08,hr=11).value SIMPLE [(srcpart)srcpart.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: srcpart_acidv PARTITION(ds=2008-04-08,hr=12).key SIMPLE [(srcpart)srcpart.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: srcpart_acidv PARTITION(ds=2008-04-08,hr=12).value SIMPLE [(srcpart)srcpart.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: srcpart_acidv PARTITION(ds=2008-04-09,hr=11).key SIMPLE [(srcpart)srcpart.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: srcpart_acidv PARTITION(ds=2008-04-09,hr=11).value SIMPLE [(srcpart)srcpart.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: srcpart_acidv PARTITION(ds=2008-04-09,hr=12).key SIMPLE [(srcpart)srcpart.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: srcpart_acidv PARTITION(ds=2008-04-09,hr=12).value SIMPLE [(srcpart)srcpart.FieldSchema(name:value, type:string, comment:default), ]
+PREHOOK: query: select ds, hr, key, value from srcpart_acidv where cast(key as integer) in(413,43) and hr='11' order by ds, hr, cast(key as integer)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart_acidv
+PREHOOK: Input: default@srcpart_acidv@ds=2008-04-08/hr=11
+PREHOOK: Input: default@srcpart_acidv@ds=2008-04-09/hr=11
+#### A masked pattern was here ####
+POSTHOOK: query: select ds, hr, key, value from srcpart_acidv where cast(key as integer) in(413,43) and hr='11' order by ds, hr, cast(key as integer)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart_acidv
+POSTHOOK: Input: default@srcpart_acidv@ds=2008-04-08/hr=11
+POSTHOOK: Input: default@srcpart_acidv@ds=2008-04-09/hr=11
+#### A masked pattern was here ####
+2008-04-08 11 43 val_43
+2008-04-08 11 413 val_413
+2008-04-08 11 413 val_413
+2008-04-09 11 43 val_43
+2008-04-09 11 413 val_413
+2008-04-09 11 413 val_413
+PREHOOK: query: analyze table srcpart_acidv PARTITION(ds, hr) compute statistics
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart_acidv
+PREHOOK: Output: default@srcpart_acidv
+PREHOOK: Output: default@srcpart_acidv@ds=2008-04-08/hr=11
+PREHOOK: Output: default@srcpart_acidv@ds=2008-04-08/hr=12
+PREHOOK: Output: default@srcpart_acidv@ds=2008-04-09/hr=11
+PREHOOK: Output: default@srcpart_acidv@ds=2008-04-09/hr=12
+POSTHOOK: query: analyze table srcpart_acidv PARTITION(ds, hr) compute statistics
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart_acidv
+POSTHOOK: Output: default@srcpart_acidv
+POSTHOOK: Output: default@srcpart_acidv@ds=2008-04-08/hr=11
+POSTHOOK: Output: default@srcpart_acidv@ds=2008-04-08/hr=12
+POSTHOOK: Output: default@srcpart_acidv@ds=2008-04-09/hr=11
+POSTHOOK: Output: default@srcpart_acidv@ds=2008-04-09/hr=12
+PREHOOK: query: analyze table srcpart_acidv PARTITION(ds, hr) compute statistics for columns
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart_acidv
+PREHOOK: Input: default@srcpart_acidv@ds=2008-04-08/hr=11
+PREHOOK: Input: default@srcpart_acidv@ds=2008-04-08/hr=12
+PREHOOK: Input: default@srcpart_acidv@ds=2008-04-09/hr=11
+PREHOOK: Input: default@srcpart_acidv@ds=2008-04-09/hr=12
+PREHOOK: Output: default@srcpart_acidv
+PREHOOK: Output: default@srcpart_acidv@ds=2008-04-08/hr=11
+PREHOOK: Output: default@srcpart_acidv@ds=2008-04-08/hr=12
+PREHOOK: Output: default@srcpart_acidv@ds=2008-04-09/hr=11
+PREHOOK: Output: default@srcpart_acidv@ds=2008-04-09/hr=12
+#### A masked pattern was here ####
+POSTHOOK: query: analyze table srcpart_acidv PARTITION(ds, hr) compute statistics for columns
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart_acidv
+POSTHOOK: Input: default@srcpart_acidv@ds=2008-04-08/hr=11
+POSTHOOK: Input: default@srcpart_acidv@ds=2008-04-08/hr=12
+POSTHOOK: Input: default@srcpart_acidv@ds=2008-04-09/hr=11
+POSTHOOK: Input: default@srcpart_acidv@ds=2008-04-09/hr=12
+POSTHOOK: Output: default@srcpart_acidv
+POSTHOOK: Output: default@srcpart_acidv@ds=2008-04-08/hr=11
+POSTHOOK: Output: default@srcpart_acidv@ds=2008-04-08/hr=12
+POSTHOOK: Output: default@srcpart_acidv@ds=2008-04-09/hr=11
+POSTHOOK: Output: default@srcpart_acidv@ds=2008-04-09/hr=12
+#### A masked pattern was here ####
+PREHOOK: query: explain update srcpart_acidv set value = concat(value, 'updated') where cast(key as integer) in(413,43) and hr='11'
+PREHOOK: type: QUERY
+POSTHOOK: query: explain update srcpart_acidv set value = concat(value, 'updated') where cast(key as integer) in(413,43) and hr='11'
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+ Stage-1 is a root stage
+ Stage-2 depends on stages: Stage-1
+ Stage-0 depends on stages: Stage-2
+ Stage-3 depends on stages: Stage-0
+
+STAGE PLANS:
+ Stage: Stage-1
+ Tez
+#### A masked pattern was here ####
+ Edges:
+ Reducer 2 <- Map 1 (SIMPLE_EDGE)
+#### A masked pattern was here ####
+ Vertices:
+ Map 1
+ Map Operator Tree:
+ TableScan
+ alias: srcpart_acidv
+ Statistics: Num rows: 1000 Data size: 362000 Basic stats: COMPLETE Column stats: PARTIAL
+ Filter Operator
+ predicate: (UDFToInteger(key)) IN (413, 43) (type: boolean)
+ Statistics: Num rows: 500 Data size: 181000 Basic stats: COMPLETE Column stats: PARTIAL
+ Select Operator
+ expressions: ROW__ID (type: struct<transactionid:bigint,bucketid:int,rowid:bigint>), key (type: string), concat(value, 'updated') (type: string), ds (type: string)
+ outputColumnNames: _col0, _col1, _col2, _col3
+ Statistics: Num rows: 500 Data size: 308500 Basic stats: COMPLETE Column stats: PARTIAL
+ Reduce Output Operator
+ key expressions: _col0 (type: struct<transactionid:bigint,bucketid:int,rowid:bigint>)
+ sort order: +
+ Statistics: Num rows: 500 Data size: 308500 Basic stats: COMPLETE Column stats: PARTIAL
+ value expressions: _col1 (type: string), _col2 (type: string), _col3 (type: string)
+ Execution mode: llap
+ LLAP IO: may be used (ACID table)
+ Reducer 2
+ Execution mode: vectorized, llap
+ Reduce Operator Tree:
+ Select Operator
+ expressions: KEY.reducesinkkey0 (type: struct<transactionid:bigint,bucketid:int,rowid:bigint>), VALUE._col0 (type: string), VALUE._col1 (type: string), VALUE._col2 (type: string), '11' (type: string)
+ outputColumnNames: _col0, _col1, _col2, _col3, _col4
+ Statistics: Num rows: 500 Data size: 308500 Basic stats: COMPLETE Column stats: PARTIAL
+ File Output Operator
+ compressed: false
+ Statistics: Num rows: 500 Data size: 308500 Basic stats: COMPLETE Column stats: PARTIAL
+ table:
+ input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
+ output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
+ serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde
+ name: default.srcpart_acidv
+ Write Type: UPDATE
+
+ Stage: Stage-2
+ Dependency Collection
+
+ Stage: Stage-0
+ Move Operator
+ tables:
+ partition:
+ ds
+ hr
+ replace: false
+ table:
+ input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
+ output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
+ serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde
+ name: default.srcpart_acidv
+
+ Stage: Stage-3
+ Stats-Aggr Operator
+
+PREHOOK: query: update srcpart_acidv set value = concat(value, 'updated') where cast(key as integer) in(413,43) and hr='11'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart_acidv
+PREHOOK: Input: default@srcpart_acidv@ds=2008-04-08/hr=11
+PREHOOK: Input: default@srcpart_acidv@ds=2008-04-09/hr=11
+PREHOOK: Output: default@srcpart_acidv@ds=2008-04-08/hr=11
+PREHOOK: Output: default@srcpart_acidv@ds=2008-04-09/hr=11
+POSTHOOK: query: update srcpart_acidv set value = concat(value, 'updated') where cast(key as integer) in(413,43) and hr='11'
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart_acidv
+POSTHOOK: Input: default@srcpart_acidv@ds=2008-04-08/hr=11
+POSTHOOK: Input: default@srcpart_acidv@ds=2008-04-09/hr=11
+POSTHOOK: Output: default@srcpart_acidv@ds=2008-04-08/hr=11
+POSTHOOK: Output: default@srcpart_acidv@ds=2008-04-09/hr=11
+PREHOOK: query: select ds, hr, key, value from srcpart_acidv where value like '%updated' order by ds, hr, cast(key as integer)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart_acidv
+PREHOOK: Input: default@srcpart_acidv@ds=2008-04-08/hr=11
+PREHOOK: Input: default@srcpart_acidv@ds=2008-04-08/hr=12
+PREHOOK: Input: default@srcpart_acidv@ds=2008-04-09/hr=11
+PREHOOK: Input: default@srcpart_acidv@ds=2008-04-09/hr=12
+#### A masked pattern was here ####
+POSTHOOK: query: select ds, hr, key, value from srcpart_acidv where value like '%updated' order by ds, hr, cast(key as integer)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart_acidv
+POSTHOOK: Input: default@srcpart_acidv@ds=2008-04-08/hr=11
+POSTHOOK: Input: default@srcpart_acidv@ds=2008-04-08/hr=12
+POSTHOOK: Input: default@srcpart_acidv@ds=2008-04-09/hr=11
+POSTHOOK: Input: default@srcpart_acidv@ds=2008-04-09/hr=12
+#### A masked pattern was here ####
+2008-04-08 11 43 val_43updated
+2008-04-08 11 413 val_413updated
+2008-04-08 11 413 val_413updated
+2008-04-09 11 43 val_43updated
+2008-04-09 11 413 val_413updated
+2008-04-09 11 413 val_413updated
+PREHOOK: query: insert into srcpart_acidv PARTITION (ds='2008-04-08', hr=='11') values ('1001','val1001'),('1002','val1002'),('1003','val1003')
+PREHOOK: type: QUERY
+PREHOOK: Output: default@srcpart_acidv@ds=2008-04-08/hr=11
+POSTHOOK: query: insert into srcpart_acidv PARTITION (ds='2008-04-08', hr=='11') values ('1001','val1001'),('1002','val1002'),('1003','val1003')
+POSTHOOK: type: QUERY
+POSTHOOK: Output: default@srcpart_acidv@ds=2008-04-08/hr=11
+POSTHOOK: Lineage: srcpart_acidv PARTITION(ds=2008-04-08,hr=11).key SIMPLE [(values__tmp__table__3)values__tmp__table__3.FieldSchema(name:tmp_values_col1, type:string, comment:), ]
+POSTHOOK: Lineage: srcpart_acidv PARTITION(ds=2008-04-08,hr=11).value SIMPLE [(values__tmp__table__3)values__tmp__table__3.FieldSchema(name:tmp_values_col2, type:string, comment:), ]
+PREHOOK: query: select ds, hr, key, value from srcpart_acidv where cast(key as integer) > 1000 order by ds, hr, cast(key as integer)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart_acidv
+PREHOOK: Input: default@srcpart_acidv@ds=2008-04-08/hr=11
+PREHOOK: Input: default@srcpart_acidv@ds=2008-04-08/hr=12
+PREHOOK: Input: default@srcpart_acidv@ds=2008-04-09/hr=11
+PREHOOK: Input: default@srcpart_acidv@ds=2008-04-09/hr=12
+#### A masked pattern was here ####
+POSTHOOK: query: select ds, hr, key, value from srcpart_acidv where cast(key as integer) > 1000 order by ds, hr, cast(key as integer)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart_acidv
+POSTHOOK: Input: default@srcpart_acidv@ds=2008-04-08/hr=11
+POSTHOOK: Input: default@srcpart_acidv@ds=2008-04-08/hr=12
+POSTHOOK: Input: default@srcpart_acidv@ds=2008-04-09/hr=11
+POSTHOOK: Input: default@srcpart_acidv@ds=2008-04-09/hr=12
+#### A masked pattern was here ####
+2008-04-08 11 1001 val1001
+2008-04-08 11 1002 val1002
+2008-04-08 11 1003 val1003
+PREHOOK: query: analyze table srcpart_acidv PARTITION(ds, hr) compute statistics
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart_acidv
+PREHOOK: Output: default@srcpart_acidv
+PREHOOK: Output: default@srcpart_acidv@ds=2008-04-08/hr=11
+PREHOOK: Output: default@srcpart_acidv@ds=2008-04-08/hr=12
+PREHOOK: Output: default@srcpart_acidv@ds=2008-04-09/hr=11
+PREHOOK: Output: default@srcpart_acidv@ds=2008-04-09/hr=12
+POSTHOOK: query: analyze table srcpart_acidv PARTITION(ds, hr) compute statistics
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart_acidv
+POSTHOOK: Output: default@srcpart_acidv
+POSTHOOK: Output: default@srcpart_acidv@ds=2008-04-08/hr=11
+POSTHOOK: Output: default@srcpart_acidv@ds=2008-04-08/hr=12
+POSTHOOK: Output: default@srcpart_acidv@ds=2008-04-09/hr=11
+POSTHOOK: Output: default@srcpart_acidv@ds=2008-04-09/hr=12
+PREHOOK: query: analyze table srcpart_acidv PARTITION(ds, hr) compute statistics for columns
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart_acidv
+PREHOOK: Input: default@srcpart_acidv@ds=2008-04-08/hr=11
+PREHOOK: Input: default@srcpart_acidv@ds=2008-04-08/hr=12
+PREHOOK: Input: default@srcpart_acidv@ds=2008-04-09/hr=11
+PREHOOK: Input: default@srcpart_acidv@ds=2008-04-09/hr=12
+PREHOOK: Output: default@srcpart_acidv
+PREHOOK: Output: default@srcpart_acidv@ds=2008-04-08/hr=11
+PREHOOK: Output: default@srcpart_acidv@ds=2008-04-08/hr=12
+PREHOOK: Output: default@srcpart_acidv@ds=2008-04-09/hr=11
+PREHOOK: Output: default@srcpart_acidv@ds=2008-04-09/hr=12
+#### A masked pattern was here ####
+POSTHOOK: query: analyze table srcpart_acidv PARTITION(ds, hr) compute statistics for columns
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart_acidv
+POSTHOOK: Input: default@srcpart_acidv@ds=2008-04-08/hr=11
+POSTHOOK: Input: default@srcpart_acidv@ds=2008-04-08/hr=12
+POSTHOOK: Input: default@srcpart_acidv@ds=2008-04-09/hr=11
+POSTHOOK: Input: default@srcpart_acidv@ds=2008-04-09/hr=12
+POSTHOOK: Output: default@srcpart_acidv
+POSTHOOK: Output: default@srcpart_acidv@ds=2008-04-08/hr=11
+POSTHOOK: Output: default@srcpart_acidv@ds=2008-04-08/hr=12
+POSTHOOK: Output: default@srcpart_acidv@ds=2008-04-09/hr=11
+POSTHOOK: Output: default@srcpart_acidv@ds=2008-04-09/hr=12
+#### A masked pattern was here ####
+PREHOOK: query: explain delete from srcpart_acidv where key in( '1001', '213', '43')
+PREHOOK: type: QUERY
+POSTHOOK: query: explain delete from srcpart_acidv where key in( '1001', '213', '43')
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+ Stage-1 is a root stage
+ Stage-2 depends on stages: Stage-1
+ Stage-0 depends on stages: Stage-2
+ Stage-3 depends on stages: Stage-0
+
+STAGE PLANS:
+ Stage: Stage-1
+ Tez
+#### A masked pattern was here ####
+ Edges:
+ Reducer 2 <- Map 1 (SIMPLE_EDGE)
+#### A masked pattern was here ####
+ Vertices:
+ Map 1
+ Map Operator Tree:
+ TableScan
+ alias: srcpart_acidv
+ Statistics: Num rows: 2015 Data size: 916825 Basic stats: COMPLETE Column stats: PARTIAL
+ Filter Operator
+ predicate: (key) IN ('1001', '213', '43') (type: boolean)
+ Statistics: Num rows: 20 Data size: 9100 Basic stats: COMPLETE Column stats: PARTIAL
+ Select Operator
+ expressions: ROW__ID (type: struct<transactionid:bigint,bucketid:int,rowid:bigint>), ds (type: string), hr (type: string)
+ outputColumnNames: _col0, _col1, _col2
+ Statistics: Num rows: 20 Data size: 8880 Basic stats: COMPLETE Column stats: PARTIAL
+ Reduce Output Operator
+ key expressions: _col0 (type: struct<transactionid:bigint,bucketid:int,rowid:bigint>)
+ sort order: +
+ Statistics: Num rows: 20 Data size: 8880 Basic stats: COMPLETE Column stats: PARTIAL
+ value expressions: _col1 (type: string), _col2 (type: string)
+ Execution mode: llap
+ LLAP IO: may be used (ACID table)
+ Reducer 2
+ Execution mode: vectorized, llap
+ Reduce Operator Tree:
+ Select Operator
+ expressions: KEY.reducesinkkey0 (type: struct<transactionid:bigint,bucketid:int,rowid:bigint>), VALUE._col0 (type: string), VALUE._col1 (type: string)
+ outputColumnNames: _col0, _col1, _col2
+ Statistics: Num rows: 20 Data size: 8880 Basic stats: COMPLETE Column stats: PARTIAL
+ File Output Operator
+ compressed: false
+ Statistics: Num rows: 20 Data size: 8880 Basic stats: COMPLETE Column stats: PARTIAL
+ table:
+ input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
+ output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
+ serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde
+ name: default.srcpart_acidv
+ Write Type: DELETE
+
+ Stage: Stage-2
+ Dependency Collection
+
+ Stage: Stage-0
+ Move Operator
+ tables:
+ partition:
+ ds
+ hr
+ replace: false
+ table:
+ input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
+ output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
+ serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde
+ name: default.srcpart_acidv
+
+ Stage: Stage-3
+ Stats-Aggr Operator
+
+PREHOOK: query: delete from srcpart_acidv where key in( '1001', '213', '43')
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart_acidv
+PREHOOK: Input: default@srcpart_acidv@ds=2008-04-08/hr=11
+PREHOOK: Input: default@srcpart_acidv@ds=2008-04-08/hr=12
+PREHOOK: Input: default@srcpart_acidv@ds=2008-04-09/hr=11
+PREHOOK: Input: default@srcpart_acidv@ds=2008-04-09/hr=12
+PREHOOK: Output: default@srcpart_acidv@ds=2008-04-08/hr=11
+PREHOOK: Output: default@srcpart_acidv@ds=2008-04-08/hr=12
+PREHOOK: Output: default@srcpart_acidv@ds=2008-04-09/hr=11
+PREHOOK: Output: default@srcpart_acidv@ds=2008-04-09/hr=12
+POSTHOOK: query: delete from srcpart_acidv where key in( '1001', '213', '43')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart_acidv
+POSTHOOK: Input: default@srcpart_acidv@ds=2008-04-08/hr=11
+POSTHOOK: Input: default@srcpart_acidv@ds=2008-04-08/hr=12
+POSTHOOK: Input: default@srcpart_acidv@ds=2008-04-09/hr=11
+POSTHOOK: Input: default@srcpart_acidv@ds=2008-04-09/hr=12
+POSTHOOK: Output: default@srcpart_acidv@ds=2008-04-08/hr=11
+POSTHOOK: Output: default@srcpart_acidv@ds=2008-04-08/hr=12
+POSTHOOK: Output: default@srcpart_acidv@ds=2008-04-09/hr=11
+POSTHOOK: Output: default@srcpart_acidv@ds=2008-04-09/hr=12
+PREHOOK: query: select count(*) from srcpart_acidv where key in( '1001', '213', '43')
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart_acidv
+PREHOOK: Input: default@srcpart_acidv@ds=2008-04-08/hr=11
+PREHOOK: Input: default@srcpart_acidv@ds=2008-04-08/hr=12
+PREHOOK: Input: default@srcpart_acidv@ds=2008-04-09/hr=11
+PREHOOK: Input: default@srcpart_acidv@ds=2008-04-09/hr=12
+#### A masked pattern was here ####
+POSTHOOK: query: select count(*) from srcpart_acidv where key in( '1001', '213', '43')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart_acidv
+POSTHOOK: Input: default@srcpart_acidv@ds=2008-04-08/hr=11
+POSTHOOK: Input: default@srcpart_acidv@ds=2008-04-08/hr=12
+POSTHOOK: Input: default@srcpart_acidv@ds=2008-04-09/hr=11
+POSTHOOK: Input: default@srcpart_acidv@ds=2008-04-09/hr=12
+#### A masked pattern was here ####
+0
+PREHOOK: query: select count(*) from srcpart_acidv
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart_acidv
+PREHOOK: Input: default@srcpart_acidv@ds=2008-04-08/hr=11
+PREHOOK: Input: default@srcpart_acidv@ds=2008-04-08/hr=12
+PREHOOK: Input: default@srcpart_acidv@ds=2008-04-09/hr=11
+PREHOOK: Input: default@srcpart_acidv@ds=2008-04-09/hr=12
+#### A masked pattern was here ####
+POSTHOOK: query: select count(*) from srcpart_acidv
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart_acidv
+POSTHOOK: Input: default@srcpart_acidv@ds=2008-04-08/hr=11
+POSTHOOK: Input: default@srcpart_acidv@ds=2008-04-08/hr=12
+POSTHOOK: Input: default@srcpart_acidv@ds=2008-04-09/hr=11
+POSTHOOK: Input: default@srcpart_acidv@ds=2008-04-09/hr=12
+#### A masked pattern was here ####
+1990
+PREHOOK: query: merge into srcpart_acidv t using (select distinct ds, hr, key, value from srcpart_acidv) s
+on s.ds=t.ds and s.hr=t.hr and s.key=t.key and s.value=t.value
+when matched and s.ds='2008-04-08' and s.hr=='11' and s.key='44' then update set value=concat(s.value,'updated by merge')
+when matched and s.ds='2008-04-08' and s.hr=='12' then delete
+when not matched then insert values('this','should','not','be there')
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart_acidv
+PREHOOK: Input: default@srcpart_acidv@ds=2008-04-08/hr=11
+PREHOOK: Input: default@srcpart_acidv@ds=2008-04-08/hr=12
+PREHOOK: Input: default@srcpart_acidv@ds=2008-04-09/hr=11
+PREHOOK: Input: default@srcpart_acidv@ds=2008-04-09/hr=12
+PREHOOK: Output: default@merge_tmp_table
+PREHOOK: Output: default@srcpart_acidv
+PREHOOK: Output: default@srcpart_acidv@ds=2008-04-08/hr=11
+PREHOOK: Output: default@srcpart_acidv@ds=2008-04-08/hr=11
+PREHOOK: Output: default@srcpart_acidv@ds=2008-04-08/hr=12
+PREHOOK: Output: default@srcpart_acidv@ds=2008-04-08/hr=12
+PREHOOK: Output: default@srcpart_acidv@ds=2008-04-09/hr=11
+PREHOOK: Output: default@srcpart_acidv@ds=2008-04-09/hr=11
+PREHOOK: Output: default@srcpart_acidv@ds=2008-04-09/hr=12
+PREHOOK: Output: default@srcpart_acidv@ds=2008-04-09/hr=12
+POSTHOOK: query: merge into srcpart_acidv t using (select distinct ds, hr, key, value from srcpart_acidv) s
+on s.ds=t.ds and s.hr=t.hr and s.key=t.key and s.value=t.value
+when matched and s.ds='2008-04-08' and s.hr=='11' and s.key='44' then update set value=concat(s.value,'updated by merge')
+when matched and s.ds='2008-04-08' and s.hr=='12' then delete
+when not matched then insert values('this','should','not','be there')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart_acidv
+POSTHOOK: Input: default@srcpart_acidv@ds=2008-04-08/hr=11
+POSTHOOK: Input: default@srcpart_acidv@ds=2008-04-08/hr=12
+POSTHOOK: Input: default@srcpart_acidv@ds=2008-04-09/hr=11
+POSTHOOK: Input: default@srcpart_acidv@ds=2008-04-09/hr=12
+POSTHOOK: Output: default@merge_tmp_table
+POSTHOOK: Output: default@srcpart_acidv@ds=2008-04-08/hr=11
+POSTHOOK: Output: default@srcpart_acidv@ds=2008-04-08/hr=11
+POSTHOOK: Output: default@srcpart_acidv@ds=2008-04-08/hr=12
+POSTHOOK: Output: default@srcpart_acidv@ds=2008-04-08/hr=12
+POSTHOOK: Output: default@srcpart_acidv@ds=2008-04-09/hr=11
+POSTHOOK: Output: default@srcpart_acidv@ds=2008-04-09/hr=11
+POSTHOOK: Output: default@srcpart_acidv@ds=2008-04-09/hr=12
+POSTHOOK: Output: default@srcpart_acidv@ds=2008-04-09/hr=12
+POSTHOOK: Lineage: merge_tmp_table.val EXPRESSION [(srcpart_acidv)t.FieldSchema(name:ROW__ID, type:struct<transactionId:bigint,bucketId:int,rowId:bigint>, comment:), (srcpart_acidv)t.FieldSchema(name:ds, type:string, comment:null), (srcpart_acidv)t.FieldSchema(name:hr, type:string, comment:null), ]
+PREHOOK: query: select count(*) from srcpart_acidv where ds='2008-04-08' and hr=='12'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart_acidv
+PREHOOK: Input: default@srcpart_acidv@ds=2008-04-08/hr=12
+#### A masked pattern was here ####
+POSTHOOK: query: select count(*) from srcpart_acidv where ds='2008-04-08' and hr=='12'
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart_acidv
+POSTHOOK: Input: default@srcpart_acidv@ds=2008-04-08/hr=12
+#### A masked pattern was here ####
+0
+PREHOOK: query: select ds, hr, key, value from srcpart_acidv where value like '%updated by merge'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart_acidv
+PREHOOK: Input: default@srcpart_acidv@ds=2008-04-08/hr=11
+PREHOOK: Input: default@srcpart_acidv@ds=2008-04-08/hr=12
+PREHOOK: Input: default@srcpart_acidv@ds=2008-04-09/hr=11
+PREHOOK: Input: default@srcpart_acidv@ds=2008-04-09/hr=12
+#### A masked pattern was here ####
+POSTHOOK: query: select ds, hr, key, value from srcpart_acidv where value like '%updated by merge'
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart_acidv
+POSTHOOK: Input: default@srcpart_acidv@ds=2008-04-08/hr=11
+POSTHOOK: Input: default@srcpart_acidv@ds=2008-04-08/hr=12
+POSTHOOK: Input: default@srcpart_acidv@ds=2008-04-09/hr=11
+POSTHOOK: Input: default@srcpart_acidv@ds=2008-04-09/hr=12
+#### A masked pattern was here ####
+2008-04-08 11 44 val_44updated by merge
+PREHOOK: query: select count(*) from srcpart_acidv where ds = 'this' and hr = 'should' and key = 'not' and value = 'be there'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart_acidv
+#### A masked pattern was here ####
+POSTHOOK: query: select count(*) from srcpart_acidv where ds = 'this' and hr = 'should' and key = 'not' and value = 'be there'
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart_acidv
+#### A masked pattern was here ####
+0
+PREHOOK: query: drop table if exists srcpart_acidv
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@srcpart_acidv
+PREHOOK: Output: default@srcpart_acidv
+POSTHOOK: query: drop table if exists srcpart_acidv
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@srcpart_acidv
+POSTHOOK: Output: default@srcpart_acidv
+PREHOOK: query: drop table if exists srcpart_acidvb
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table if exists srcpart_acidvb
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: CREATE TABLE srcpart_acidvb (key STRING, value STRING) PARTITIONED BY (ds STRING, hr STRING) CLUSTERED BY(key) INTO 2 BUCKETS stored as ORC TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@srcpart_acidvb
+POSTHOOK: query: CREATE TABLE srcpart_acidvb (key STRING, value STRING) PARTITIONED BY (ds STRING, hr STRING) CLUSTERED BY(key) INTO 2 BUCKETS stored as ORC TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@srcpart_acidvb
+PREHOOK: query: insert into srcpart_acidvb PARTITION (ds, hr) select * from srcpart
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart
+PREHOOK: Input: default@srcpart@ds=2008-04-08/hr=11
+PREHOOK: Input: default@srcpart@ds=2008-04-08/hr=12
+PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=11
+PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=12
+PREHOOK: Output: default@srcpart_acidvb
+POSTHOOK: query: insert into srcpart_acidvb PARTITION (ds, hr) select * from srcpart
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart
+POSTHOOK: Input: default@srcpart@ds=2008-04-08/hr=11
+POSTHOOK: Input: default@srcpart@ds=2008-04-08/hr=12
+POSTHOOK: Input: default@srcpart@ds=2008-04-09/hr=11
+POSTHOOK: Input: default@srcpart@ds=2008-04-09/hr=12
+POSTHOOK: Output: default@srcpart_acidvb@ds=2008-04-08/hr=11
+POSTHOOK: Output: default@srcpart_acidvb@ds=2008-04-08/hr=12
+POSTHOOK: Output: default@srcpart_acidvb@ds=2008-04-09/hr=11
+POSTHOOK: Output: default@srcpart_acidvb@ds=2008-04-09/hr=12
+POSTHOOK: Lineage: srcpart_acidvb PARTITION(ds=2008-04-08,hr=11).key SIMPLE [(srcpart)srcpart.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: srcpart_acidvb PARTITION(ds=2008-04-08,hr=11).value SIMPLE [(srcpart)srcpart.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: srcpart_acidvb PARTITION(ds=2008-04-08,hr=12).key SIMPLE [(srcpart)srcpart.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: srcpart_acidvb PARTITION(ds=2008-04-08,hr=12).value SIMPLE [(srcpart)srcpart.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: srcpart_acidvb PARTITION(ds=2008-04-09,hr=11).key SIMPLE [(srcpart)srcpart.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: srcpart_acidvb PARTITION(ds=2008-04-09,hr=11).value SIMPLE [(srcpart)srcpart.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: srcpart_acidvb PARTITION(ds=2008-04-09,hr=12).key SIMPLE [(srcpart)srcpart.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: srcpart_acidvb PARTITION(ds=2008-04-09,hr=12).value SIMPLE [(srcpart)srcpart.FieldSchema(name:value, type:string, comment:default), ]
+PREHOOK: query: select ds, hr, key, value from srcpart_acidvb where cast(key as integer) in(413,43) and hr='11' order by ds, hr, cast(key as integer)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart_acidvb
+PREHOOK: Input: default@srcpart_acidvb@ds=2008-04-08/hr=11
+PREHOOK: Input: default@srcpart_acidvb@ds=2008-04-09/hr=11
+#### A masked pattern was here ####
+POSTHOOK: query: select ds, hr, key, value from srcpart_acidvb where cast(key as integer) in(413,43) and hr='11' order by ds, hr, cast(key as integer)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart_acidvb
+POSTHOOK: Input: default@srcpart_acidvb@ds=2008-04-08/hr=11
+POSTHOOK: Input: default@srcpart_acidvb@ds=2008-04-09/hr=11
+#### A masked pattern was here ####
+2008-04-08 11 43 val_43
+2008-04-08 11 413 val_413
+2008-04-08 11 413 val_413
+2008-04-09 11 43 val_43
+2008-04-09 11 413 val_413
+2008-04-09 11 413 val_413
+PREHOOK: query: analyze table srcpart_acidvb PARTITION(ds, hr) compute statistics
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart_acidvb
+PREHOOK: Output: default@srcpart_acidvb
+PREHOOK: Output: default@srcpart_acidvb@ds=2008-04-08/hr=11
+PREHOOK: Output: default@srcpart_acidvb@ds=2008-04-08/hr=12
+PREHOOK: Output: default@srcpart_acidvb@ds=2008-04-09/hr=11
+PREHOOK: Output: default@srcpart_acidvb@ds=2008-04-09/hr=12
+POSTHOOK: query: analyze table srcpart_acidvb PARTITION(ds, hr) compute statistics
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart_acidvb
+POSTHOOK: Output: default@srcpart_acidvb
+POSTHOOK: Output: default@srcpart_acidvb@ds=2008-04-08/hr=11
+POSTHOOK: Output: default@srcpart_acidvb@ds=2008-04-08/hr=12
+POSTHOOK: Output: default@srcpart_acidvb@ds=2008-04-09/hr=11
+POSTHOOK: Output: default@srcpart_acidvb@ds=2008-04-09/hr=12
+PREHOOK: query: analyze table srcpart_acidvb PARTITION(ds, hr) compute statistics for columns
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart_acidvb
+PREHOOK: Input: default@srcpart_acidvb@ds=2008-04-08/hr=11
+PREHOOK: Input: default@srcpart_acidvb@ds=2008-04-08/hr=12
+PREHOOK: Input: default@srcpart_acidvb@ds=2008-04-09/hr=11
+PREHOOK: Input: default@srcpart_acidvb@ds=2008-04-09/hr=12
+PREHOOK: Output: default@srcpart_acidvb
+PREHOOK: Output: default@srcpart_acidvb@ds=2008-04-08/hr=11
+PREHOOK: Output: default@srcpart_acidvb@ds=2008-04-08/hr=12
+PREHOOK: Output: default@srcpart_acidvb@ds=2008-04-09/hr=11
+PREHOOK: Output: default@srcpart_acidvb@ds=2008-04-09/hr=12
+#### A masked pattern was here ####
+POSTHOOK: query: analyze table srcpart_acidvb PARTITION(ds, hr) compute statistics for columns
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart_acidvb
+POSTHOOK: Input: default@srcpart_acidvb@ds=2008-04-08/hr=11
+POSTHOOK: Input: default@srcpart_acidvb@ds=2008-04-08/hr=12
+POSTHOOK: Input: default@srcpart_acidvb@ds=2008-04-09/hr=11
+POSTHOOK: Input: default@srcpart_acidvb@ds=2008-04-09/hr=12
+POSTHOOK: Output: default@srcpart_acidvb
+POSTHOOK: Output: default@srcpart_acidvb@ds=2008-04-08/hr=11
+POSTHOOK: Output: default@srcpart_acidvb@ds=2008-04-08/hr=12
+POSTHOOK: Output: default@srcpart_acidvb@ds=2008-04-09/hr=11
+POSTHOOK: Output: default@srcpart_acidvb@ds=2008-04-09/hr=12
+#### A masked pattern was here ####
+PREHOOK: query: explain update srcpart_acidvb set value = concat(value, 'updated') where cast(key as integer) in(413,43) and hr='11'
+PREHOOK: type: QUERY
+POSTHOOK: query: explain update srcpart_acidvb set value = concat(value, 'updated') where cast(key as integer) in(413,43) and hr='11'
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+ Stage-1 is a root stage
+ Stage-2 depends on stages: Stage-1
+ Stage-0 depends on stages: Stage-2
+ Stage-3 depends on stages: Stage-0
+
+STAGE PLANS:
+ Stage: Stage-1
+ Tez
+#### A masked pattern was here ####
+ Edges:
+ Reducer 2 <- Map 1 (SIMPLE_EDGE)
+#### A masked pattern was here ####
+ Vertices:
+ Map 1
+ Map Operator Tree:
+ TableScan
+ alias: srcpart_acidvb
+ Statistics: Num rows: 1000 Data size: 362000 Basic stats: COMPLETE Column stats: PARTIAL
+ Filter Operator
+ predicate: (UDFToInteger(key)) IN (413, 43) (type: boolean)
+ Statistics: Num rows: 500 Data size: 181000 Basic stats: COMPLETE Column stats: PARTIAL
+ Select Operator
+ expressions: ROW__ID (type: struct<transactionid:bigint,bucketid:int,rowid:bigint>), key (type: string), concat(value, 'updated') (type: string), ds (type: string)
+ outputColumnNames: _col0, _col1, _col2, _col3
+ Statistics: Num rows: 500 D
<TRUNCATED>
[2/3] hive git commit: HIVE-17205 - add functional support for unbucketed tables (Eugene Koifman, reviewed by Wei Zheng)
Posted by ek...@apache.org.
http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
index d61b24b..37aaeb6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
@@ -50,6 +50,8 @@ public class OrcSplit extends FileSplit implements ColumnarSplit, LlapAwareSplit
private boolean hasFooter;
private boolean isOriginal;
private boolean hasBase;
+ //partition root
+ private Path rootDir;
private final List<AcidInputFormat.DeltaMetaData> deltas = new ArrayList<>();
private long projColsUncompressedSize;
private transient Object fileKey;
@@ -70,7 +72,7 @@ public class OrcSplit extends FileSplit implements ColumnarSplit, LlapAwareSplit
public OrcSplit(Path path, Object fileId, long offset, long length, String[] hosts,
OrcTail orcTail, boolean isOriginal, boolean hasBase,
- List<AcidInputFormat.DeltaMetaData> deltas, long projectedDataSize, long fileLen) {
+ List<AcidInputFormat.DeltaMetaData> deltas, long projectedDataSize, long fileLen, Path rootDir) {
super(path, offset, length, hosts);
// For HDFS, we could avoid serializing file ID and just replace the path with inode-based
// path. However, that breaks bunch of stuff because Hive later looks up things by split path.
@@ -79,6 +81,7 @@ public class OrcSplit extends FileSplit implements ColumnarSplit, LlapAwareSplit
hasFooter = this.orcTail != null;
this.isOriginal = isOriginal;
this.hasBase = hasBase;
+ this.rootDir = rootDir;
this.deltas.addAll(deltas);
this.projColsUncompressedSize = projectedDataSize <= 0 ? length : projectedDataSize;
// setting file length to Long.MAX_VALUE will let orc reader read file length from file system
@@ -129,6 +132,7 @@ public class OrcSplit extends FileSplit implements ColumnarSplit, LlapAwareSplit
((Writable)fileKey).write(out);
}
out.writeLong(fileLen);
+ out.writeUTF(rootDir.toString());
}
@Override
@@ -168,6 +172,7 @@ public class OrcSplit extends FileSplit implements ColumnarSplit, LlapAwareSplit
this.fileKey = fileId;
}
fileLen = in.readLong();
+ rootDir = new Path(in.readUTF());
}
public OrcTail getOrcTail() {
@@ -186,6 +191,9 @@ public class OrcSplit extends FileSplit implements ColumnarSplit, LlapAwareSplit
return hasBase;
}
+ public Path getRootDir() {
+ return rootDir;
+ }
public List<AcidInputFormat.DeltaMetaData> getDeltas() {
return deltas;
}
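The new rootDir field rides on OrcSplit's existing Writable-style serialization: whatever write() appends after fileLen, readFields() must consume in the same order. A minimal round-trip sketch of that symmetry (SplitLikeSketch and its String field are illustrative stand-ins, not the real OrcSplit, which carries an org.apache.hadoop.fs.Path):

    import java.io.*;

    class SplitLikeSketch {
      long fileLen;
      String rootDir; // stands in for the partition-root Path

      void write(DataOutput out) throws IOException {
        out.writeLong(fileLen);
        out.writeUTF(rootDir);   // new field goes last, as in the patch
      }

      void readFields(DataInput in) throws IOException {
        fileLen = in.readLong();
        rootDir = in.readUTF();  // must mirror write() exactly
      }

      public static void main(String[] args) throws IOException {
        SplitLikeSketch s = new SplitLikeSketch();
        s.fileLen = 42L;
        s.rootDir = "/warehouse/t/ds=2008-04-08/hr=11";
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        s.write(new DataOutputStream(bos));
        SplitLikeSketch t = new SplitLikeSketch();
        t.readFields(new DataInputStream(new ByteArrayInputStream(bos.toByteArray())));
        System.out.println(t.rootDir); // prints the partition root back
      }
    }

One caveat worth noting: writeUTF cannot encode null, so every OrcSplit would need to be constructed with a non-null rootDir.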
http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
index 8f80710..138e56e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hive.common.ValidReadTxnList;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
@@ -357,7 +358,7 @@ public class VectorizedOrcAcidRowBatchReader
int bucket = AcidUtils.parseBaseOrDeltaBucketFilename(orcSplit.getPath(), conf).getBucketId();
String txnString = conf.get(ValidTxnList.VALID_TXNS_KEY);
this.validTxnList = (txnString == null) ? new ValidReadTxnList() : new ValidReadTxnList(txnString);
- OrcRawRecordMerger.Options mergerOptions = new OrcRawRecordMerger.Options().isCompacting(false);
+ OrcRawRecordMerger.Options mergerOptions = new OrcRawRecordMerger.Options().isDeleteReader(true);
assert !orcSplit.isOriginal() : "If this now supports Original splits, set up mergeOptions properly";
this.deleteRecords = new OrcRawRecordMerger(conf, true, null, false, bucket,
validTxnList, readerOptions, deleteDeltas,
@@ -530,6 +531,9 @@ public class VectorizedOrcAcidRowBatchReader
* For every call to next(), it returns the next smallest record id in the file if available.
* Internally, the next() buffers a row batch and maintains an index pointer, reading the
* next batch when the previous batch is exhausted.
+ *
+ * For unbucketed tables this will currently return all delete events. Once we trust that
+ * the N in bucketN for "base" split is reliable, all delete events not matching N can be skipped.
*/
static class DeleteReaderValue {
private VectorizedRowBatch batch;
@@ -538,9 +542,10 @@ public class VectorizedOrcAcidRowBatchReader
private final int bucketForSplit; // The bucket value should be same for all the records.
private final ValidTxnList validTxnList;
private boolean isBucketPropertyRepeating;
+ private final boolean isBucketedTable;
public DeleteReaderValue(Reader deleteDeltaReader, Reader.Options readerOptions, int bucket,
- ValidTxnList validTxnList) throws IOException {
+ ValidTxnList validTxnList, boolean isBucketedTable) throws IOException {
this.recordReader = deleteDeltaReader.rowsOptions(readerOptions);
this.bucketForSplit = bucket;
this.batch = deleteDeltaReader.getSchema().createRowBatch();
@@ -549,6 +554,7 @@ public class VectorizedOrcAcidRowBatchReader
}
this.indexPtrInBatch = 0;
this.validTxnList = validTxnList;
+ this.isBucketedTable = isBucketedTable;
checkBucketId();//check 1st batch
}
@@ -615,6 +621,13 @@ public class VectorizedOrcAcidRowBatchReader
* either the split computation got messed up or we found some corrupted records.
*/
private void checkBucketId(int bucketPropertyFromRecord) throws IOException {
+ if(!isBucketedTable) {
+ /**
+ * in this case a file inside a delete_delta_x_y/bucketN may contain any value for
+ * bucketId in {@link RecordIdentifier#getBucketProperty()}
+ */
+ return;
+ }
int bucketIdFromRecord = BucketCodec.determineVersion(bucketPropertyFromRecord)
.decodeWriterId(bucketPropertyFromRecord);
if(bucketIdFromRecord != bucketForSplit) {
@@ -686,14 +699,16 @@ public class VectorizedOrcAcidRowBatchReader
this.rowIds = null;
this.compressedOtids = null;
int maxEventsInMemory = HiveConf.getIntVar(conf, ConfVars.HIVE_TRANSACTIONAL_NUM_EVENTS_IN_MEMORY);
+ final boolean isBucketedTable = conf.getInt(hive_metastoreConstants.BUCKET_COUNT, 0) > 0;
try {
final Path[] deleteDeltaDirs = getDeleteDeltaDirsFromSplit(orcSplit);
if (deleteDeltaDirs.length > 0) {
int totalDeleteEventCount = 0;
for (Path deleteDeltaDir : deleteDeltaDirs) {
- Path deleteDeltaFile = AcidUtils.createBucketFile(deleteDeltaDir, bucket);
- FileSystem fs = deleteDeltaFile.getFileSystem(conf);
+ FileSystem fs = deleteDeltaDir.getFileSystem(conf);
+ for(Path deleteDeltaFile : OrcRawRecordMerger.getDeltaFiles(deleteDeltaDir, bucket, conf,
+ new OrcRawRecordMerger.Options().isCompacting(false), isBucketedTable)) {
// NOTE: Calling last flush length below is more for future-proofing when we have
// streaming deletes. But currently we don't support streaming deletes, and this can
// be removed if this becomes a performance issue.
@@ -721,7 +736,7 @@ public class VectorizedOrcAcidRowBatchReader
throw new DeleteEventsOverflowMemoryException();
}
DeleteReaderValue deleteReaderValue = new DeleteReaderValue(deleteDeltaReader,
- readerOptions, bucket, validTxnList);
+ readerOptions, bucket, validTxnList, isBucketedTable);
DeleteRecordKey deleteRecordKey = new DeleteRecordKey();
if (deleteReaderValue.next(deleteRecordKey)) {
sortMerger.put(deleteRecordKey, deleteReaderValue);
@@ -730,6 +745,7 @@ public class VectorizedOrcAcidRowBatchReader
}
}
}
+ }
if (totalDeleteEventCount > 0) {
// Initialize the rowId array when we have some delete events.
rowIds = new long[totalDeleteEventCount];
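checkBucketId() becomes a no-op for unbucketed tables because a file under delete_delta_x_y/bucketN may carry events whose encoded writer id is not N. For orientation, a rough decoder for the bucketProperty integer, assuming the V1 layout (top 3 bits version, a 12-bit writer id starting at bit 16, statement id in the low 12 bits); the two sample values are taken from the ROW__ID asserts in TestTxnCommands further down in this patch:

    public class BucketPropertySketch {
      static int version(int prop)  { return prop >>> 29; }
      static int writerId(int prop) { return (prop & 0x0FFF0000) >>> 16; }
      static int stmtId(int prop)   { return prop & 0x00000FFF; }

      public static void main(String[] args) {
        for (int prop : new int[] {536870912, 536936448}) {
          System.out.printf("%d -> version=%d writerId=%d stmtId=%d%n",
              prop, version(prop), writerId(prop), stmtId(prop));
        }
        // 536870912 decodes to writer 0 (bucket_00000), 536936448 to writer 1 (bucket_00001)
      }
    }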
http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java
index 76aa39f..4b11a4a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java
@@ -206,6 +206,7 @@ public class SortedDynPartitionOptimizer extends Transform {
if(!VirtualColumn.ROWID.getTypeInfo().equals(ci.getType())) {
throw new IllegalStateException("expected 1st column to be ROW__ID but got wrong type: " + ci.toString());
}
+ //HIVE-17328: not sure this is correct... I don't think it gets wrapped in UDFToInteger....
bucketColumns.add(new ExprNodeColumnDesc(ci));
} else {
if (!destTable.getSortCols().isEmpty()) {
http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index bc6e0d5..e8acabe 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -7377,9 +7377,6 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
}
// Check constraints on acid tables. This includes
- // * no insert overwrites
- // * no use of vectorization
- // * turns off reduce deduplication optimization, as that sometimes breaks acid
// * Check that the table is bucketed
// * Check that the table is not sorted
// This method assumes you have already decided that this is an Acid write. Don't call it if
@@ -7397,9 +7394,6 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
*/
conf.set(AcidUtils.CONF_ACID_KEY, "true");
- if (table.getNumBuckets() < 1) {
- throw new SemanticException(ErrorMsg.ACID_OP_ON_NONACID_TABLE, table.getTableName());
- }
if (table.getSortCols() != null && table.getSortCols().size() > 0) {
throw new SemanticException(ErrorMsg.ACID_NO_SORTED_BUCKETS, table.getTableName());
}
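With the bucket-count guard above removed, a table no longer needs a CLUSTERED BY clause to accept ACID writes. A hypothetical sketch in the idiom of the tests added below (the table name is made up; runStatementOnDriver is the helper from the new TestTxnCommandsBase):

    void writeToUnbucketedAcidTable() throws Exception {
      runStatementOnDriver("create table unbucketed_acid(a int, b int) stored as orc "
          + "TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default')");
      runStatementOnDriver("insert into unbucketed_acid values(1,2),(3,4)");
      runStatementOnDriver("update unbucketed_acid set b = b + 1 where a = 1");
      runStatementOnDriver("delete from unbucketed_acid where a = 3");
    }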
http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index 5e2146e..04ef7fc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hive.metastore.api.CompactionType;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
@@ -93,7 +94,7 @@ public class CompactorMR {
static final private String IS_MAJOR = "hive.compactor.is.major";
static final private String IS_COMPRESSED = "hive.compactor.is.compressed";
static final private String TABLE_PROPS = "hive.compactor.table.props";
- static final private String NUM_BUCKETS = "hive.compactor.num.buckets";
+ static final private String NUM_BUCKETS = hive_metastoreConstants.BUCKET_COUNT;
static final private String BASE_DIR = "hive.compactor.base.dir";
static final private String DELTA_DIRS = "hive.compactor.delta.dirs";
static final private String DIRS_TO_SEARCH = "hive.compactor.dirs.to.search";
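Pointing NUM_BUCKETS at hive_metastoreConstants.BUCKET_COUNT means the compactor job conf and VectorizedOrcAcidRowBatchReader now agree on a single configuration key, so the isBucketedTable check earlier in this patch reads the same property the compactor sets. A small sketch of the shared-key convention (the value 2 is an arbitrary assumption):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;

    public class BucketCountKeySketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration(false);
        conf.setInt(hive_metastoreConstants.BUCKET_COUNT, 2);          // writer side
        boolean isBucketedTable =
            conf.getInt(hive_metastoreConstants.BUCKET_COUNT, 0) > 0;  // reader side
        System.out.println("bucketed = " + isBucketedTable);
      }
    }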
http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
index bff9884..0f129fc 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
@@ -69,68 +69,17 @@ import java.util.concurrent.TimeUnit;
* Tests here are for multi-statement transactions (WIP) and those that don't need to
* run with Acid 2.0 (see subclasses of TestTxnCommands2)
*/
-public class TestTxnCommands {
+public class TestTxnCommands extends TestTxnCommandsBase {
static final private Logger LOG = LoggerFactory.getLogger(TestTxnCommands.class);
private static final String TEST_DATA_DIR = new File(System.getProperty("java.io.tmpdir") +
File.separator + TestTxnCommands.class.getCanonicalName()
+ "-" + System.currentTimeMillis()
).getPath().replaceAll("\\\\", "/");
- private static final String TEST_WAREHOUSE_DIR = TEST_DATA_DIR + "/warehouse";
- //bucket count for test tables; set it to 1 for easier debugging
- private static int BUCKET_COUNT = 2;
- @Rule
- public TestName testName = new TestName();
- private HiveConf hiveConf;
- private Driver d;
- private static enum Table {
- ACIDTBL("acidTbl"),
- ACIDTBLPART("acidTblPart"),
- ACIDTBL2("acidTbl2"),
- NONACIDORCTBL("nonAcidOrcTbl"),
- NONACIDORCTBL2("nonAcidOrcTbl2");
-
- private final String name;
- @Override
- public String toString() {
- return name;
- }
- Table(String name) {
- this.name = name;
- }
+ @Override
+ String getTestDataDir() {
+ return TEST_DATA_DIR;
}
- @Before
- public void setUp() throws Exception {
- tearDown();
- hiveConf = new HiveConf(this.getClass());
- hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
- hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
- hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, TEST_WAREHOUSE_DIR);
- hiveConf.setVar(HiveConf.ConfVars.HIVEMAPREDMODE, "nonstrict");
- hiveConf.setVar(HiveConf.ConfVars.HIVEINPUTFORMAT, HiveInputFormat.class.getName());
- hiveConf
- .setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
- "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory");
- hiveConf.setBoolVar(HiveConf.ConfVars.MERGE_CARDINALITY_VIOLATION_CHECK, true);
- TxnDbUtil.setConfValues(hiveConf);
- TxnDbUtil.prepDb();
- File f = new File(TEST_WAREHOUSE_DIR);
- if (f.exists()) {
- FileUtil.fullyDelete(f);
- }
- if (!(new File(TEST_WAREHOUSE_DIR).mkdirs())) {
- throw new RuntimeException("Could not create " + TEST_WAREHOUSE_DIR);
- }
- SessionState.start(new SessionState(hiveConf));
- d = new Driver(hiveConf);
- d.setMaxRows(10000);
- dropTables();
- runStatementOnDriver("create table " + Table.ACIDTBL + "(a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')");
- runStatementOnDriver("create table " + Table.ACIDTBLPART + "(a int, b int) partitioned by (p string) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')");
- runStatementOnDriver("create table " + Table.NONACIDORCTBL + "(a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='false')");
- runStatementOnDriver("create table " + Table.NONACIDORCTBL2 + "(a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='false')");
- runStatementOnDriver("create temporary table " + Table.ACIDTBL2 + "(a int, b int, c int) clustered by (c) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')");
- }
private void dropTables() throws Exception {
for(Table t : Table.values()) {
runStatementOnDriver("drop table if exists " + t);
@@ -150,7 +99,7 @@ public class TestTxnCommands {
FileUtils.deleteDirectory(new File(TEST_DATA_DIR));
}
}
- @Test
+ @Test//todo: what is this for?
public void testInsertOverwrite() throws Exception {
runStatementOnDriver("insert overwrite table " + Table.NONACIDORCTBL + " select a,b from " + Table.NONACIDORCTBL2);
runStatementOnDriver("create table " + Table.NONACIDORCTBL2 + "3(a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='false')");
@@ -172,7 +121,7 @@ public class TestTxnCommands {
if(true) {
return;
}
- Path bucket = AcidUtils.createBucketFile(new Path(new Path(TEST_WAREHOUSE_DIR, table.toString().toLowerCase()), AcidUtils.deltaSubdir(txnId, txnId, stmtId)), bucketNum);
+ Path bucket = AcidUtils.createBucketFile(new Path(new Path(getWarehouseDir(), table.toString().toLowerCase()), AcidUtils.deltaSubdir(txnId, txnId, stmtId)), bucketNum);
FileOutputStream delta = new FileOutputStream(testName.getMethodName() + "_" + bucket.getParent().getName() + "_" + bucket.getName());
// try {
// FileDump.printJsonData(hiveConf, bucket.toString(), delta);
@@ -446,7 +395,7 @@ public class TestTxnCommands {
}
}
Assert.assertNotNull(txnInfo);
- Assert.assertEquals(12, txnInfo.getId());
+ Assert.assertEquals(14, txnInfo.getId());
Assert.assertEquals(TxnState.OPEN, txnInfo.getState());
String s =TxnDbUtil.queryToString("select TXN_STARTED, TXN_LAST_HEARTBEAT from TXNS where TXN_ID = " + txnInfo.getId(), false);
String[] vals = s.split("\\s+");
@@ -490,33 +439,6 @@ public class TestTxnCommands {
}
}
- /**
- * takes raw data and turns it into a string as if from Driver.getResults()
- * sorts rows in dictionary order
- */
- private List<String> stringifyValues(int[][] rowsIn) {
- return TestTxnCommands2.stringifyValues(rowsIn);
- }
- private String makeValuesClause(int[][] rows) {
- return TestTxnCommands2.makeValuesClause(rows);
- }
-
- private List<String> runStatementOnDriver(String stmt) throws Exception {
- CommandProcessorResponse cpr = d.run(stmt);
- if(cpr.getResponseCode() != 0) {
- throw new RuntimeException(stmt + " failed: " + cpr);
- }
- List<String> rs = new ArrayList<String>();
- d.getResults(rs);
- return rs;
- }
- private CommandProcessorResponse runStatementOnDriverNegative(String stmt) throws Exception {
- CommandProcessorResponse cpr = d.run(stmt);
- if(cpr.getResponseCode() != 0) {
- return cpr;
- }
- throw new RuntimeException("Didn't get expected failure!");
- }
@Test
public void exchangePartition() throws Exception {
@@ -872,8 +794,8 @@ public class TestTxnCommands {
Assert.assertTrue(rs.get(1), rs.get(1).endsWith("nonacidorctbl/000001_0"));
Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":1}\t1\t5"));
Assert.assertTrue(rs.get(2), rs.get(2).endsWith("nonacidorctbl/000001_0_copy_1"));
- Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"transactionid\":14,\"bucketid\":536936448,\"rowid\":0}\t1\t17"));
- Assert.assertTrue(rs.get(3), rs.get(3).endsWith("nonacidorctbl/delta_0000014_0000014_0000/bucket_00001"));
+ Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"transactionid\":16,\"bucketid\":536936448,\"rowid\":0}\t1\t17"));
+ Assert.assertTrue(rs.get(3), rs.get(3).endsWith("nonacidorctbl/delta_0000016_0000016_0000/bucket_00001"));
//run Compaction
runStatementOnDriver("alter table "+ TestTxnCommands2.Table.NONACIDORCTBL +" compact 'major'");
TestTxnCommands2.runWorker(hiveConf);
@@ -884,13 +806,13 @@ public class TestTxnCommands {
}
Assert.assertEquals("", 4, rs.size());
Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t0\t12"));
- Assert.assertTrue(rs.get(0), rs.get(0).endsWith("nonacidorctbl/base_0000014/bucket_00000"));
+ Assert.assertTrue(rs.get(0), rs.get(0).endsWith("nonacidorctbl/base_0000016/bucket_00000"));
Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":0}\t1\t2"));
- Assert.assertTrue(rs.get(1), rs.get(1).endsWith("nonacidorctbl/base_0000014/bucket_00001"));
+ Assert.assertTrue(rs.get(1), rs.get(1).endsWith("nonacidorctbl/base_0000016/bucket_00001"));
Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":1}\t1\t5"));
- Assert.assertTrue(rs.get(2), rs.get(2).endsWith("nonacidorctbl/base_0000014/bucket_00001"));
- Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"transactionid\":14,\"bucketid\":536936448,\"rowid\":0}\t1\t17"));
- Assert.assertTrue(rs.get(3), rs.get(3).endsWith("nonacidorctbl/base_0000014/bucket_00001"));
+ Assert.assertTrue(rs.get(2), rs.get(2).endsWith("nonacidorctbl/base_0000016/bucket_00001"));
+ Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"transactionid\":16,\"bucketid\":536936448,\"rowid\":0}\t1\t17"));
+ Assert.assertTrue(rs.get(3), rs.get(3).endsWith("nonacidorctbl/base_0000016/bucket_00001"));
//make sure they are the same before and after compaction
}
@@ -940,4 +862,4 @@ public class TestTxnCommands {
int[][] expected = {{0, -1},{0, -1}, {1, -1}, {1, -1}, {2, -1}, {2, -1}, {3, -1}, {3, -1}};
Assert.assertEquals(stringifyValues(expected), r);
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index 0e0fca3..21b4a2c 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -304,8 +304,8 @@ public class TestTxnCommands2 {
// 1. Insert five rows to Non-ACID table.
runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,2),(3,4),(5,6),(7,8),(9,10)");
- // 2. Convert NONACIDORCTBL to ACID table.
- runStatementOnDriver("alter table " + Table.NONACIDORCTBL + " SET TBLPROPERTIES ('transactional'='true')");
+ // 2. Convert NONACIDORCTBL to ACID table. //todo: remove trans_prop after HIVE-17089
+ runStatementOnDriver("alter table " + Table.NONACIDORCTBL + " SET TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default')");
runStatementOnDriver("update " + Table.NONACIDORCTBL + " set b = b*2 where b in (4,10)");
runStatementOnDriver("delete from " + Table.NONACIDORCTBL + " where a = 7");
@@ -331,6 +331,7 @@ public class TestTxnCommands2 {
/**
* see HIVE-16177
* See also {@link TestTxnCommands#testNonAcidToAcidConversion01()}
+ * {@link TestTxnNoBuckets#testCTAS()}
*/
@Test
public void testNonAcidToAcidConversion02() throws Exception {
@@ -341,8 +342,8 @@ public class TestTxnCommands2 {
//create 1 row in a file 000001_0_copy2 (and empty 000000_0_copy2?)
runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,6)");
- //convert the table to Acid
- runStatementOnDriver("alter table " + Table.NONACIDORCTBL + " SET TBLPROPERTIES ('transactional'='true')");
+ //convert the table to Acid //todo: remove trans_prop after HIVE-17089
+ runStatementOnDriver("alter table " + Table.NONACIDORCTBL + " SET TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default')");
List<String> rs1 = runStatementOnDriver("describe "+ Table.NONACIDORCTBL);
//create some delta directories
runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(0,15),(1,16)");
@@ -361,6 +362,7 @@ public class TestTxnCommands2 {
* All ROW__IDs are unique on read after conversion to acid
* ROW__IDs are exactly the same before and after compaction
* Also check the file name (only) after compaction for completeness
+ * Note: order of rows in a file ends up being the reverse of order in values clause (why?!)
*/
String[][] expected = {
{"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t0\t13", "bucket_00000"},
@@ -2022,7 +2024,7 @@ public class TestTxnCommands2 {
}
static String makeValuesClause(int[][] rows) {
assert rows.length > 0;
- StringBuilder sb = new StringBuilder("values");
+ StringBuilder sb = new StringBuilder(" values");
for(int[] row : rows) {
assert row.length > 0;
if(row.length > 1) {
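The leading space added to " values" matters because the generated clause is concatenated directly onto other SQL text; without it the table name and the keyword would fuse. A standalone sketch of the concatenation hazard (simplified to two-column rows, unlike the real helper, which handles arbitrary row widths):

    public class ValuesClauseSketch {
      static String makeValuesClause(int[][] rows) {
        StringBuilder sb = new StringBuilder(" values");
        for (int[] row : rows) {
          sb.append('(').append(row[0]).append(',').append(row[1]).append("),");
        }
        sb.setLength(sb.length() - 1); // drop trailing comma
        return sb.toString();
      }

      public static void main(String[] args) {
        int[][] rows = {{1, 2}, {3, 4}};
        // without the leading space this would read "...nonAcidOrcTblvalues(1,2)..."
        System.out.println("insert into nonAcidOrcTbl(a,b)" + makeValuesClause(rows));
      }
    }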
http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsBase.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsBase.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsBase.java
new file mode 100644
index 0000000..d6e709d
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsBase.java
@@ -0,0 +1,162 @@
+package org.apache.hadoop.hive.ql;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+import org.apache.hadoop.hive.ql.io.HiveInputFormat;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.rules.TestName;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+public abstract class TestTxnCommandsBase {
+ //bucket count for test tables; set it to 1 for easier debugging
+ final static int BUCKET_COUNT = 2;
+ @Rule
+ public TestName testName = new TestName();
+ HiveConf hiveConf;
+ Driver d;
+ enum Table {
+ ACIDTBL("acidTbl"),
+ ACIDTBLPART("acidTblPart"),
+ ACIDTBL2("acidTbl2"),
+ NONACIDORCTBL("nonAcidOrcTbl"),
+ NONACIDORCTBL2("nonAcidOrcTbl2"),
+ NONACIDNONBUCKET("nonAcidNonBucket");
+
+ final String name;
+ @Override
+ public String toString() {
+ return name;
+ }
+ Table(String name) {
+ this.name = name;
+ }
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ setUpInternal();
+ }
+ void setUpInternal() throws Exception {
+ tearDown();
+ hiveConf = new HiveConf(this.getClass());
+ hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
+ hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
+ hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, getWarehouseDir());
+ hiveConf.setVar(HiveConf.ConfVars.HIVEMAPREDMODE, "nonstrict");
+ hiveConf.setVar(HiveConf.ConfVars.HIVEINPUTFORMAT, HiveInputFormat.class.getName());
+ hiveConf
+ .setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
+ "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory");
+ hiveConf.setBoolVar(HiveConf.ConfVars.MERGE_CARDINALITY_VIOLATION_CHECK, true);
+ TxnDbUtil.setConfValues(hiveConf);
+ TxnDbUtil.prepDb();
+ File f = new File(getWarehouseDir());
+ if (f.exists()) {
+ FileUtil.fullyDelete(f);
+ }
+ if (!(new File(getWarehouseDir()).mkdirs())) {
+ throw new RuntimeException("Could not create " + getWarehouseDir());
+ }
+ SessionState.start(new SessionState(hiveConf));
+ d = new Driver(hiveConf);
+ d.setMaxRows(10000);
+ dropTables();
+ runStatementOnDriver("create table " + Table.ACIDTBL + "(a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+ runStatementOnDriver("create table " + Table.ACIDTBLPART + "(a int, b int) partitioned by (p string) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+ runStatementOnDriver("create table " + Table.NONACIDORCTBL + "(a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='false')");
+ runStatementOnDriver("create table " + Table.NONACIDORCTBL2 + "(a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='false')");
+ runStatementOnDriver("create temporary table " + Table.ACIDTBL2 + "(a int, b int, c int) clustered by (c) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+ runStatementOnDriver("create table " + Table.NONACIDNONBUCKET + "(a int, b int) stored as orc");
+ }
+ private void dropTables() throws Exception {
+ for(TestTxnCommandsBase.Table t : TestTxnCommandsBase.Table.values()) {
+ runStatementOnDriver("drop table if exists " + t);
+ }
+ }
+ @After
+ public void tearDown() throws Exception {
+ try {
+ if (d != null) {
+ dropTables();
+ d.destroy();
+ d.close();
+ d = null;
+ }
+ } finally {
+ TxnDbUtil.cleanDb();
+ FileUtils.deleteDirectory(new File(getTestDataDir()));
+ }
+ }
+ String getWarehouseDir() {
+ return getTestDataDir() + "/warehouse";
+ }
+ abstract String getTestDataDir();
+ /**
+ * Takes raw data and turns it into strings as if they came from Driver.getResults();
+ * sorts rows in dictionary order.
+ */
+ List<String> stringifyValues(int[][] rowsIn) {
+ return TestTxnCommands2.stringifyValues(rowsIn);
+ }
+ String makeValuesClause(int[][] rows) {
+ return TestTxnCommands2.makeValuesClause(rows);
+ }
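+ /* e.g. (illustrative): makeValuesClause(new int[][] {{1,2},{3,4}}) yields a clause of the
+ form " values(1,2),(3,4)" for appending to an "insert into T" statement, and
+ stringifyValues() renders those same rows as tab-separated strings, the way Driver.getResults() does */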
+
+ List<String> runStatementOnDriver(String stmt) throws Exception {
+ CommandProcessorResponse cpr = d.run(stmt);
+ if(cpr.getResponseCode() != 0) {
+ throw new RuntimeException(stmt + " failed: " + cpr);
+ }
+ List<String> rs = new ArrayList<String>();
+ d.getResults(rs);
+ return rs;
+ }
+ CommandProcessorResponse runStatementOnDriverNegative(String stmt) throws Exception {
+ CommandProcessorResponse cpr = d.run(stmt);
+ if(cpr.getResponseCode() != 0) {
+ return cpr;
+ }
+ throw new RuntimeException("Didn't get expected failure!");
+ }
+ /**
+ * Asserts that the actual files match the expected ones.
+ * @param expectedFiles - suffixes of expected Paths; all entries must have the same length
+ * @param rootPath - table or partition root under which to look for actual files, recursively
+ */
+ void assertExpectedFileSet(Set<String> expectedFiles, String rootPath) throws Exception {
+ int suffixLength = 0;
+ for(String s : expectedFiles) {
+ if(suffixLength > 0) {
+ assert suffixLength == s.length() : "all entries must be the same length. current: " + s;
+ }
+ suffixLength = s.length();
+ }
+ FileSystem fs = FileSystem.get(hiveConf);
+ Set<String> actualFiles = new HashSet<>();
+ RemoteIterator<LocatedFileStatus> remoteIterator = fs.listFiles(new Path(rootPath), true);
+ while (remoteIterator.hasNext()) {
+ LocatedFileStatus lfs = remoteIterator.next();
+ if(!lfs.isDirectory() && org.apache.hadoop.hive.common.FileUtils.HIDDEN_FILES_PATH_FILTER.accept(lfs.getPath())) {
+ String p = lfs.getPath().toString();
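+ //compare only the trailing suffixLength characters so the check is independent of the absolute warehouse path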
+ actualFiles.add(p.substring(p.length() - suffixLength, p.length()));
+ }
+ }
+ Assert.assertEquals("Unexpected file list", expectedFiles, actualFiles);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java
new file mode 100644
index 0000000..7aca6b2
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java
@@ -0,0 +1,297 @@
+package org.apache.hadoop.hive.ql;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.io.BucketCodec;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+public class TestTxnNoBuckets extends TestTxnCommandsBase {
+ static final private Logger LOG = LoggerFactory.getLogger(TestTxnNoBuckets.class);
+ private static final String TEST_DATA_DIR = new File(System.getProperty("java.io.tmpdir") +
+ File.separator + TestTxnNoBuckets.class.getCanonicalName()
+ + "-" + System.currentTimeMillis()
+ ).getPath().replaceAll("\\\\", "/");
+ @Override
+ String getTestDataDir() {
+ return TEST_DATA_DIR;
+ }
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ setUpInternal();
+ hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, true);
+ }
+
+ /**
+ * Tests that Acid can work with un-bucketed tables.
+ */
+ @Test
+ public void testNoBuckets() throws Exception {
+ int[][] sourceVals1 = {{0,0,0},{3,3,3}};
+ int[][] sourceVals2 = {{1,1,1},{2,2,2}};
+ runStatementOnDriver("drop table if exists tmp");
+ runStatementOnDriver("create table tmp (c1 integer, c2 integer, c3 integer) stored as orc");
+ runStatementOnDriver("insert into tmp " + makeValuesClause(sourceVals1));
+ runStatementOnDriver("insert into tmp " + makeValuesClause(sourceVals2));
+ runStatementOnDriver("drop table if exists nobuckets");
+ runStatementOnDriver("create table nobuckets (c1 integer, c2 integer, c3 integer) stored " +
+ "as orc tblproperties('transactional'='true', 'transactional_properties'='default')");
+ String stmt = "insert into nobuckets select * from tmp";
+ runStatementOnDriver(stmt);
+ List<String> rs = runStatementOnDriver(
+ "select ROW__ID, c1, c2, c3, INPUT__FILE__NAME from nobuckets order by ROW__ID");
+ Assert.assertEquals("", 4, rs.size());
+ LOG.warn("after insert");
+ for(String s : rs) {
+ LOG.warn(s);
+ }
+ /* The insert creates 2 output files (presumably because there are 2 input files).
+ * The number in the file name is the writerId. This is the number encoded in ROW__ID.bucketId -
+ * see {@link org.apache.hadoop.hive.ql.io.BucketCodec} */
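+ //illustrative sanity check of that encoding (matches the bucketid values asserted below): writers 0 and 1
+ Assert.assertEquals(0, BucketCodec.determineVersion(536870912).decodeWriterId(536870912));
+ Assert.assertEquals(1, BucketCodec.determineVersion(536936448).decodeWriterId(536936448));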
+ Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"transactionid\":19,\"bucketid\":536870912,\"rowid\":0}\t0\t0\t0\t"));
+ Assert.assertTrue(rs.get(0), rs.get(0).endsWith("nobuckets/delta_0000019_0000019_0000/bucket_00000"));
+ Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"transactionid\":19,\"bucketid\":536870912,\"rowid\":1}\t3\t3\t3\t"));
+ Assert.assertTrue(rs.get(1), rs.get(1).endsWith("nobuckets/delta_0000019_0000019_0000/bucket_00000"));
+ Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"transactionid\":19,\"bucketid\":536936448,\"rowid\":0}\t1\t1\t1\t"));
+ Assert.assertTrue(rs.get(2), rs.get(2).endsWith("nobuckets/delta_0000019_0000019_0000/bucket_00001"));
+ Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"transactionid\":19,\"bucketid\":536936448,\"rowid\":1}\t2\t2\t2\t"));
+ Assert.assertTrue(rs.get(3), rs.get(3).endsWith("nobuckets/delta_0000019_0000019_0000/bucket_00001"));
+ /*todo: WTF?
+ The ReduceSink (RS) for update seems to spray rows randomly... is that OK? Maybe, as long as all resulting
+ files have different names... will they? Assuming we name them based on taskId, we should create bucketX and
+ bucketY. Since the insert events seem to be written to a proper bucketX file, if delete events could be written
+ to a bucketX file as well, that could be useful for filtering deletes for a split by file name. In fact this may
+ reduce the number of changes elsewhere, e.g. in the compactor... maybe. But this limits the parallelism - what is
+ worse, you don't know what the parallelism should be until you have a list of all the input files, since bucket
+ count is no longer a metadata property. Also, with late Update split, the file name has already been determined
+ from taskId, so the Insert part won't necessarily end up matching the bucketX property. With early Update split,
+ the Insert can still be an insert - i.e. go to the appropriate bucketX - but deletes will still go wherever
+ (random shuffle) unless you know all the bucketX files to be read - may not be worth the trouble.
+ 2nd: something in FS fails: ArrayIndexOutOfBoundsException: 1 at FileSinkOperator.process(FileSinkOperator.java:779)*/
+ runStatementOnDriver("update nobuckets set c3 = 17 where c3 in(0,1)");
+ rs = runStatementOnDriver("select ROW__ID, c1, c2, c3, INPUT__FILE__NAME from nobuckets order by INPUT__FILE__NAME, ROW__ID");
+ LOG.warn("after update");
+ for(String s : rs) {
+ LOG.warn(s);
+ }
+ Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"transactionid\":19,\"bucketid\":536870912,\"rowid\":1}\t3\t3\t3\t"));
+ Assert.assertTrue(rs.get(0), rs.get(0).endsWith("nobuckets/delta_0000019_0000019_0000/bucket_00000"));
+ Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"transactionid\":19,\"bucketid\":536936448,\"rowid\":1}\t2\t2\t2\t"));
+ Assert.assertTrue(rs.get(1), rs.get(1).endsWith("nobuckets/delta_0000019_0000019_0000/bucket_00001"));
+ //so the update has 1 writer, which creates bucket_00000 where both new rows land
+ Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"transactionid\":21,\"bucketid\":536870912,\"rowid\":0}\t0\t0\t17\t"));
+ Assert.assertTrue(rs.get(2), rs.get(2).endsWith("nobuckets/delta_0000021_0000021_0000/bucket_00000"));
+ Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"transactionid\":21,\"bucketid\":536870912,\"rowid\":1}\t1\t1\t17\t"));
+ Assert.assertTrue(rs.get(3), rs.get(3).endsWith("nobuckets/delta_0000021_0000021_0000/bucket_00000"));
+
+ Set<String> expectedFiles = new HashSet<>();
+ //both delete events land in a single bucket_00000 file. Each has a different ROW__ID.bucketId value (even the writerId encoded in it differs)
+ expectedFiles.add("ts/delete_delta_0000021_0000021_0000/bucket_00000");
+ expectedFiles.add("nobuckets/delta_0000019_0000019_0000/bucket_00000");
+ expectedFiles.add("nobuckets/delta_0000019_0000019_0000/bucket_00001");
+ expectedFiles.add("nobuckets/delta_0000021_0000021_0000/bucket_00000");
+ //check that we get the right files on disk
+ assertExpectedFileSet(expectedFiles, getWarehouseDir() + "/nobuckets");
+ //todo: it would be nice to check the contents of the files... could use orc.FileDump - it has
+ // methods to print to a supplied stream but those are package private
+
+ runStatementOnDriver("alter table nobuckets compact 'major'");
+ TestTxnCommands2.runWorker(hiveConf);
+ rs = runStatementOnDriver("select ROW__ID, c1, c2, c3, INPUT__FILE__NAME from nobuckets order by INPUT__FILE__NAME, ROW__ID");
+ LOG.warn("after major compact");
+ for(String s : rs) {
+ LOG.warn(s);
+ }
+ /*
+├── base_0000021
+│ ├── bucket_00000
+│ └── bucket_00001
+├── delete_delta_0000021_0000021_0000
+│ └── bucket_00000
+├── delta_0000019_0000019_0000
+│ ├── bucket_00000
+│ └── bucket_00001
+└── delta_0000021_0000021_0000
+ └── bucket_00000
+ */
+ Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"transactionid\":19,\"bucketid\":536870912,\"rowid\":1}\t3\t3\t3\t"));
+ Assert.assertTrue(rs.get(0), rs.get(0).endsWith("nobuckets/base_0000021/bucket_00000"));
+ Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"transactionid\":21,\"bucketid\":536870912,\"rowid\":0}\t0\t0\t17\t"));
+ Assert.assertTrue(rs.get(1), rs.get(1).endsWith("nobuckets/base_0000021/bucket_00000"));
+ Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"transactionid\":21,\"bucketid\":536870912,\"rowid\":1}\t1\t1\t17\t"));
+ Assert.assertTrue(rs.get(2), rs.get(2).endsWith("nobuckets/base_0000021/bucket_00000"));
+ Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"transactionid\":19,\"bucketid\":536936448,\"rowid\":1}\t2\t2\t2\t"));
+ Assert.assertTrue(rs.get(3), rs.get(3).endsWith("nobuckets/base_0000021/bucket_00001"));
+
+ expectedFiles.clear();
+ expectedFiles.add("delete_delta_0000021_0000021_0000/bucket_00000");
+ expectedFiles.add("uckets/delta_0000019_0000019_0000/bucket_00000");
+ expectedFiles.add("uckets/delta_0000019_0000019_0000/bucket_00001");
+ expectedFiles.add("uckets/delta_0000021_0000021_0000/bucket_00000");
+ expectedFiles.add("/warehouse/nobuckets/base_0000021/bucket_00000");
+ expectedFiles.add("/warehouse/nobuckets/base_0000021/bucket_00001");
+ assertExpectedFileSet(expectedFiles, getWarehouseDir() + "/nobuckets");
+
+ TestTxnCommands2.runCleaner(hiveConf);
+ rs = runStatementOnDriver("select c1, c2, c3 from nobuckets order by c1, c2, c3");
+ int[][] result = {{0,0,17},{1,1,17},{2,2,2},{3,3,3}};
+ Assert.assertEquals("Unexpected result after clean", stringifyValues(result), rs);
+
+ expectedFiles.clear();
+ expectedFiles.add("nobuckets/base_0000021/bucket_00000");
+ expectedFiles.add("nobuckets/base_0000021/bucket_00001");
+ assertExpectedFileSet(expectedFiles, getWarehouseDir() + "/nobuckets");
+ }
+
+ /**
+ * All of these pass but don't do exactly the right thing:
+ * files land as if it's not an acid table, e.g. "warehouse/myctas4/000000_0",
+ * even though {@link org.apache.hadoop.hive.metastore.TransactionalValidationListener} fires
+ * and sees it as a transactional table.
+ * Look for QB.isCTAS() and CreateTableDesc() in SemanticAnalyzer.
+ *
+ * On read, these files are treated like a non-acid to acid conversion.
+ *
+ * See HIVE-15899.
+ * See CTAS tests in TestAcidOnTez.
+ */
+ @Test
+ public void testCTAS() throws Exception {
+ int[][] values = {{1,2},{3,4}};
+ runStatementOnDriver("insert into " + Table.NONACIDORCTBL + makeValuesClause(values));
+ runStatementOnDriver("create table myctas stored as ORC TBLPROPERTIES ('transactional" +
+ "'='true', 'transactional_properties'='default') as select a, b from " + Table.NONACIDORCTBL);
+ List<String> rs = runStatementOnDriver("select * from myctas order by a, b");
+ Assert.assertEquals(stringifyValues(values), rs);
+
+ runStatementOnDriver("insert into " + Table.ACIDTBL + makeValuesClause(values));
+ runStatementOnDriver("create table myctas2 stored as ORC TBLPROPERTIES ('transactional" +
+ "'='true', 'transactional_properties'='default') as select a, b from " + Table.ACIDTBL);
+ rs = runStatementOnDriver("select * from myctas2 order by a, b");
+ Assert.assertEquals(stringifyValues(values), rs);
+
+ runStatementOnDriver("create table myctas3 stored as ORC TBLPROPERTIES ('transactional" +
+ "'='true', 'transactional_properties'='default') as select a, b from " + Table.NONACIDORCTBL +
+ " union all select a, b from " + Table.ACIDTBL);
+ rs = runStatementOnDriver("select * from myctas3 order by a, b");
+ Assert.assertEquals(stringifyValues(new int[][] {{1,2},{1,2},{3,4},{3,4}}), rs);
+
+ runStatementOnDriver("create table myctas4 stored as ORC TBLPROPERTIES ('transactional" +
+ "'='true', 'transactional_properties'='default') as select a, b from " + Table.NONACIDORCTBL +
+ " union distinct select a, b from " + Table.ACIDTBL);
+ rs = runStatementOnDriver("select * from myctas4 order by a, b");
+ Assert.assertEquals(stringifyValues(values), rs);
+ }
+ /**
+ * See HIVE-16177.
+ * See also {@link TestTxnCommands2#testNonAcidToAcidConversion02()}. todo: need a test with > 1 bucket file
+ */
+ @Test
+ public void testToAcidConversion02() throws Exception {
+ //create 2 rows in a file 000000_0
+ runStatementOnDriver("insert into " + Table.NONACIDNONBUCKET + "(a,b) values(1,2),(1,3)");
+ //create 4 rows in a file 000000_0_copy_1
+ runStatementOnDriver("insert into " + Table.NONACIDNONBUCKET + "(a,b) values(0,12),(0,13),(1,4),(1,5)");
+ //create 1 row in a file 000000_0_copy_2
+ runStatementOnDriver("insert into " + Table.NONACIDNONBUCKET + "(a,b) values(1,6)");
+
+ //convert the table to Acid //todo: remove trans_prop after HIVE-17089
+ runStatementOnDriver("alter table " + Table.NONACIDNONBUCKET + " SET TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default')");
+ List<String> rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.NONACIDNONBUCKET + " order by ROW__ID");
+ LOG.warn("before acid ops (after convert)");
+ for(String s : rs) {
+ LOG.warn(s);
+ }
+ //create some delta directories
+ runStatementOnDriver("insert into " + Table.NONACIDNONBUCKET + "(a,b) values(0,15),(1,16)");
+ runStatementOnDriver("update " + Table.NONACIDNONBUCKET + " set b = 120 where a = 0 and b = 12");
+ runStatementOnDriver("insert into " + Table.NONACIDNONBUCKET + "(a,b) values(0,17)");
+ runStatementOnDriver("delete from " + Table.NONACIDNONBUCKET + " where a = 1 and b = 3");
+
+ rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.NONACIDNONBUCKET + " order by a,b");
+ LOG.warn("before compact");
+ for(String s : rs) {
+ LOG.warn(s);
+ }
+ Assert.assertEquals(0, BucketCodec.determineVersion(536870912).decodeWriterId(536870912));
+ /*
+ * All ROW__IDs are unique on read after conversion to acid
+ * ROW__IDs are exactly the same before and after compaction
+ * Also check the file name (only) after compaction for completeness
+ */
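+ //rows read from the pre-acid files carry synthesized ROW__IDs (note transactionid 0 below); post-conversion writes carry real txn ids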
+ String[][] expected = {
+ {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":3}\t0\t13", "bucket_00000", "000000_0_copy_1"},
+ {"{\"transactionid\":18,\"bucketid\":536870912,\"rowid\":0}\t0\t15", "bucket_00000", "bucket_00000"},
+ {"{\"transactionid\":20,\"bucketid\":536870912,\"rowid\":0}\t0\t17", "bucket_00000", "bucket_00000"},
+ {"{\"transactionid\":19,\"bucketid\":536870912,\"rowid\":0}\t0\t120", "bucket_00000", "bucket_00000"},
+ {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "bucket_00000", "000000_0"},
+ {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":4}\t1\t4", "bucket_00000", "000000_0_copy_1"},
+ {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":5}\t1\t5", "bucket_00000", "000000_0_copy_1"},
+ {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":6}\t1\t6", "bucket_00000", "000000_0_copy_2"},
+ {"{\"transactionid\":18,\"bucketid\":536870912,\"rowid\":1}\t1\t16", "bucket_00000", "bucket_00000"}
+ };
+ Assert.assertEquals("Unexpected row count before compaction", expected.length, rs.size());
+ for(int i = 0; i < expected.length; i++) {
+ Assert.assertTrue("Actual line " + i + " bc: " + rs.get(i), rs.get(i).startsWith(expected[i][0]));
+ Assert.assertTrue("Actual line(file) " + i + " bc: " + rs.get(i), rs.get(i).endsWith(expected[i][2]));
+ }
+ //run Compaction
+ runStatementOnDriver("alter table "+ Table.NONACIDNONBUCKET +" compact 'major'");
+ TestTxnCommands2.runWorker(hiveConf);
+ /*
+ nonacidnonbucket/
+ ├── 000000_0
+ ├── 000000_0_copy_1
+ ├── 000000_0_copy_2
+ ├── base_0000021
+ │ └── bucket_00000
+ ├── delete_delta_0000019_0000019_0000
+ │ └── bucket_00000
+ ├── delete_delta_0000021_0000021_0000
+ │ └── bucket_00000
+ ├── delta_0000018_0000018_0000
+ │ └── bucket_00000
+ ├── delta_0000019_0000019_0000
+ │ └── bucket_00000
+ └── delta_0000020_0000020_0000
+ └── bucket_00000
+
+ 6 directories, 9 files
+ */
+ rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.NONACIDNONBUCKET + " order by a,b");
+ LOG.warn("after compact");
+ for(String s : rs) {
+ LOG.warn(s);
+ }
+ Assert.assertEquals("Unexpected row count after compaction", expected.length, rs.size());
+ for(int i = 0; i < expected.length; i++) {
+ Assert.assertTrue("Actual line " + i + " ac: " + rs.get(i), rs.get(i).startsWith(expected[i][0]));
+ Assert.assertTrue("Actual line(file) " + i + " ac: " + rs.get(i), rs.get(i).endsWith(expected[i][1]));
+ }
+ //make sure they are the same before and after compaction
+ }
+ /**
+ * Currently CTAS doesn't support bucketed tables. Correspondingly Acid only supports CTAS for
+ * unbucketed tables. This test is here to make sure that if CTAS is ever made to support bucketed
+ * tables, it raises a red flag for Acid.
+ */
+ @Test
+ public void testCtasBucketed() throws Exception {
+ runStatementOnDriver("insert into " + Table.NONACIDNONBUCKET + "(a,b) values(1,2),(1,3)");
+ CommandProcessorResponse cpr = runStatementOnDriverNegative("create table myctas " +
+ "clustered by (a) into 2 buckets stored as ORC TBLPROPERTIES ('transactional'='true') as " +
+ "select a, b from " + Table.NONACIDORCTBL);
+ int j = ErrorMsg.CTAS_PARCOL_COEXISTENCE.getErrorCode();//this code doesn't propagate
+// Assert.assertEquals("Wrong msg", ErrorMsg.CTAS_PARCOL_COEXISTENCE.getErrorCode(), cpr.getErrorCode());
+ Assert.assertTrue(cpr.getErrorMessage().contains("CREATE-TABLE-AS-SELECT does not support"));
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
index f73d058..4c30732 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
@@ -3420,6 +3420,7 @@ public class TestInputOutputFormat {
assertTrue(split.toString().contains("hasFooter=false"));
assertTrue(split.toString().contains("hasBase=true"));
assertTrue(split.toString().contains("deltas=0"));
+ assertTrue(split.toString().contains("isOriginal=true"));
if (split instanceof OrcSplit) {
assertFalse("No footer serialize test for non-vector reader, hasFooter is not expected in" +
" orc splits.", ((OrcSplit) split).hasFooter());
@@ -3435,11 +3436,13 @@ public class TestInputOutputFormat {
}
// call-1: open to read footer - split 1 => mock:/mocktable5/0_0
// call-2: open to read data - split 1 => mock:/mocktable5/0_0
- // call-3: open to read footer - split 2 => mock:/mocktable5/0_1
- // call-4: open to read data - split 2 => mock:/mocktable5/0_1
- // call-5: AcidUtils.getAcidState - getLen() mock:/mocktable5/0_0
- // call-6: AcidUtils.getAcidState - getLen() mock:/mocktable5/0_1
- assertEquals(6, readOpsDelta);
+ // call-3: getAcidState - split 1 => mock:/mocktable5 (to compute offset for original read)
+ // call-4: open to read footer - split 2 => mock:/mocktable5/0_1
+ // call-5: open to read data - split 2 => mock:/mocktable5/0_1
+ // call-6: getAcidState - split 2 => mock:/mocktable5 (to compute offset for original read)
+ // call-7: open to read footer - split 2 => mock:/mocktable5/0_0 (to get row count)
+ // call-8: file status - split 2 => mock:/mocktable5/0_0
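+ // (the extra reads vs. the old expectation of 6 come from computing ROW__ID offsets for "original" files)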
+ assertEquals(8, readOpsDelta);
// revert back to local fs
conf.set("fs.defaultFS", "file:///");
@@ -3498,6 +3501,7 @@ public class TestInputOutputFormat {
assertTrue(split.toString().contains("hasFooter=true"));
assertTrue(split.toString().contains("hasBase=true"));
assertTrue(split.toString().contains("deltas=0"));
+ assertTrue(split.toString().contains("isOriginal=true"));
if (split instanceof OrcSplit) {
assertTrue("Footer serialize test for ACID reader, hasFooter is expected in" +
" orc splits.", ((OrcSplit) split).hasFooter());
@@ -3512,10 +3516,12 @@ public class TestInputOutputFormat {
}
}
// call-1: open to read data - split 1 => mock:/mocktable6/0_0
- // call-2: open to read data - split 2 => mock:/mocktable6/0_1
- // call-3: AcidUtils.getAcidState - getLen() mock:/mocktable6/0_0
- // call-4: AcidUtils.getAcidState - getLen() mock:/mocktable6/0_1
- assertEquals(4, readOpsDelta);
+ // call-2: AcidUtils.getAcidState - split 1 => ls mock:/mocktable6
+ // call-3: open to read data - split 2 => mock:/mocktable6/0_1
+ // call-4: AcidUtils.getAcidState - split 2 => ls mock:/mocktable6
+ // call-5: read footer - split 2 => mock:/mocktable6/0_0 (to get offset since it's original file)
+ // call-6: file stat - split 2 => mock:/mocktable6/0_0
+ assertEquals(6, readOpsDelta);
// revert back to local fs
conf.set("fs.defaultFS", "file:///");
@@ -3883,7 +3889,7 @@ public class TestInputOutputFormat {
conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0,2");
OrcSplit split = new OrcSplit(testFilePath, null, 0, fileLength,
new String[0], null, false, true,
- new ArrayList<AcidInputFormat.DeltaMetaData>(), fileLength, fileLength);
+ new ArrayList<AcidInputFormat.DeltaMetaData>(), fileLength, fileLength, workDir);
OrcInputFormat inputFormat = new OrcInputFormat();
AcidInputFormat.RowReader<OrcStruct> reader = inputFormat.getReader(split,
new AcidInputFormat.Options(conf));
@@ -3911,7 +3917,7 @@ public class TestInputOutputFormat {
conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0,2,3");
split = new OrcSplit(testFilePath, null, 0, fileLength,
new String[0], null, false, true,
- new ArrayList<AcidInputFormat.DeltaMetaData>(), fileLength, fileLength);
+ new ArrayList<AcidInputFormat.DeltaMetaData>(), fileLength, fileLength, workDir);
inputFormat = new OrcInputFormat();
reader = inputFormat.getReader(split, new AcidInputFormat.Options(conf));
record = 0;
http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
index 82cf108..9628a40 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
@@ -826,12 +826,9 @@ public class TestOrcRawRecordMerger {
merger.close();
//now run as if it's a minor Compaction so we don't collapse events
- //it's not exactly like minor compaction since MC would not have a baseReader
//here there is only 1 "split" since we only have data for 1 bucket
- baseReader = OrcFile.createReader(basePath,
- OrcFile.readerOptions(conf));
merger =
- new OrcRawRecordMerger(conf, false, baseReader, false, BUCKET,
+ new OrcRawRecordMerger(conf, false, null, false, BUCKET,
createMaximalTxnList(), new Reader.Options(),
AcidUtils.getPaths(directory.getCurrentDirectories()), new OrcRawRecordMerger.Options().isCompacting(true));
assertEquals(null, merger.getMinKey());
@@ -844,40 +841,86 @@ public class TestOrcRawRecordMerger {
assertNull(OrcRecordUpdater.getRow(event));
assertEquals(true, merger.next(id, event));
+ assertEquals(OrcRecordUpdater.DELETE_OPERATION,
+ OrcRecordUpdater.getOperation(event));
+ assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 2, 200), id);
+ assertNull(OrcRecordUpdater.getRow(event));
+
+ assertEquals(true, merger.next(id, event));
+ assertEquals(OrcRecordUpdater.DELETE_OPERATION,
+ OrcRecordUpdater.getOperation(event));
+ assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 3, 200), id);
+ assertNull(OrcRecordUpdater.getRow(event));
+
+ assertEquals(true, merger.next(id, event));
+ assertEquals(OrcRecordUpdater.DELETE_OPERATION,
+ OrcRecordUpdater.getOperation(event));
+ assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 7, 200), id);
+ assertNull(OrcRecordUpdater.getRow(event));
+
+ assertEquals(true, merger.next(id, event));
+ assertEquals(OrcRecordUpdater.DELETE_OPERATION,
+ OrcRecordUpdater.getOperation(event));
+ assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 8, 200), id);
+ assertNull(OrcRecordUpdater.getRow(event));
+
+ //data from delta_200_200
+ assertEquals(true, merger.next(id, event));
assertEquals(OrcRecordUpdater.INSERT_OPERATION,
OrcRecordUpdater.getOperation(event));
- assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 0, 0), id);
- assertEquals("first", getValue(event));
+ assertEquals(new ReaderKey(200, BUCKET_PROPERTY, 0, 200), id);
+ assertEquals("update 1", getValue(event));
assertEquals(true, merger.next(id, event));
assertEquals(OrcRecordUpdater.INSERT_OPERATION,
OrcRecordUpdater.getOperation(event));
- assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 1, 0), id);
- assertEquals("second", getValue(event));
+ assertEquals(new ReaderKey(200, BUCKET_PROPERTY, 1, 200), id);
+ assertEquals("update 2", getValue(event));
+
+ assertEquals(true, merger.next(id, event));
+ assertEquals(OrcRecordUpdater.INSERT_OPERATION,
+ OrcRecordUpdater.getOperation(event));
+ assertEquals(new ReaderKey(200, BUCKET_PROPERTY, 2, 200), id);
+ assertEquals("update 3", getValue(event));
+
+ assertEquals(false, merger.next(id, event));
+ merger.close();
+
+ //now run as if it's a major Compaction so we collapse events
+ //here there is only 1 "split" since we only have data for 1 bucket
+ baseReader = OrcFile.createReader(basePath,
+ OrcFile.readerOptions(conf));
+ merger =
+ new OrcRawRecordMerger(conf, true, baseReader, false, BUCKET,
+ createMaximalTxnList(), new Reader.Options(),
+ AcidUtils.getPaths(directory.getCurrentDirectories()), new OrcRawRecordMerger.Options()
+ .isCompacting(true).isMajorCompaction(true));
+ assertEquals(null, merger.getMinKey());
+ assertEquals(null, merger.getMaxKey());
assertEquals(true, merger.next(id, event));
assertEquals(OrcRecordUpdater.DELETE_OPERATION,
OrcRecordUpdater.getOperation(event));
- assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 2, 200), id);
+ assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 0, 200), id);
assertNull(OrcRecordUpdater.getRow(event));
assertEquals(true, merger.next(id, event));
assertEquals(OrcRecordUpdater.INSERT_OPERATION,
OrcRecordUpdater.getOperation(event));
- assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 2, 0), id);
- assertEquals("third", getValue(event));
+ assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 1, 0), id);
+ assertEquals("second", getValue(event));
assertEquals(true, merger.next(id, event));
assertEquals(OrcRecordUpdater.DELETE_OPERATION,
OrcRecordUpdater.getOperation(event));
- assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 3, 200), id);
+ assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 2, 200), id);
assertNull(OrcRecordUpdater.getRow(event));
assertEquals(true, merger.next(id, event));
- assertEquals(OrcRecordUpdater.INSERT_OPERATION,
+ assertEquals(OrcRecordUpdater.DELETE_OPERATION,
OrcRecordUpdater.getOperation(event));
- assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 3, 0), id);
- assertEquals("fourth", getValue(event));
+ assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 3, 200), id);
+ assertNull(OrcRecordUpdater.getRow(event));
assertEquals(true, merger.next(id, event));
assertEquals(OrcRecordUpdater.INSERT_OPERATION,
@@ -904,12 +947,6 @@ public class TestOrcRawRecordMerger {
assertNull(OrcRecordUpdater.getRow(event));
assertEquals(true, merger.next(id, event));
- assertEquals(OrcRecordUpdater.INSERT_OPERATION,
- OrcRecordUpdater.getOperation(event));
- assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 7, 0), id);
- assertEquals("eighth", getValue(event));
-
- assertEquals(true, merger.next(id, event));
assertEquals(OrcRecordUpdater.DELETE_OPERATION,
OrcRecordUpdater.getOperation(event));
assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 8, 200), id);
@@ -918,12 +955,6 @@ public class TestOrcRawRecordMerger {
assertEquals(true, merger.next(id, event));
assertEquals(OrcRecordUpdater.INSERT_OPERATION,
OrcRecordUpdater.getOperation(event));
- assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 8, 0), id);
- assertEquals("ninth", getValue(event));
-
- assertEquals(true, merger.next(id, event));
- assertEquals(OrcRecordUpdater.INSERT_OPERATION,
- OrcRecordUpdater.getOperation(event));
assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 9, 0), id);
assertEquals("tenth", getValue(event));
@@ -949,7 +980,6 @@ public class TestOrcRawRecordMerger {
assertEquals(false, merger.next(id, event));
merger.close();
-
// try ignoring the 200 transaction and make sure it works still
ValidTxnList txns = new ValidReadTxnList("2000:200:200");
//again 1st split is for base/
http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/ql/src/test/queries/clientnegative/create_not_acid.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientnegative/create_not_acid.q b/ql/src/test/queries/clientnegative/create_not_acid.q
index 8d6c9ac..8ed13fb 100644
--- a/ql/src/test/queries/clientnegative/create_not_acid.q
+++ b/ql/src/test/queries/clientnegative/create_not_acid.q
@@ -2,5 +2,5 @@ set hive.support.concurrency=true;
set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
-create table acid_notbucketed(a int, b varchar(128)) stored as orc TBLPROPERTIES ('transactional'='true');
+create table acid_notbucketed(a int, b varchar(128)) TBLPROPERTIES ('transactional'='true');
http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/ql/src/test/queries/clientpositive/acid_no_buckets.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/acid_no_buckets.q b/ql/src/test/queries/clientpositive/acid_no_buckets.q
new file mode 100644
index 0000000..c2f713e
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/acid_no_buckets.q
@@ -0,0 +1,210 @@
+--this has 4 groups of tests
+--Acid tables w/o bucketing
+--the same tests with bucketing (make sure we get the same results)
+--both sets of tests with and w/o vectorization
+
+set hive.mapred.mode=nonstrict;
+set hive.support.concurrency=true;
+set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
+set hive.exec.dynamic.partition.mode=nonstrict;
+set hive.vectorized.execution.enabled=false;
+set hive.explain.user=false;
+set hive.merge.cardinality.check=true;
+
+drop table if exists srcpart_acid;
+CREATE TABLE srcpart_acid (key STRING, value STRING) PARTITIONED BY (ds STRING, hr STRING) stored as ORC TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default');
+insert into srcpart_acid PARTITION (ds, hr) select * from srcpart;
+
+--2 rows for 413, 1 row for 43, 2 for 213, 1 for 44 in kv1.txt (in each partition)
+select ds, hr, key, value from srcpart_acid where cast(key as integer) in(413,43) and hr='11' order by ds, hr, cast(key as integer);
+
+analyze table srcpart_acid PARTITION(ds, hr) compute statistics;
+analyze table srcpart_acid PARTITION(ds, hr) compute statistics for columns;
+explain update srcpart_acid set value = concat(value, 'updated') where cast(key as integer) in(413,43) and hr='11';
+update srcpart_acid set value = concat(value, 'updated') where cast(key as integer) in(413,43) and hr='11';
+select ds, hr, key, value from srcpart_acid where value like '%updated' order by ds, hr, cast(key as integer);
+
+insert into srcpart_acid PARTITION (ds='2008-04-08', hr=='11') values ('1001','val1001'),('1002','val1002'),('1003','val1003');
+select ds, hr, key, value from srcpart_acid where cast(key as integer) > 1000 order by ds, hr, cast(key as integer);
+
+analyze table srcpart_acid PARTITION(ds, hr) compute statistics;
+analyze table srcpart_acid PARTITION(ds, hr) compute statistics for columns;
+explain delete from srcpart_acid where key in( '1001', '213', '43');
+--delete some rows from initial load, some that were updated and some that were inserted
+delete from srcpart_acid where key in( '1001', '213', '43');
+
+--make sure we deleted everything that should've been deleted
+select count(*) from srcpart_acid where key in( '1001', '213', '43');
+--make sure nothing extra was deleted (2000 + 3 (insert) - 4 - 1 - 8 = 1990)
+select count(*) from srcpart_acid;
+
+--todo: should really have a way to run compactor here....
+
+--update should match 1 row in 1 partition
+--delete should drop everything from 1 partition
+--insert should do nothing
+merge into srcpart_acid t using (select distinct ds, hr, key, value from srcpart_acid) s
+on s.ds=t.ds and s.hr=t.hr and s.key=t.key and s.value=t.value
+when matched and s.ds='2008-04-08' and s.hr=='11' and s.key='44' then update set value=concat(s.value,'updated by merge')
+when matched and s.ds='2008-04-08' and s.hr=='12' then delete
+when not matched then insert values('this','should','not','be there');
+
+--check results
+--should be 0
+select count(*) from srcpart_acid where ds='2008-04-08' and hr=='12';
+--should be 1 row
+select ds, hr, key, value from srcpart_acid where value like '%updated by merge';
+--should be 0
+select count(*) from srcpart_acid where ds = 'this' and hr = 'should' and key = 'not' and value = 'be there';
+drop table if exists srcpart_acid;
+
+
+drop table if exists srcpart_acidb;
+CREATE TABLE srcpart_acidb (key STRING, value STRING) PARTITIONED BY (ds STRING, hr STRING) CLUSTERED BY(key) INTO 2 BUCKETS stored as ORC TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default');
+insert into srcpart_acidb PARTITION (ds, hr) select * from srcpart;
+
+--2 rows for 413, 1 row for 43, 2 for 213, 2 for 12 in kv1.txt (in each partition)
+select ds, hr, key, value from srcpart_acidb where cast(key as integer) in(413,43) and hr='11' order by ds, hr, cast(key as integer);
+
+analyze table srcpart_acidb PARTITION(ds, hr) compute statistics;
+analyze table srcpart_acidb PARTITION(ds, hr) compute statistics for columns;
+explain update srcpart_acidb set value = concat(value, 'updated') where cast(key as integer) in(413,43) and hr='11';
+update srcpart_acidb set value = concat(value, 'updated') where cast(key as integer) in(413,43) and hr='11';
+select ds, hr, key, value from srcpart_acidb where value like '%updated' order by ds, hr, cast(key as integer);
+
+insert into srcpart_acidb PARTITION (ds='2008-04-08', hr=='11') values ('1001','val1001'),('1002','val1002'),('1003','val1003');
+select ds, hr, key, value from srcpart_acidb where cast(key as integer) > 1000 order by ds, hr, cast(key as integer);
+
+analyze table srcpart_acidb PARTITION(ds, hr) compute statistics;
+analyze table srcpart_acidb PARTITION(ds, hr) compute statistics for columns;
+explain delete from srcpart_acidb where key in( '1001', '213', '43');
+--delete some rows from initial load, some that were updated and some that were inserted
+delete from srcpart_acidb where key in( '1001', '213', '43');
+
+--make sure we deleted everything that should've been deleted
+select count(*) from srcpart_acidb where key in( '1001', '213', '43');
+--make sure nothing extra was deleted (2000 + 3 (insert) - 4 - 1 - 8 = 1990)
+select count(*) from srcpart_acidb;
+
+
+--todo: should really have a way to run compactor here....
+
+--update should match 1 row in 1 partition
+--delete should drop everything from 1 partition
+--insert should do nothing
+merge into srcpart_acidb t using (select distinct ds, hr, key, value from srcpart_acidb) s
+on s.ds=t.ds and s.hr=t.hr and s.key=t.key and s.value=t.value
+when matched and s.ds='2008-04-08' and s.hr=='11' and s.key='44' then update set value=concat(s.value,'updated by merge')
+when matched and s.ds='2008-04-08' and s.hr=='12' then delete
+when not matched then insert values('this','should','not','be there');
+
+--check results
+--should be 0
+select count(*) from srcpart_acidb where ds='2008-04-08' and hr=='12';
+--should be 1 row
+select ds, hr, key, value from srcpart_acidb where value like '%updated by merge';
+--should be 0
+select count(*) from srcpart_acidb where ds = 'this' and hr = 'should' and key = 'not' and value = 'be there';
+drop table if exists srcpart_acidb;
+
+
+
+--now same thing but vectorized
+set hive.vectorized.execution.enabled=true;
+
+drop table if exists srcpart_acidv;
+CREATE TABLE srcpart_acidv (key STRING, value STRING) PARTITIONED BY (ds STRING, hr STRING) stored as ORC TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default');
+insert into srcpart_acidv PARTITION (ds, hr) select * from srcpart;
+
+--2 rows for 413, 1 row for 43, 2 for 213, 2 for 12 in kv1.txt (in each partition)
+select ds, hr, key, value from srcpart_acidv where cast(key as integer) in(413,43) and hr='11' order by ds, hr, cast(key as integer);
+
+analyze table srcpart_acidv PARTITION(ds, hr) compute statistics;
+analyze table srcpart_acidv PARTITION(ds, hr) compute statistics for columns;
+explain update srcpart_acidv set value = concat(value, 'updated') where cast(key as integer) in(413,43) and hr='11';
+update srcpart_acidv set value = concat(value, 'updated') where cast(key as integer) in(413,43) and hr='11';
+select ds, hr, key, value from srcpart_acidv where value like '%updated' order by ds, hr, cast(key as integer);
+
+insert into srcpart_acidv PARTITION (ds='2008-04-08', hr=='11') values ('1001','val1001'),('1002','val1002'),('1003','val1003');
+select ds, hr, key, value from srcpart_acidv where cast(key as integer) > 1000 order by ds, hr, cast(key as integer);
+
+analyze table srcpart_acidv PARTITION(ds, hr) compute statistics;
+analyze table srcpart_acidv PARTITION(ds, hr) compute statistics for columns;
+explain delete from srcpart_acidv where key in( '1001', '213', '43');
+--delete some rows from initial load, some that were updated and some that were inserted
+delete from srcpart_acidv where key in( '1001', '213', '43');
+
+--make sure we deleted everything that should've been deleted
+select count(*) from srcpart_acidv where key in( '1001', '213', '43');
+--make sure nothing extra was deleted (2000 + 3 - 4 - 1 - 8 = 1990)
+select count(*) from srcpart_acidv;
+
+--todo: should really have a way to run compactor here....
+
+--update should match 1 row in 1 partition
+--delete should drop everything from 1 partition
+--insert should do nothing
+merge into srcpart_acidv t using (select distinct ds, hr, key, value from srcpart_acidv) s
+on s.ds=t.ds and s.hr=t.hr and s.key=t.key and s.value=t.value
+when matched and s.ds='2008-04-08' and s.hr=='11' and s.key='44' then update set value=concat(s.value,'updated by merge')
+when matched and s.ds='2008-04-08' and s.hr=='12' then delete
+when not matched then insert values('this','should','not','be there');
+
+--check results
+--should be 0
+select count(*) from srcpart_acidv where ds='2008-04-08' and hr=='12';
+--should be 1 row
+select ds, hr, key, value from srcpart_acidv where value like '%updated by merge';
+--should be 0
+select count(*) from srcpart_acidv where ds = 'this' and hr = 'should' and key = 'not' and value = 'be there';
+drop table if exists srcpart_acidv;
+
+
+
+drop table if exists srcpart_acidvb;
+CREATE TABLE srcpart_acidvb (key STRING, value STRING) PARTITIONED BY (ds STRING, hr STRING) CLUSTERED BY(key) INTO 2 BUCKETS stored as ORC TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default');
+insert into srcpart_acidvb PARTITION (ds, hr) select * from srcpart;
+
+--2 rows for 413, 1 row for 43, 2 for 213, 2 for 12 in kv1.txt (in each partition)
+select ds, hr, key, value from srcpart_acidvb where cast(key as integer) in(413,43) and hr='11' order by ds, hr, cast(key as integer);
+
+analyze table srcpart_acidvb PARTITION(ds, hr) compute statistics;
+analyze table srcpart_acidvb PARTITION(ds, hr) compute statistics for columns;
+explain update srcpart_acidvb set value = concat(value, 'updated') where cast(key as integer) in(413,43) and hr='11';
+update srcpart_acidvb set value = concat(value, 'updated') where cast(key as integer) in(413,43) and hr='11';
+select ds, hr, key, value from srcpart_acidvb where value like '%updated' order by ds, hr, cast(key as integer);
+
+insert into srcpart_acidvb PARTITION (ds='2008-04-08', hr=='11') values ('1001','val1001'),('1002','val1002'),('1003','val1003');
+select ds, hr, key, value from srcpart_acidvb where cast(key as integer) > 1000 order by ds, hr, cast(key as integer);
+
+analyze table srcpart_acidvb PARTITION(ds, hr) compute statistics;
+analyze table srcpart_acidvb PARTITION(ds, hr) compute statistics for columns;
+explain delete from srcpart_acidvb where key in( '1001', '213', '43');
+--delete some rows from initial load, some that were updated and some that were inserted
+delete from srcpart_acidvb where key in( '1001', '213', '43');
+
+--make sure we deleted everything that should've been deleted
+select count(*) from srcpart_acidvb where key in( '1001', '213', '43');
+--make sure nothing extra was deleted (2000 + 3 (insert) - 4 - 1 - 8 = 1990)
+select count(*) from srcpart_acidvb;
+
+
+--todo: should really have a way to run compactor here....
+
+--update should match 1 row in 1 partition
+--delete should drop everything from 1 partition
+--insert should do nothing
+merge into srcpart_acidvb t using (select distinct ds, hr, key, value from srcpart_acidvb) s
+on s.ds=t.ds and s.hr=t.hr and s.key=t.key and s.value=t.value
+when matched and s.ds='2008-04-08' and s.hr=='11' and s.key='44' then update set value=concat(s.value,'updated by merge')
+when matched and s.ds='2008-04-08' and s.hr=='12' then delete
+when not matched then insert values('this','should','not','be there');
+
+--check results
+--should be 0
+select count(*) from srcpart_acidvb where ds='2008-04-08' and hr=='12';
+--should be 1 row
+select ds, hr, key, value from srcpart_acidvb where value like '%updated by merge';
+--should be 0
+select count(*) from srcpart_acidvb where ds = 'this' and hr = 'should' and key = 'not' and value = 'be there';
+drop table if exists srcpart_acidvb;
http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/ql/src/test/results/clientnegative/create_not_acid.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientnegative/create_not_acid.q.out b/ql/src/test/results/clientnegative/create_not_acid.q.out
index bb8f6c9..4e775e5 100644
--- a/ql/src/test/results/clientnegative/create_not_acid.q.out
+++ b/ql/src/test/results/clientnegative/create_not_acid.q.out
@@ -1,5 +1,5 @@
-PREHOOK: query: create table acid_notbucketed(a int, b varchar(128)) stored as orc TBLPROPERTIES ('transactional'='true')
+PREHOOK: query: create table acid_notbucketed(a int, b varchar(128)) TBLPROPERTIES ('transactional'='true')
PREHOOK: type: CREATETABLE
PREHOOK: Output: database:default
PREHOOK: Output: default@acid_notbucketed
-FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. MetaException(message:The table must be bucketed and stored using an ACID compliant format (such as ORC))
+FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. MetaException(message:The table must be stored using an ACID compliant format (such as ORC))
http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/ql/src/test/results/clientnegative/delete_non_acid_table.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientnegative/delete_non_acid_table.q.out b/ql/src/test/results/clientnegative/delete_non_acid_table.q.out
index a9b884a..19fd5fb 100644
--- a/ql/src/test/results/clientnegative/delete_non_acid_table.q.out
+++ b/ql/src/test/results/clientnegative/delete_non_acid_table.q.out
@@ -34,4 +34,4 @@ POSTHOOK: Input: default@not_an_acid_table2
-1070883071 0ruyd6Y50JpdGRf6HqD
-1070551679 iUR3Q
-1069736047 k17Am8uPHWk02cEf1jet
-FAILED: SemanticException [Error 10297]: Attempt to do update or delete on table default.not_an_acid_table2 that does not use an AcidOutputFormat or is not bucketed
+FAILED: SemanticException [Error 10297]: Attempt to do update or delete on table default.not_an_acid_table2 that is not transactional
http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/ql/src/test/results/clientnegative/update_non_acid_table.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientnegative/update_non_acid_table.q.out b/ql/src/test/results/clientnegative/update_non_acid_table.q.out
index 381b0db..02946fc 100644
--- a/ql/src/test/results/clientnegative/update_non_acid_table.q.out
+++ b/ql/src/test/results/clientnegative/update_non_acid_table.q.out
@@ -34,4 +34,4 @@ POSTHOOK: Input: default@not_an_acid_table
-1070883071 0ruyd6Y50JpdGRf6HqD
-1070551679 iUR3Q
-1069736047 k17Am8uPHWk02cEf1jet
-FAILED: SemanticException [Error 10297]: Attempt to do update or delete on table default.not_an_acid_table that does not use an AcidOutputFormat or is not bucketed
+FAILED: SemanticException [Error 10297]: Attempt to do update or delete on table default.not_an_acid_table that is not transactional
[3/3] hive git commit: HIVE-17205 - add functional support for
unbucketed tables (Eugene Koifman, reviewed by Wei Zheng)
Posted by ek...@apache.org.
HIVE-17205 - add functional support for unbucketed tables (Eugene Koifman, reviewed by Wei Zheng)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/6be50b76
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/6be50b76
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/6be50b76
Branch: refs/heads/master
Commit: 6be50b76be5956b3c52ed6024fd7b4a3dee65fb6
Parents: 262d8f9
Author: Eugene Koifman <ek...@hortonworks.com>
Authored: Fri Aug 25 20:14:57 2017 -0700
Committer: Eugene Koifman <ek...@hortonworks.com>
Committed: Fri Aug 25 20:15:26 2017 -0700
----------------------------------------------------------------------
.../streaming/AbstractRecordWriter.java | 44 +-
.../hive/hcatalog/streaming/HiveEndPoint.java | 4 +-
.../apache/hive/hcatalog/streaming/package.html | 21 +-
.../hive/hcatalog/streaming/TestStreaming.java | 86 +-
.../hive/metastore/TestHiveMetaStore.java | 8 +-
.../apache/hadoop/hive/ql/TestAcidOnTez.java | 367 +++-
.../hive/ql/TestAcidOnTezWithSplitUpdate.java | 28 -
.../test/resources/testconfiguration.properties | 4 +-
.../TransactionalValidationListener.java | 42 +-
.../org/apache/hadoop/hive/ql/ErrorMsg.java | 4 +-
.../hadoop/hive/ql/exec/FileSinkOperator.java | 22 +-
.../hadoop/hive/ql/io/orc/OrcInputFormat.java | 60 +-
.../hive/ql/io/orc/OrcRawRecordMerger.java | 225 +-
.../hadoop/hive/ql/io/orc/OrcRecordUpdater.java | 14 +-
.../apache/hadoop/hive/ql/io/orc/OrcSplit.java | 10 +-
.../io/orc/VectorizedOrcAcidRowBatchReader.java | 26 +-
.../optimizer/SortedDynPartitionOptimizer.java | 1 +
.../hadoop/hive/ql/parse/SemanticAnalyzer.java | 6 -
.../hive/ql/txn/compactor/CompactorMR.java | 3 +-
.../apache/hadoop/hive/ql/TestTxnCommands.java | 108 +-
.../apache/hadoop/hive/ql/TestTxnCommands2.java | 12 +-
.../hadoop/hive/ql/TestTxnCommandsBase.java | 162 ++
.../apache/hadoop/hive/ql/TestTxnNoBuckets.java | 297 +++
.../hive/ql/io/orc/TestInputOutputFormat.java | 28 +-
.../hive/ql/io/orc/TestOrcRawRecordMerger.java | 86 +-
.../queries/clientnegative/create_not_acid.q | 2 +-
.../queries/clientpositive/acid_no_buckets.q | 210 ++
.../clientnegative/create_not_acid.q.out | 4 +-
.../clientnegative/delete_non_acid_table.q.out | 2 +-
.../clientnegative/update_non_acid_table.q.out | 2 +-
.../clientpositive/llap/acid_no_buckets.q.out | 1976 ++++++++++++++++++
31 files changed, 3534 insertions(+), 330 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java
index e409e75..4ec10ad 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java
@@ -46,6 +46,7 @@ import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
@@ -54,19 +55,23 @@ import java.util.Properties;
public abstract class AbstractRecordWriter implements RecordWriter {
static final private Logger LOG = LoggerFactory.getLogger(AbstractRecordWriter.class.getName());
- final HiveConf conf;
- final HiveEndPoint endPoint;
+ private final HiveConf conf;
+ private final HiveEndPoint endPoint;
final Table tbl;
- final IMetaStoreClient msClient;
- protected final List<Integer> bucketIds;
- ArrayList<RecordUpdater> updaters = null;
+ private final IMetaStoreClient msClient;
+ final List<Integer> bucketIds;
+ private ArrayList<RecordUpdater> updaters = null;
- public final int totalBuckets;
+ private final int totalBuckets;
+ /**
+ * Indicates whether target table is bucketed
+ */
+ private final boolean isBucketed;
private final Path partitionPath;
- final AcidOutputFormat<?,?> outf;
+ private final AcidOutputFormat<?,?> outf;
private Object[] bucketFieldData; // Pre-allocated in constructor. Updated on each write.
private Long curBatchMinTxnId;
private Long curBatchMaxTxnId;
@@ -109,16 +114,22 @@ public abstract class AbstractRecordWriter implements RecordWriter {
this.tbl = twp.tbl;
this.partitionPath = twp.partitionPath;
}
- this.totalBuckets = tbl.getSd().getNumBuckets();
- if (totalBuckets <= 0) {
- throw new StreamingException("Cannot stream to table that has not been bucketed : "
- + endPoint);
+ this.isBucketed = tbl.getSd().getNumBuckets() > 0;
+ /**
+ * For unbucketed tables we have exactly 1 RecordUpdater for each AbstractRecordWriter, which
+ * ends up writing to a file bucket_00000.
+ * See also {@link #getBucket(Object)}
+ */
+ this.totalBuckets = isBucketed ? tbl.getSd().getNumBuckets() : 1;
+ if(isBucketed) {
+ this.bucketIds = getBucketColIDs(tbl.getSd().getBucketCols(), tbl.getSd().getCols());
+ this.bucketFieldData = new Object[bucketIds.size()];
+ }
+ else {
+ bucketIds = Collections.emptyList();
}
- this.bucketIds = getBucketColIDs(tbl.getSd().getBucketCols(), tbl.getSd().getCols());
- this.bucketFieldData = new Object[bucketIds.size()];
String outFormatName = this.tbl.getSd().getOutputFormat();
outf = (AcidOutputFormat<?, ?>) ReflectionUtils.newInstance(JavaUtils.loadClass(outFormatName), conf);
- bucketFieldData = new Object[bucketIds.size()];
} catch(InterruptedException e) {
throw new StreamingException(endPoint2.toString(), e);
} catch (MetaException | NoSuchObjectException e) {
@@ -169,6 +180,9 @@ public abstract class AbstractRecordWriter implements RecordWriter {
// returns the bucket number to which the record belongs to
protected int getBucket(Object row) throws SerializationError {
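+ //unbucketed tables route every record through a single updater - see the totalBuckets note in the constructor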
+ if(!isBucketed) {
+ return 0;
+ }
ObjectInspector[] inspectors = getBucketObjectInspectors();
Object[] bucketFields = getBucketFields(row);
return ObjectInspectorUtils.getBucketNumber(bucketFields, inspectors, totalBuckets);
@@ -204,7 +218,7 @@ public abstract class AbstractRecordWriter implements RecordWriter {
curBatchMaxTxnId = maxTxnID;
updaters = new ArrayList<RecordUpdater>(totalBuckets);
for (int bucket = 0; bucket < totalBuckets; bucket++) {
- updaters.add(bucket, null);
+ updaters.add(bucket, null);//so that get(i) returns null rather than throwing IndexOutOfBoundsException
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
index 81f6155..28c98bd 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
@@ -20,6 +20,8 @@ package org.apache.hive.hcatalog.streaming;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hive.metastore.api.DataOperationType;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hive.cli.CliSessionState;
@@ -338,7 +340,7 @@ public class HiveEndPoint {
// 1 - check if TBLPROPERTIES ('transactional'='true') is set on table
Map<String, String> params = t.getParameters();
if (params != null) {
- String transactionalProp = params.get("transactional");
+ String transactionalProp = params.get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL);
if (transactionalProp == null || !transactionalProp.equalsIgnoreCase("true")) {
LOG.error("'transactional' property is not set on Table " + endPoint);
throw new InvalidTable(endPoint.database, endPoint.table, "\'transactional\' property" +
http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/package.html
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/package.html b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/package.html
index ed4d307..a879b97 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/package.html
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/package.html
@@ -30,7 +30,7 @@ partition. Once data is committed it becomes immediately visible to
all Hive queries initiated subsequently.</p>
<p>
-This API is intended for streaming clients such as Flume and Storm,
+This API is intended for streaming clients such as NiFi, Flume and Storm,
which continuously generate data. Streaming support is built on top of
ACID based insert/update support in Hive.</p>
@@ -56,10 +56,7 @@ A few things are currently required to use streaming.
<ol>
<li> Currently, only ORC storage format is supported. So
'<b>stored as orc</b>' must be specified during table creation.</li>
- <li> The hive table must be bucketed, but not sorted. So something like
- '<b>clustered by (<i>colName</i>) into <i>10</i> buckets</b>' must
- be specified during table creation. The number of buckets
- is ideally the same as the number of streaming writers.</li>
+ <li> The hive table may be bucketed but must not be sorted. </li>
<li> User of the client streaming process must have the necessary
permissions to write to the table or partition and create partitions in
the table.</li>
@@ -67,7 +64,6 @@ A few things are currently required to use streaming.
<ol>
<li><b>hive.input.format =
org.apache.hadoop.hive.ql.io.HiveInputFormat</b></li>
- <li><b>hive.vectorized.execution.enabled = false</b></li>
</ol></li>
The above client settings are a temporary requirement and the intention is to
drop the need for them in the near future.
@@ -165,8 +161,21 @@ additional implementations of the <b>RecordWriter</b> interface.
- Delimited text input.</li>
<li> <a href="StrictJsonWriter.html"><b>StrictJsonWriter</b></a>
- JSON text input.</li>
+ <li> <a href="StrictRegexWriter.html"><b>StrictRegexWriter</b></a>
+ - text input with regex.</li>
</ul></p>
+<h2>Performance, Concurrency, Etc.</h2>
+<p>
+ Each StreamingConnection writes data at the rate the underlying
+ FileSystem can accept it. If that rate is not sufficient, multiple StreamingConnection
+ objects can be created concurrently.
+</p>
+<p>
+ Each StreamingConnection can have at most 1 outstanding TransactionBatch and each TransactionBatch
+ may have at most 2 threads operating on it.
+ See <a href="TransactionBatch.html"><b>TransactionBatch</b></a>
+</p>
</body>
</html>
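The usage pattern behind the new Performance/Concurrency section is exercised by TestStreaming#testNoBuckets below. As a hedged sketch (metaStoreURI, table and agent names are placeholders), one connection drives one TransactionBatch at a time:

    // Sketch only; mirrors the calls in the test further down.
    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, "default", "mytable", null);
    StreamingConnection conn = endPt.newConnection(false, "example-agent");
    DelimitedInputWriter writer =
        new DelimitedInputWriter(new String[]{"a", "b"}, ",", endPt, conn);
    TransactionBatch batch = conn.fetchTransactionBatch(2, writer);
    batch.beginNextTransaction();
    batch.write("a1,b2".getBytes());
    batch.commit();
    batch.close();
    conn.close();
    // If one connection cannot keep up with the data rate, create more
    // StreamingConnection objects, each with its own TransactionBatch.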
http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
index f3ef92b..49520ef 100644
--- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
@@ -33,6 +33,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
@@ -52,6 +53,7 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse;
import org.apache.hadoop.hive.metastore.api.LockState;
import org.apache.hadoop.hive.metastore.api.LockType;
+import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.ShowLocksRequest;
@@ -65,6 +67,7 @@ import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
import org.apache.hadoop.hive.ql.CommandNeedRetryException;
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.io.BucketCodec;
import org.apache.hadoop.hive.ql.io.IOConstants;
import org.apache.hadoop.hive.ql.io.orc.OrcFile;
import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
@@ -74,6 +77,7 @@ import org.apache.hadoop.hive.ql.io.orc.RecordReader;
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.txn.AcidHouseKeeperService;
+import org.apache.hadoop.hive.ql.txn.compactor.Worker;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
@@ -193,7 +197,6 @@ public class TestStreaming {
conf = new HiveConf(this.getClass());
conf.set("fs.raw.impl", RawFileSystem.class.getName());
- conf.set("hive.enforce.bucketing", "true");
conf
.setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
"org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory");
@@ -339,6 +342,83 @@ public class TestStreaming {
}
}
+ /**
+ * Test that streaming can write to an unbucketed table.
+ */
+ @Test
+ public void testNoBuckets() throws Exception {
+ queryTable(driver, "drop table if exists default.streamingnobuckets");
+ //todo: why does it need transactional_properties?
+ queryTable(driver, "create table default.streamingnobuckets (a string, b string) stored as orc TBLPROPERTIES('transactional'='true', 'transactional_properties'='default')");
+ queryTable(driver, "insert into default.streamingnobuckets values('foo','bar')");
+ List<String> rs = queryTable(driver, "select * from default.streamingnobuckets");
+ Assert.assertEquals(1, rs.size());
+ Assert.assertEquals("foo\tbar", rs.get(0));
+ HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, "default", "streamingnobuckets", null);
+ String[] colNames1 = new String[] { "a", "b" };
+ StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName());
+ DelimitedInputWriter wr = new DelimitedInputWriter(colNames1,",", endPt, connection);
+
+ TransactionBatch txnBatch = connection.fetchTransactionBatch(2, wr);
+ txnBatch.beginNextTransaction();
+ txnBatch.write("a1,b2".getBytes());
+ txnBatch.write("a3,b4".getBytes());
+ txnBatch.commit();
+ txnBatch.beginNextTransaction();
+ txnBatch.write("a5,b6".getBytes());
+ txnBatch.write("a7,b8".getBytes());
+ txnBatch.commit();
+ txnBatch.close();
+
+ Assert.assertEquals("", 0, BucketCodec.determineVersion(536870912).decodeWriterId(536870912));
+ rs = queryTable(driver,"select ROW__ID, a, b, INPUT__FILE__NAME from default.streamingnobuckets order by ROW__ID");
+
+ Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"transactionid\":16,\"bucketid\":536870912,\"rowid\":0}\tfoo\tbar"));
+ Assert.assertTrue(rs.get(0), rs.get(0).endsWith("streamingnobuckets/delta_0000016_0000016_0000/bucket_00000"));
+ Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"transactionid\":18,\"bucketid\":536870912,\"rowid\":0}\ta1\tb2"));
+ Assert.assertTrue(rs.get(1), rs.get(1).endsWith("streamingnobuckets/delta_0000018_0000019/bucket_00000"));
+ Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"transactionid\":18,\"bucketid\":536870912,\"rowid\":1}\ta3\tb4"));
+ Assert.assertTrue(rs.get(2), rs.get(2).endsWith("streamingnobuckets/delta_0000018_0000019/bucket_00000"));
+ Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"transactionid\":19,\"bucketid\":536870912,\"rowid\":0}\ta5\tb6"));
+ Assert.assertTrue(rs.get(3), rs.get(3).endsWith("streamingnobuckets/delta_0000018_0000019/bucket_00000"));
+ Assert.assertTrue(rs.get(4), rs.get(4).startsWith("{\"transactionid\":19,\"bucketid\":536870912,\"rowid\":1}\ta7\tb8"));
+ Assert.assertTrue(rs.get(4), rs.get(4).endsWith("streamingnobuckets/delta_0000018_0000019/bucket_00000"));
+
+ queryTable(driver, "update default.streamingnobuckets set a=0, b=0 where a='a7'");
+ queryTable(driver, "delete from default.streamingnobuckets where a='a1'");
+ rs = queryTable(driver, "select a, b from default.streamingnobuckets order by a, b");
+ int row = 0;
+ Assert.assertEquals("at row=" + row, "0\t0", rs.get(row++));
+ Assert.assertEquals("at row=" + row, "a3\tb4", rs.get(row++));
+ Assert.assertEquals("at row=" + row, "a5\tb6", rs.get(row++));
+ Assert.assertEquals("at row=" + row, "foo\tbar", rs.get(row++));
+
+ queryTable(driver, "alter table default.streamingnobuckets compact 'major'");
+ runWorker(conf);
+ rs = queryTable(driver,"select ROW__ID, a, b, INPUT__FILE__NAME from default.streamingnobuckets order by ROW__ID");
+
+ Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"transactionid\":16,\"bucketid\":536870912,\"rowid\":0}\tfoo\tbar"));
+ Assert.assertTrue(rs.get(0), rs.get(0).endsWith("streamingnobuckets/base_0000022/bucket_00000"));
+ Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"transactionid\":18,\"bucketid\":536870912,\"rowid\":1}\ta3\tb4"));
+ Assert.assertTrue(rs.get(1), rs.get(1).endsWith("streamingnobuckets/base_0000022/bucket_00000"));
+ Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"transactionid\":19,\"bucketid\":536870912,\"rowid\":0}\ta5\tb6"));
+ Assert.assertTrue(rs.get(2), rs.get(2).endsWith("streamingnobuckets/base_0000022/bucket_00000"));
+ Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"transactionid\":21,\"bucketid\":536870912,\"rowid\":0}\t0\t0"));
+ Assert.assertTrue(rs.get(3), rs.get(3).endsWith("streamingnobuckets/base_0000022/bucket_00000"));
+ }
+
+ /**
+ * This is a clone of the runWorker helper in TestTxnCommands2.
+ */
+ public static void runWorker(HiveConf hiveConf) throws MetaException {
+ AtomicBoolean stop = new AtomicBoolean(true);
+ Worker t = new Worker();
+ t.setThreadId((int) t.getId());
+ t.setHiveConf(hiveConf);
+ AtomicBoolean looped = new AtomicBoolean();
+ t.init(stop, looped);
+ t.run();
+ }
// stream data into streaming table with N buckets, then copy the data into another bucketed table
// check if bucketing in both was done in the same way
@@ -453,8 +533,8 @@ public class TestStreaming {
}
/**
- * @deprecated use {@link #checkDataWritten2(Path, long, long, int, String, String...)} - there is
- * little value in using InputFormat directly
+ * @deprecated use {@link #checkDataWritten2(Path, long, long, int, String, boolean, String...)} -
+ * there is little value in using InputFormat directly
*/
private void checkDataWritten(Path partitionPath, long minTxn, long maxTxn, int buckets, int numExpectedFiles,
String... records) throws Exception {
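A note on the magic 536870912 that recurs in the asserts above: it is 0x20000000, a version-1 bucketProperty whose writerId (and all remaining bits) decode to 0. A sketch using only the BucketCodec calls already exercised in testNoBuckets:

    int bucketProperty = 536870912; // 0x20000000
    BucketCodec codec = BucketCodec.determineVersion(bucketProperty);
    System.out.println(codec.decodeWriterId(bucketProperty)); // prints 0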
http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStore.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStore.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStore.java
index 8bd23cc..50e5274 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStore.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStore.java
@@ -3018,7 +3018,7 @@ public abstract class TestHiveMetaStore extends TestCase {
Table t = createTable(dbName, tblName, owner, params, null, sd, 0);
Assert.assertTrue("Expected exception", false);
} catch (MetaException e) {
- Assert.assertEquals("The table must be bucketed and stored using an ACID compliant format (such as ORC)", e.getMessage());
+ Assert.assertEquals("The table must be stored using an ACID compliant format (such as ORC)", e.getMessage());
}
// Fail - "transactional" is set to true, and the table is bucketed, but doesn't use ORC
@@ -3031,7 +3031,7 @@ public abstract class TestHiveMetaStore extends TestCase {
Table t = createTable(dbName, tblName, owner, params, null, sd, 0);
Assert.assertTrue("Expected exception", false);
} catch (MetaException e) {
- Assert.assertEquals("The table must be bucketed and stored using an ACID compliant format (such as ORC)", e.getMessage());
+ Assert.assertEquals("The table must be stored using an ACID compliant format (such as ORC)", e.getMessage());
}
// Succeed - "transactional" is set to true, and the table is bucketed, and uses ORC
@@ -3064,13 +3064,14 @@ public abstract class TestHiveMetaStore extends TestCase {
tblName += "1";
params.clear();
sd.unsetBucketCols();
+ sd.setInputFormat("org.apache.hadoop.mapred.FileInputFormat");
t = createTable(dbName, tblName, owner, params, null, sd, 0);
params.put("transactional", "true");
t.setParameters(params);
client.alter_table(dbName, tblName, t);
Assert.assertTrue("Expected exception", false);
} catch (MetaException e) {
- Assert.assertEquals("The table must be bucketed and stored using an ACID compliant format (such as ORC)", e.getMessage());
+ Assert.assertEquals("The table must be stored using an ACID compliant format (such as ORC)", e.getMessage());
}
// Succeed - trying to set "transactional" to "true", and satisfies bucketing and Input/OutputFormat requirement
@@ -3078,6 +3079,7 @@ public abstract class TestHiveMetaStore extends TestCase {
params.clear();
sd.setNumBuckets(1);
sd.setBucketCols(bucketCols);
+ sd.setInputFormat("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat");
t = createTable(dbName, tblName, owner, params, null, sd, 0);
params.put("transactional", "true");
t.setParameters(params);
http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java
index 2bf9871..d0b5cf6 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java
@@ -22,7 +22,10 @@ import java.io.File;
import java.util.ArrayList;
import java.util.List;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
@@ -30,23 +33,26 @@ import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+import org.apache.hadoop.hive.ql.io.BucketCodec;
import org.apache.hadoop.hive.ql.io.HiveInputFormat;
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
import org.apache.hadoop.hive.ql.session.SessionState;
-import org.apache.hadoop.hive.ql.txn.compactor.Cleaner;
-import org.apache.hadoop.hive.ql.txn.compactor.Worker;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
+import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* This class resides in itests to facilitate running query using Tez engine, since the jars are
* fully loaded here, which is not the case if it stays in ql.
*/
public class TestAcidOnTez {
+ static final private Logger LOG = LoggerFactory.getLogger(TestAcidOnTez.class);
private static final String TEST_DATA_DIR = new File(System.getProperty("java.io.tmpdir") +
File.separator + TestAcidOnTez.class.getCanonicalName()
+ "-" + System.currentTimeMillis()
@@ -61,8 +67,10 @@ public class TestAcidOnTez {
private static enum Table {
ACIDTBL("acidTbl"),
ACIDTBLPART("acidTblPart"),
+ ACIDNOBUCKET("acidNoBucket"),
NONACIDORCTBL("nonAcidOrcTbl"),
- NONACIDPART("nonAcidPart");
+ NONACIDPART("nonAcidPart"),
+ NONACIDNONBUCKET("nonAcidNonBucket");
private final String name;
@Override
@@ -159,6 +167,359 @@ public class TestAcidOnTez {
testJoin("tez", "MapJoin");
}
+ /**
+ * Tests non-acid to acid conversion where the starting table has a non-standard layout, i.e.
+ * where "original" files are not immediate children of the partition dir.
+ */
+ @Test
+ public void testNonStandardConversion01() throws Exception {
+ HiveConf confForTez = new HiveConf(hiveConf); // make a clone of existing hive conf
+ setupTez(confForTez);
+ //CTAS with non-ACID target table
+ runStatementOnDriver("create table " + Table.NONACIDNONBUCKET + " stored as ORC TBLPROPERTIES('transactional'='false') as " +
+ "select a, b from " + Table.ACIDTBL + " where a <= 5 union all select a, b from " + Table.NONACIDORCTBL + " where a >= 5", confForTez);
+
+ List<String> rs = runStatementOnDriver("select a, b, INPUT__FILE__NAME from " + Table.NONACIDNONBUCKET + " order by a, b, INPUT__FILE__NAME");
+ String expected0[][] = {
+ {"1\t2", "/1/000000_0"},
+ {"3\t4", "/1/000000_0"},
+ {"5\t6", "/1/000000_0"},
+ {"5\t6", "/2/000000_0"},
+ {"7\t8", "/2/000000_0"},
+ {"9\t10", "/2/000000_0"},
+ };
+ Assert.assertEquals("Unexpected row count after ctas", expected0.length, rs.size());
+ //verify data and layout
+ for(int i = 0; i < expected0.length; i++) {
+ Assert.assertTrue("Actual line " + i + " bc: " + rs.get(i), rs.get(i).startsWith(expected0[i][0]));
+ Assert.assertTrue("Actual line(file) " + i + " bc: " + rs.get(i), rs.get(i).endsWith(expected0[i][1]));
+ }
+ //make the table ACID
+ runStatementOnDriver("alter table " + Table.NONACIDNONBUCKET + " SET TBLPROPERTIES ('transactional'='true')");
+
+ rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.NONACIDNONBUCKET + " order by ROW__ID");
+ LOG.warn("after ctas:");
+ for (String s : rs) {
+ LOG.warn(s);
+ }
+ Assert.assertEquals(0, BucketCodec.determineVersion(536870912).decodeWriterId(536870912));
+ /*
+ * Expected result: 0th entry is the RecordIdentifier + data; 1st entry is the file before compaction.*/
+ String expected[][] = {
+ {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "/1/000000_0"},
+ {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "/1/000000_0"},
+ {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":2}\t5\t6", "/1/000000_0"},
+ {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":3}\t9\t10", "/2/000000_0"},
+ {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":4}\t7\t8", "/2/000000_0"},
+ {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":5}\t5\t6", "/2/000000_0"},
+ };
+ Assert.assertEquals("Unexpected row count after ctas", expected.length, rs.size());
+ //verify data and layout
+ for(int i = 0; i < expected.length; i++) {
+ Assert.assertTrue("Actual line " + i + " bc: " + rs.get(i), rs.get(i).startsWith(expected[i][0]));
+ Assert.assertTrue("Actual line(file) " + i + " bc: " + rs.get(i), rs.get(i).endsWith(expected[i][1]));
+ }
+ //perform some Update/Delete
+ runStatementOnDriver("update " + Table.NONACIDNONBUCKET + " set a = 70, b = 80 where a = 7");
+ runStatementOnDriver("delete from " + Table.NONACIDNONBUCKET + " where a = 5");
+ rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.NONACIDNONBUCKET + " order by ROW__ID");
+ LOG.warn("after update/delete:");
+ for (String s : rs) {
+ LOG.warn(s);
+ }
+ String[][] expected2 = {
+ {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "/1/000000_0"},
+ {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "/1/000000_0"},
+ {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":3}\t9\t10", "/2/000000_0"},
+ {"{\"transactionid\":21,\"bucketid\":536870912,\"rowid\":0}\t70\t80", "delta_0000021_0000021_0000/bucket_00000"}
+ };
+ Assert.assertEquals("Unexpected row count after update", expected2.length, rs.size());
+ //verify data and layout
+ for(int i = 0; i < expected2.length; i++) {
+ Assert.assertTrue("Actual line " + i + " bc: " + rs.get(i), rs.get(i).startsWith(expected2[i][0]));
+ Assert.assertTrue("Actual line(file) " + i + " bc: " + rs.get(i), rs.get(i).endsWith(expected2[i][1]));
+ }
+ //now make sure delete deltas are present
+ FileSystem fs = FileSystem.get(hiveConf);
+ FileStatus[] status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+ (Table.NONACIDNONBUCKET).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+ String[] expectedDelDelta = {"delete_delta_0000021_0000021_0000", "delete_delta_0000022_0000022_0000"};
+ for(FileStatus stat : status) {
+ for(int i = 0; i < expectedDelDelta.length; i++) {
+ if(expectedDelDelta[i] != null && stat.getPath().toString().endsWith(expectedDelDelta[i])) {
+ expectedDelDelta[i] = null;
+ }
+ }
+ }
+ for(int i = 0; i < expectedDelDelta.length; i++) {
+ Assert.assertNull("at " + i + " " + expectedDelDelta[i] + " not found on disk", expectedDelDelta[i]);
+ }
+ //run Minor compaction
+ runStatementOnDriver("alter table " + Table.NONACIDNONBUCKET + " compact 'minor'");
+ TestTxnCommands2.runWorker(hiveConf);
+ rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.NONACIDNONBUCKET + " order by ROW__ID");
+ LOG.warn("after compact minor:");
+ for (String s : rs) {
+ LOG.warn(s);
+ }
+ Assert.assertEquals("Unexpected row count after update", expected2.length, rs.size());
+ //verify the data is the same
+ for(int i = 0; i < expected2.length; i++) {
+ Assert.assertTrue("Actual line " + i + " bc: " + rs.get(i), rs.get(i).startsWith(expected2[i][0]));
+ //todo: since HIVE-16669 is not done, Minor compact compacts insert delta as well - it should not
+ //Assert.assertTrue("Actual line(file) " + i + " bc: " + rs.get(i), rs.get(i).endsWith(expected2[i][1]));
+ }
+ //check we have right delete delta files after minor compaction
+ status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+ (Table.NONACIDNONBUCKET).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+ String[] expectedDelDelta2 = {"delete_delta_0000021_0000021_0000", "delete_delta_0000022_0000022_0000", "delete_delta_0000021_0000022"};
+ for(FileStatus stat : status) {
+ for(int i = 0; i < expectedDelDelta2.length; i++) {
+ if(expectedDelDelta2[i] != null && stat.getPath().toString().endsWith(expectedDelDelta2[i])) {
+ expectedDelDelta2[i] = null;
+ break;
+ }
+ }
+ }
+ for(int i = 0; i < expectedDelDelta2.length; i++) {
+ Assert.assertNull("at " + i + " " + expectedDelDelta2[i] + " not found on disk", expectedDelDelta2[i]);
+ }
+ //run Major compaction
+ runStatementOnDriver("alter table " + Table.NONACIDNONBUCKET + " compact 'major'");
+ TestTxnCommands2.runWorker(hiveConf);
+ rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.NONACIDNONBUCKET + " order by ROW__ID");
+ LOG.warn("after compact major:");
+ for (String s : rs) {
+ LOG.warn(s);
+ }
+ Assert.assertEquals("Unexpected row count after major compact", expected2.length, rs.size());
+ for(int i = 0; i < expected2.length; i++) {
+ Assert.assertTrue("Actual line " + i + " bc: " + rs.get(i), rs.get(i).startsWith(expected2[i][0]));
+ //everything is now in base/
+ Assert.assertTrue("Actual line(file) " + i + " bc: " + rs.get(i), rs.get(i).endsWith("base_0000022/bucket_00000"));
+ }
+ }
+ /**
+ * Tests non-acid to acid conversion where the starting (partitioned) table has a non-standard
+ * layout, i.e. where "original" files are not immediate children of the partition dir.
+ *
+ * How to create such a layout? CTAS is the only way to create data files which are not immediate
+ * children of the partition dir, but CTAS/Union/Tez doesn't support partitioned tables, so the
+ * only way is to copy data files in directly.
+ */
+ @Ignore("HIVE-17214")
+ @Test
+ public void testNonStandardConversion02() throws Exception {
+ HiveConf confForTez = new HiveConf(hiveConf); // make a clone of existing hive conf
+ confForTez.setBoolean("mapred.input.dir.recursive", true);
+ setupTez(confForTez);
+ runStatementOnDriver("create table " + Table.NONACIDNONBUCKET + " stored as ORC " +
+ "TBLPROPERTIES('transactional'='false') as " +
+ "select a, b from " + Table.ACIDTBL + " where a <= 3 union all " +
+ "select a, b from " + Table.NONACIDORCTBL + " where a >= 7 " +
+ "union all select a, b from " + Table.ACIDTBL + " where a = 5", confForTez);
+
+ List<String> rs = runStatementOnDriver("select a, b, INPUT__FILE__NAME from " +
+ Table.NONACIDNONBUCKET + " order by a, b");
+ String expected0[][] = {
+ {"1\t2", "/1/000000_0"},
+ {"3\t4", "/1/000000_0"},
+ {"5\t6", "/3/000000_0"},
+ {"7\t8", "/2/000000_0"},
+ {"9\t10", "/2/000000_0"},
+ };
+ Assert.assertEquals("Unexpected row count after ctas", expected0.length, rs.size());
+ //verify data and layout
+ for(int i = 0; i < expected0.length; i++) {
+ Assert.assertTrue("Actual line " + i + " bc: " + rs.get(i), rs.get(i).startsWith(expected0[i][0]));
+ Assert.assertTrue("Actual line(file) " + i + " bc: " + rs.get(i), rs.get(i).endsWith(expected0[i][1]));
+ }
+ FileSystem fs = FileSystem.get(hiveConf);
+ FileStatus[] status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+ (Table.NONACIDNONBUCKET).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+ //ensure there is partition dir
+ runStatementOnDriver("insert into " + Table.NONACIDPART + " partition (p=1) values (100,110)");
+ //creates more files in that partition
+ for(FileStatus stat : status) {
+ int limit = 5;
+ Path p = stat.getPath();//dirs 1/, 2/, 3/
+ Path to = new Path(TEST_WAREHOUSE_DIR + "/" + Table.NONACIDPART+ "/p=1/" + p.getName());
+ while(limit-- > 0 && !fs.rename(p, to)) {
+ Thread.sleep(200);
+ }
+ if(limit <= 0) {
+ throw new IllegalStateException("Could not rename " + p + " to " + to);
+ }
+ }
+ /*
+ This is what we expect on disk
+ ekoifman:warehouse ekoifman$ tree nonacidpart/
+ nonacidpart/
+ └── p=1
+ ├── 000000_0
+ ├── 1
+ │ └── 000000_0
+ ├── 2
+ │ └── 000000_0
+ └── 3
+ └── 000000_0
+
+4 directories, 4 files
+ **/
+ //make the table ACID
+ runStatementOnDriver("alter table " + Table.NONACIDPART + " SET TBLPROPERTIES ('transactional'='true')");
+ rs = runStatementOnDriver("select ROW__ID, a, b, p, INPUT__FILE__NAME from " + Table.NONACIDPART + " order by ROW__ID");
+ LOG.warn("after acid conversion:");
+ for (String s : rs) {
+ LOG.warn(s);
+ }
+ String[][] expected = {
+ {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t100\t110\t1", "nonacidpart/p=1/000000_0"},
+ {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":1}\t1\t2\t1", "nonacidpart/p=1/1/000000_0"},
+ {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":2}\t3\t4\t1", "nonacidpart/p=1/1/000000_0"},
+ {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":3}\t9\t10\t1", "nonacidpart/p=1/2/000000_0"},
+ {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":4}\t7\t8\t1", "nonacidpart/p=1/2/000000_0"},
+ {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":5}\t5\t6\t1", "nonacidpart/p=1/3/000000_0"}
+ };
+ Assert.assertEquals("Wrong row count", expected.length, rs.size());
+ //verify data and layout
+ for(int i = 0; i < expected.length; i++) {
+ Assert.assertTrue("Actual line " + i + " bc: " + rs.get(i), rs.get(i).startsWith(expected[i][0]));
+ Assert.assertTrue("Actual line(file) " + i + " bc: " + rs.get(i), rs.get(i).endsWith(expected[i][1]));
+ }
+
+ //run Major compaction
+ runStatementOnDriver("alter table " + Table.NONACIDPART + " partition (p=1) compact 'major'");
+ TestTxnCommands2.runWorker(hiveConf);
+ rs = runStatementOnDriver("select ROW__ID, a, b, p, INPUT__FILE__NAME from " + Table.NONACIDPART + " order by ROW__ID");
+ LOG.warn("after major compaction:");
+ for (String s : rs) {
+ LOG.warn(s);
+ }
+ //verify data and layout
+ for(int i = 0; i < expected.length; i++) {
+ Assert.assertTrue("Actual line " + i + " ac: " + rs.get(i), rs.get(i).startsWith(expected[i][0]));
+ Assert.assertTrue("Actual line(file) " + i + " ac: " +
+ rs.get(i), rs.get(i).endsWith("nonacidpart/p=1/base_-9223372036854775808/bucket_00000"));
+ }
+
+ }
+ /**
+ * CTAS + Tez + Union creates a non-standard layout in the table dir.
+ * Each leg of the union places data into a subdir of the table/partition; subdirs are named 1/, 2/, etc.
+ * The way this currently works is that CTAS creates an Acid table but the insert statement writes
+ * the data in non-acid layout. Then on read, it's treated like a non-acid to acid conversion.
+ * Longer term, CTAS should create the acid layout from the get-go.
+ */
+ @Test
+ public void testCtasTezUnion() throws Exception {
+ HiveConf confForTez = new HiveConf(hiveConf); // make a clone of existing hive conf
+ setupTez(confForTez);
+ //CTAS with ACID target table
+ runStatementOnDriver("create table " + Table.ACIDNOBUCKET + " stored as ORC TBLPROPERTIES('transactional'='true') as " +
+ "select a, b from " + Table.ACIDTBL + " where a <= 5 union all select a, b from " + Table.NONACIDORCTBL + " where a >= 5", confForTez);
+ List<String> rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.ACIDNOBUCKET + " order by ROW__ID");
+ LOG.warn("after ctas:");
+ for (String s : rs) {
+ LOG.warn(s);
+ }
+ Assert.assertEquals(0, BucketCodec.determineVersion(536870912).decodeWriterId(536870912));
+ /*
+ * Expected result: 0th entry is the RecordIdentifier + data; 1st entry is the file before compaction.*/
+ String expected[][] = {
+ {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "/1/000000_0"},
+ {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "/1/000000_0"},
+ {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":2}\t5\t6", "/1/000000_0"},
+ {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":3}\t9\t10", "/2/000000_0"},
+ {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":4}\t7\t8", "/2/000000_0"},
+ {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":5}\t5\t6", "/2/000000_0"},
+ };
+ Assert.assertEquals("Unexpected row count after ctas", expected.length, rs.size());
+ //verify data and layout
+ for(int i = 0; i < expected.length; i++) {
+ Assert.assertTrue("Actual line " + i + " bc: " + rs.get(i), rs.get(i).startsWith(expected[i][0]));
+ Assert.assertTrue("Actual line(file) " + i + " bc: " + rs.get(i), rs.get(i).endsWith(expected[i][1]));
+ }
+ //perform some Update/Delete
+ runStatementOnDriver("update " + Table.ACIDNOBUCKET + " set a = 70, b = 80 where a = 7");
+ runStatementOnDriver("delete from " + Table.ACIDNOBUCKET + " where a = 5");
+ rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.ACIDNOBUCKET + " order by ROW__ID");
+ LOG.warn("after update/delete:");
+ for (String s : rs) {
+ LOG.warn(s);
+ }
+ String[][] expected2 = {
+ {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "/1/000000_0"},
+ {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "/1/000000_0"},
+ {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":3}\t9\t10", "/2/000000_0"},
+ {"{\"transactionid\":19,\"bucketid\":536870912,\"rowid\":0}\t70\t80", "delta_0000019_0000019_0000/bucket_00000"}
+ };
+ Assert.assertEquals("Unexpected row count after update", expected2.length, rs.size());
+ //verify data and layout
+ for(int i = 0; i < expected2.length; i++) {
+ Assert.assertTrue("Actual line " + i + " bc: " + rs.get(i), rs.get(i).startsWith(expected2[i][0]));
+ Assert.assertTrue("Actual line(file) " + i + " bc: " + rs.get(i), rs.get(i).endsWith(expected2[i][1]));
+ }
+ //now make sure delete deltas are present
+ FileSystem fs = FileSystem.get(hiveConf);
+ FileStatus[] status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+ (Table.ACIDNOBUCKET).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+ String[] expectedDelDelta = {"delete_delta_0000019_0000019_0000", "delete_delta_0000020_0000020_0000"};
+ for(FileStatus stat : status) {
+ for(int i = 0; i < expectedDelDelta.length; i++) {
+ if(expectedDelDelta[i] != null && stat.getPath().toString().endsWith(expectedDelDelta[i])) {
+ expectedDelDelta[i] = null;
+ }
+ }
+ }
+ for(int i = 0; i < expectedDelDelta.length; i++) {
+ Assert.assertNull("at " + i + " " + expectedDelDelta[i] + " not found on disk", expectedDelDelta[i]);
+ }
+ //run Minor compaction
+ runStatementOnDriver("alter table " + Table.ACIDNOBUCKET + " compact 'minor'");
+ TestTxnCommands2.runWorker(hiveConf);
+ rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.ACIDNOBUCKET + " order by ROW__ID");
+ LOG.warn("after compact minor:");
+ for (String s : rs) {
+ LOG.warn(s);
+ }
+ Assert.assertEquals("Unexpected row count after update", expected2.length, rs.size());
+ //verify the data is the same
+ for(int i = 0; i < expected2.length; i++) {
+ Assert.assertTrue("Actual line " + i + " bc: " + rs.get(i), rs.get(i).startsWith(expected2[i][0]));
+ //todo: since HIVE-16669 is not done, Minor compact compacts insert delta as well - it should not
+ //Assert.assertTrue("Actual line(file) " + i + " bc: " + rs.get(i), rs.get(i).endsWith(expected2[i][1]));
+ }
+ //check we have right delete delta files after minor compaction
+ status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+ (Table.ACIDNOBUCKET).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+ String[] expectedDelDelta2 = { "delete_delta_0000019_0000019_0000", "delete_delta_0000020_0000020_0000", "delete_delta_0000019_0000020"};
+ for(FileStatus stat : status) {
+ for(int i = 0; i < expectedDelDelta2.length; i++) {
+ if(expectedDelDelta2[i] != null && stat.getPath().toString().endsWith(expectedDelDelta2[i])) {
+ expectedDelDelta2[i] = null;
+ break;
+ }
+ }
+ }
+ for(int i = 0; i < expectedDelDelta2.length; i++) {
+ Assert.assertNull("at " + i + " " + expectedDelDelta2[i] + " not found on disk", expectedDelDelta2[i]);
+ }
+ //run Major compaction
+ runStatementOnDriver("alter table " + Table.ACIDNOBUCKET + " compact 'major'");
+ TestTxnCommands2.runWorker(hiveConf);
+ rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.ACIDNOBUCKET + " order by ROW__ID");
+ LOG.warn("after compact major:");
+ for (String s : rs) {
+ LOG.warn(s);
+ }
+ Assert.assertEquals("Unexpected row count after major compact", expected2.length, rs.size());
+ for(int i = 0; i < expected2.length; i++) {
+ Assert.assertTrue("Actual line " + i + " bc: " + rs.get(i), rs.get(i).startsWith(expected2[i][0]));
+ //everything is now in base/
+ Assert.assertTrue("Actual line(file) " + i + " bc: " + rs.get(i), rs.get(i).endsWith("base_0000020/bucket_00000"));
+ }
+ }
// Ideally a test like this would be a qfile test. However, the explain output from a qfile test is
// always slightly different depending on where the test is run, specifically due to file size estimation
private void testJoin(String engine, String joinType) throws Exception {
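The directory names asserted throughout these tests follow the patterns delta_<minTxn>_<maxTxn>(_<stmtId>), delete_delta_<minTxn>_<maxTxn>(_<stmtId>) and, after major compaction, base_<maxTxn>. An illustrative parse of one such name (Hive has its own parsing in AcidUtils; this is only a sketch of the naming convention):

    String name = "delete_delta_0000019_0000020";
    boolean isDeleteDelta = name.startsWith("delete_delta_");
    String[] parts = name.replaceFirst("^(delete_)?delta_", "").split("_");
    long minTxn = Long.parseLong(parts[0]); // 19
    long maxTxn = Long.parseLong(parts[1]); // 20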
http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTezWithSplitUpdate.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTezWithSplitUpdate.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTezWithSplitUpdate.java
deleted file mode 100644
index 3dacf08..0000000
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTezWithSplitUpdate.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql;
-
-/**
- * Same as parent class but covers Acid 2.0 tables
- */
-public class TestAcidOnTezWithSplitUpdate extends TestAcidOnTez {
- @Override
- String getTblProperties() {
- return "TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default')";
- }
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/itests/src/test/resources/testconfiguration.properties
----------------------------------------------------------------------
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index 37a3757..fa6a2aa 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -456,7 +456,9 @@ minillap.query.files=acid_bucket_pruning.q,\
llap_stats.q,\
multi_count_distinct_null.q
-minillaplocal.query.files=acid_globallimit.q,\
+minillaplocal.query.files=\
+ acid_no_buckets.q, \
+ acid_globallimit.q,\
acid_vectorization_missing_cols.q,\
alter_merge_stats_orc.q,\
auto_join30.q,\
http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java b/metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java
index 023d703..3a3d184 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java
@@ -98,14 +98,26 @@ public final class TransactionalValidationListener extends MetaStorePreEventList
// that will use it down below.
}
}
+ Table oldTable = context.getOldTable();
+ String oldTransactionalValue = null;
+ String oldTransactionalPropertiesValue = null;
+ for (String key : oldTable.getParameters().keySet()) {
+ if (hive_metastoreConstants.TABLE_IS_TRANSACTIONAL.equalsIgnoreCase(key)) {
+ oldTransactionalValue = oldTable.getParameters().get(key);
+ }
+ if (hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES.equalsIgnoreCase(key)) {
+ oldTransactionalPropertiesValue = oldTable.getParameters().get(key);
+ }
+ }
+
if (transactionalValuePresent) {
//normalize prop name
parameters.put(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, transactionalValue);
}
- if ("true".equalsIgnoreCase(transactionalValue)) {
+ if ("true".equalsIgnoreCase(transactionalValue) && !"true".equalsIgnoreCase(oldTransactionalValue)) {
+ //only need to check conformance if this alter table enabled acid
if (!conformToAcid(newTable)) {
- throw new MetaException("The table must be bucketed and stored using an ACID compliant" +
- " format (such as ORC)");
+ throw new MetaException("The table must be stored using an ACID compliant format (such as ORC)");
}
if (newTable.getTableType().equals(TableType.EXTERNAL_TABLE.toString())) {
@@ -115,17 +127,6 @@ public final class TransactionalValidationListener extends MetaStorePreEventList
hasValidTransactionalValue = true;
}
- Table oldTable = context.getOldTable();
- String oldTransactionalValue = null;
- String oldTransactionalPropertiesValue = null;
- for (String key : oldTable.getParameters().keySet()) {
- if (hive_metastoreConstants.TABLE_IS_TRANSACTIONAL.equalsIgnoreCase(key)) {
- oldTransactionalValue = oldTable.getParameters().get(key);
- }
- if (hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES.equalsIgnoreCase(key)) {
- oldTransactionalPropertiesValue = oldTable.getParameters().get(key);
- }
- }
if (oldTransactionalValue == null ? transactionalValue == null
@@ -195,8 +196,7 @@ public final class TransactionalValidationListener extends MetaStorePreEventList
if ("true".equalsIgnoreCase(transactionalValue)) {
if (!conformToAcid(newTable)) {
- throw new MetaException("The table must be bucketed and stored using an ACID compliant" +
- " format (such as ORC)");
+ throw new MetaException("The table must be stored using an ACID compliant format (such as ORC)");
}
if (newTable.getTableType().equals(TableType.EXTERNAL_TABLE.toString())) {
@@ -214,14 +214,12 @@ public final class TransactionalValidationListener extends MetaStorePreEventList
throw new MetaException("'transactional' property of TBLPROPERTIES may only have value 'true'");
}
- // Check if table is bucketed and InputFormatClass/OutputFormatClass should implement
- // AcidInputFormat/AcidOutputFormat
+ /**
+ * Check that InputFormatClass/OutputFormatClass should implement
+ * AcidInputFormat/AcidOutputFormat
+ */
private boolean conformToAcid(Table table) throws MetaException {
StorageDescriptor sd = table.getSd();
- if (sd.getBucketColsSize() < 1) {
- return false;
- }
-
try {
Class inputFormatClass = Class.forName(sd.getInputFormat());
Class outputFormatClass = Class.forName(sd.getOutputFormat());
http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
index 9c9d4e7..b3ef916 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
@@ -418,8 +418,8 @@ public enum ErrorMsg {
" does not support these operations."),
VALUES_TABLE_CONSTRUCTOR_NOT_SUPPORTED(10296,
"Values clause with table constructor not yet supported"),
- ACID_OP_ON_NONACID_TABLE(10297, "Attempt to do update or delete on table {0} that does not use " +
- "an AcidOutputFormat or is not bucketed", true),
+ ACID_OP_ON_NONACID_TABLE(10297, "Attempt to do update or delete on table {0} that is " +
+ "not transactional", true),
ACID_NO_SORTED_BUCKETS(10298, "ACID insert, update, delete not supported on tables that are " +
"sorted, table {0}", true),
ALTER_TABLE_TYPE_PARTIAL_PARTITION_SPEC_NO_SUPPORTED(10299,
http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index 25ad1e9..bc265eb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConfUtil;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.ql.CompilationOpContext;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.io.AcidUtils;
@@ -36,6 +37,7 @@ import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
import org.apache.hadoop.hive.ql.io.HiveKey;
import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
import org.apache.hadoop.hive.ql.io.HivePartitioner;
+import org.apache.hadoop.hive.ql.io.RecordIdentifier;
import org.apache.hadoop.hive.ql.io.RecordUpdater;
import org.apache.hadoop.hive.ql.io.StatsProvidingRecordWriter;
import org.apache.hadoop.hive.ql.io.StreamingOutputFormat;
@@ -285,6 +287,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
private transient int numFiles;
protected transient boolean multiFileSpray;
protected transient final Map<Integer, Integer> bucketMap = new HashMap<Integer, Integer>();
+ private transient boolean isBucketed = false;
private transient ObjectInspector[] partitionObjectInspectors;
protected transient HivePartitioner<HiveKey, Object> prtner;
@@ -345,6 +348,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
isNativeTable = !conf.getTableInfo().isNonNative();
isTemporary = conf.isTemporary();
multiFileSpray = conf.isMultiFileSpray();
+ this.isBucketed = hconf.getInt(hive_metastoreConstants.BUCKET_COUNT, 0) > 0;
totalFiles = conf.getTotalFiles();
numFiles = conf.getNumFiles();
dpCtx = conf.getDynPartCtx();
@@ -791,9 +795,23 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
* Hive.copyFiles() will make one of them bucket_N_copy_M in the final location. The
* rest of the acid (read path) doesn't know how to handle copy_N files except for 'original'
* files (HIVE-16177)*/
+ int writerId = -1;
+ if(!isBucketed) {
+ assert !multiFileSpray;
+ assert writerOffset == 0;
+ /**For un-bucketed tables, Deletes with ROW__IDs with different 'bucketNum' values can
+ * be written to the same bucket_N file.
+ * N in this case is the writerId and there is no relationship
+ * between the file name and any property of the data in it. Inserts will be written
+ * to a bucket_N file such that all {@link RecordIdentifier#getBucketProperty()} values indeed
+ * contain writerId=N.
+ * Since taskId is unique (at least per statementId and thus
+ * per [delete_]delta_x_y_stmtId/) there will not be any copy_N files.*/
+ writerId = Integer.parseInt(Utilities.getTaskIdFromFilename(taskId));
+ }
fpaths.updaters[writerOffset] = HiveFileFormatUtils.getAcidRecordUpdater(
- jc, conf.getTableInfo(), bucketNum, conf, fpaths.outPaths[writerOffset],
- rowInspector, reporter, 0);
+ jc, conf.getTableInfo(), writerId >= 0 ? writerId : bucketNum, conf,
+ fpaths.outPaths[writerOffset], rowInspector, reporter, 0);
if (LOG.isDebugEnabled()) {
LOG.debug("Created updater for bucket number " + bucketNum + " using file " +
fpaths.outPaths[writerOffset]);
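To make the comment above concrete: for an unbucketed table the bucket file number is simply the writer id recovered from the task id embedded in the file name, so a task writing 000000_0 produces bucket_00000 and stamps writerId=0 into every ROW__ID. A sketch (assuming, as the hunk above suggests, that Utilities.getTaskIdFromFilename returns the numeric task id portion):

    String taskId = "000000_0"; // typical task output file name
    int writerId = Integer.parseInt(Utilities.getTaskIdFromFilename(taskId)); // 0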
http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index 751fca8..69a9f9f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -964,6 +964,9 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
private final boolean allowSyntheticFileIds;
private final boolean isDefaultFs;
+ /**
+ * @param dir - root of partition dir
+ */
public BISplitStrategy(Context context, FileSystem fs, Path dir,
List<HdfsFileStatusWithId> fileStatuses, boolean isOriginal, List<DeltaMetaData> deltas,
boolean[] covered, boolean allowSyntheticFileIds, boolean isDefaultFs) {
@@ -996,7 +999,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
}
OrcSplit orcSplit = new OrcSplit(fileStatus.getPath(), fileKey, entry.getKey(),
entry.getValue().getLength(), entry.getValue().getHosts(), null, isOriginal, true,
- deltas, -1, logicalLen);
+ deltas, -1, logicalLen, dir);
splits.add(orcSplit);
}
}
@@ -1017,18 +1020,16 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
* ACID split strategy is used when there is no base directory (when transactions are enabled).
*/
static class ACIDSplitStrategy implements SplitStrategy<OrcSplit> {
- private Path dir;
+ Path dir;
private List<DeltaMetaData> deltas;
- private boolean[] covered;
- private int numBuckets;
private AcidOperationalProperties acidOperationalProperties;
-
+ /**
+ * @param dir root of partition dir
+ */
ACIDSplitStrategy(Path dir, int numBuckets, List<DeltaMetaData> deltas, boolean[] covered,
AcidOperationalProperties acidOperationalProperties) {
this.dir = dir;
- this.numBuckets = numBuckets;
this.deltas = deltas;
- this.covered = covered;
this.acidOperationalProperties = acidOperationalProperties;
}
@@ -1234,6 +1235,8 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
private final UserGroupInformation ugi;
private final boolean allowSyntheticFileIds;
private SchemaEvolution evolution;
+ //this is the root of the partition in which the 'file' is located
+ private final Path rootDir;
public SplitGenerator(SplitInfo splitInfo, UserGroupInformation ugi,
boolean allowSyntheticFileIds, boolean isDefaultFs) throws IOException {
@@ -1250,6 +1253,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
this.isOriginal = splitInfo.isOriginal;
this.deltas = splitInfo.deltas;
this.hasBase = splitInfo.hasBase;
+ this.rootDir = splitInfo.dir;
this.projColsUncompressedSize = -1;
this.deltaSplits = splitInfo.getSplits();
this.allowSyntheticFileIds = allowSyntheticFileIds;
@@ -1361,7 +1365,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
fileKey = new SyntheticFileId(file);
}
return new OrcSplit(file.getPath(), fileKey, offset, length, hosts,
- orcTail, isOriginal, hasBase, deltas, scaledProjSize, fileLen);
+ orcTail, isOriginal, hasBase, deltas, scaledProjSize, fileLen, rootDir);
}
private static final class OffsetAndLength { // Java cruft; pair of long.
@@ -1641,7 +1645,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
pathFutures.add(ecs.submit(fileGenerator));
}
- boolean isTransactionalTableScan =
+ boolean isTransactionalTableScan =//this never seems to be set correctly
HiveConf.getBoolVar(conf, ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN);
boolean isSchemaEvolution = HiveConf.getBoolVar(conf, ConfVars.HIVE_SCHEMA_EVOLUTION);
TypeDescription readerSchema =
@@ -1932,16 +1936,17 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
final OrcSplit split = (OrcSplit) inputSplit;
final Path path = split.getPath();
-
Path root;
if (split.hasBase()) {
if (split.isOriginal()) {
- root = path.getParent();
+ root = split.getRootDir();
} else {
root = path.getParent().getParent();
+ assert root.equals(split.getRootDir()) : "root mismatch: baseDir=" + split.getRootDir() +
+ " path.p.p=" + root;
}
- } else {//here path is a delta/ but above it's a partition/
- root = path;
+ } else {
+ throw new IllegalStateException("Split w/o base: " + path);
}
// Retrieve the acidOperationalProperties for the table, initialized in HiveInputFormat.
@@ -2037,21 +2042,6 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
}
};
}
- private static Path findOriginalBucket(FileSystem fs,
- Path directory,
- int bucket) throws IOException {
- for(FileStatus stat: fs.listStatus(directory)) {
- if(stat.getLen() <= 0) {
- continue;
- }
- AcidOutputFormat.Options bucketInfo =
- AcidUtils.parseBaseOrDeltaBucketFilename(stat.getPath(), fs.getConf());
- if(bucketInfo.getBucketId() == bucket) {
- return stat.getPath();
- }
- }
- throw new IllegalArgumentException("Can't find bucket " + bucket + " in " + directory);
- }
static Reader.Options createOptionsForReader(Configuration conf) {
/**
@@ -2275,20 +2265,22 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
) throws IOException {
Reader reader = null;
boolean isOriginal = false;
- if (baseDirectory != null) {
- Path bucketFile;
+ if (baseDirectory != null) {//this is NULL for minor compaction
+ Path bucketFile = null;
if (baseDirectory.getName().startsWith(AcidUtils.BASE_PREFIX)) {
bucketFile = AcidUtils.createBucketFile(baseDirectory, bucket);
} else {
+ /**we don't know which file to start reading -
+ * {@link OrcRawRecordMerger.OriginalReaderPairToCompact} does*/
isOriginal = true;
- bucketFile = findOriginalBucket(baseDirectory.getFileSystem(conf),
- baseDirectory, bucket);
}
- reader = OrcFile.createReader(bucketFile, OrcFile.readerOptions(conf));
+ if(bucketFile != null) {
+ reader = OrcFile.createReader(bucketFile, OrcFile.readerOptions(conf));
+ }
}
OrcRawRecordMerger.Options mergerOptions = new OrcRawRecordMerger.Options()
.isCompacting(true)
- .rootPath(baseDirectory);
+ .rootPath(baseDirectory).isMajorCompaction(baseDirectory != null);
return new OrcRawRecordMerger(conf, collapseEvents, reader, isOriginal,
bucket, validTxnList, new Reader.Options(), deltaDirectory, mergerOptions);
}
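The root-path logic in the hunk above relies on the standard ACID layout, where a non-original split path sits two levels below the partition root. A small sketch with org.apache.hadoop.fs.Path (the warehouse path is hypothetical):

    // <partition root>/delta_<minTxn>_<maxTxn>_<stmtId>/bucket_N
    Path path = new Path("/warehouse/t/delta_0000019_0000019_0000/bucket_00000");
    Path root = path.getParent().getParent(); // "/warehouse/t", the partition (or table) root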
http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
index 97c4e3d..cbbb4c4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
@@ -22,6 +22,8 @@ import java.util.List;
import java.util.Map;
import java.util.TreeMap;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
import org.apache.hadoop.hive.ql.io.BucketCodec;
@@ -29,7 +31,6 @@ import org.apache.hadoop.hive.shims.HadoopShims;
import org.apache.orc.OrcUtils;
import org.apache.orc.StripeInformation;
import org.apache.orc.TypeDescription;
-import org.apache.orc.impl.AcidStats;
import org.apache.orc.impl.OrcAcidUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -68,7 +69,6 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
private final RecordIdentifier maxKey;
// an extra value so that we can return it while reading ahead
private OrcStruct extraValue;
-
/**
* A RecordIdentifier extended with the current transaction id. This is the
* key of our merge sort with the originalTransaction, bucket, and rowId
@@ -294,9 +294,15 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
* Running multiple Insert statements on the same partition (of non acid table) creates files
* like so: 00000_0, 00000_0_copy1, 00000_0_copy2, etc. So the OriginalReaderPair must treat all
* of these files as part of a single logical bucket file.
+ *
+ * Also, for unbucketed (non-acid) tables, there are no guarantees where data files may be placed.
+ * For example, CTAS+Tez+Union creates subdirs 1/, 2/, etc. for each leg of the Union. Thus a
+ * data file need not be an immediate child of the partition dir. All files for a given writerId are
+ * treated as one logical unit to assign {@link RecordIdentifier}s to them consistently.
*
* For Compaction, where each split includes the whole bucket, this means reading over all the
* files in order to assign ROW__ID.rowid in one sequence for the entire logical bucket.
+ * For unbucketed tables, a Compaction split is all files written by a given writerId.
*
* For a read after the table is marked transactional but before it's rewritten into a base/
* by compaction, each of the original files may be split into many pieces. For each split we
@@ -305,7 +311,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
* split of the original file and used to filter rows from all the deltas. The ROW__ID.rowid for
* the rows of the 'original' file of course, must be assigned from the beginning of logical
* bucket. The last split of the logical bucket, i.e. the split that has the end of last file,
- * should include all insert events from deltas.
+ * should include all insert events from deltas (last sentence is obsolete for Acid 2: HIVE-17320)
*/
private static abstract class OriginalReaderPair implements ReaderPair {
OrcStruct nextRecord;
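One invariant from the javadoc above is worth spelling out: ROW__ID.rowid runs in a single sequence across all files of a logical bucket. With hypothetical row counts:

    // 000000_0        has 3 rows -> rowid 0, 1, 2
    // 000000_0_copy_1 has 2 rows -> rowid 3, 4
    long rowsInFirstFile = 3;
    long rowIdOffsetForCopy1 = rowsInFirstFile; // rows in all earlier files of this bucket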
@@ -407,18 +413,18 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
RecordIdentifier newMaxKey = maxKey;
recordReader = reader.rowsOptions(options);
/**
- * Logically each bucket consists of 0000_0, 0000_0_copy_1... 0000_0_copyN. etc We don't
- * know N a priori so if this is true, then the current split is from 0000_0_copyN file.
+ * Logically each bucket consists of 0000_0, 0000_0_copy_1, ... 0000_0_copy_N, etc. We don't
+ * know N a priori, so if this is true, then the current split is from a 0000_0_copy_N file.
* It's needed to correctly set maxKey. In particular, set maxKey==null if this split
* is the tail of the last file for this logical bucket to include all deltas written after
- * non-acid to acid table conversion.
+ * non-acid to acid table conversion (todo: HIVE-17320).
+ * Also, see comments at {@link OriginalReaderPair} about unbucketed tables.
*/
- boolean isLastFileForThisBucket = false;
+ boolean isLastFileForThisBucket = true;
boolean haveSeenCurrentFile = false;
long rowIdOffsetTmp = 0;
- if (mergerOptions.getCopyIndex() > 0) {
+ {
//the split is from something other than the 1st file of the logical bucket - compute offset
-
AcidUtils.Directory directoryState = AcidUtils.getAcidState(mergerOptions.getRootPath(),
conf, validTxnList, false, true);
for (HadoopShims.HdfsFileStatusWithId f : directoryState.getOriginalFiles()) {
@@ -467,23 +473,6 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
maxKey.setRowId(maxKey.getRowId() + rowIdOffset);
}
}
- } else {
- rowIdOffset = 0;
- isLastFileForThisBucket = true;
- AcidUtils.Directory directoryState = AcidUtils.getAcidState(mergerOptions.getRootPath(),
- conf, validTxnList, false, true);
- int numFilesInBucket = 0;
- for (HadoopShims.HdfsFileStatusWithId f : directoryState.getOriginalFiles()) {
- AcidOutputFormat.Options bucketOptions =
- AcidUtils.parseBaseOrDeltaBucketFilename(f.getFileStatus().getPath(), conf);
- if (bucketOptions.getBucketId() == bucketId) {
- numFilesInBucket++;
- if (numFilesInBucket > 1) {
- isLastFileForThisBucket = false;
- break;
- }
- }
- }
}
if (!isLastFileForThisBucket && maxKey == null) {
/*
@@ -651,6 +640,12 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
}
/**
* Find the key range for original bucket files.
+ * For unbucketed tables the insert event data is still written to a bucket_N file except that
+ * N is just a writer ID - it still matches {@link RecordIdentifier#getBucketProperty()}. For
+ * 'original' files (unbucketed) the same applies. A file 000000_0 encodes a taskId/writerId and
+ * at read time we synthesize {@link RecordIdentifier#getBucketProperty()} to match the file name
+ * and so the same bucketProperty is used here to create minKey/maxKey, i.e. these keys are valid
+ * to filter data from delete_delta files even for unbucketed tables.
* @param reader the reader
* @param bucket the bucket number we are reading
* @param options the options for reading with
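To make the synthesized keys concrete, here is a minimal sketch (assuming BucketCodec.V1
encoding; all values are illustrative and this is not code from the patch) of how a
bucketProperty and rowid could be derived for a row of an 'original' file:

    // minimal sketch, assuming BucketCodec.V1; illustrative values, not code from this patch
    Configuration conf = new Configuration();
    int writerId = 0;                              // parsed from the file name 000000_0
    int bucketProperty = BucketCodec.V1.encode(
        new AcidOutputFormat.Options(conf).bucket(writerId).statementId(0));
    // rowids are numbered from the start of the logical bucket, so a split that begins
    // 'rowIdOffset' rows into the bucket assigns rowIdOffset + positionInSplit as ROW__ID.rowid
    long rowIdOffset = 500, positionInSplit = 0;   // illustrative
    RecordIdentifier key = new RecordIdentifier(0L, bucketProperty, rowIdOffset + positionInSplit);

The same bucketProperty construction is what makes minKey/maxKey comparable to keys in
delete_delta files.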
@@ -740,7 +735,6 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
*/
static Reader.Options createEventOptions(Reader.Options options) {
Reader.Options result = options.clone();
- //result.range(options.getOffset(), Long.MAX_VALUE);WTF?
result.include(options.getInclude());
// slide the column names down by 6 for the name array
@@ -755,11 +749,17 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
return result;
}
+ /**
+ * {@link OrcRawRecordMerger} Acid reader is used slightly differently in various contexts.
+ * This makes the "context" explicit.
+ */
static class Options {
private int copyIndex = 0;
private boolean isCompacting = false;
private Path bucketPath;
private Path rootPath;
+ private boolean isMajorCompaction = false;
+ private boolean isDeleteReader = false;
Options copyIndex(int copyIndex) {
assert copyIndex >= 0;
this.copyIndex = copyIndex;
@@ -767,6 +767,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
}
Options isCompacting(boolean isCompacting) {
this.isCompacting = isCompacting;
+ assert !isDeleteReader;
return this;
}
Options bucketPath(Path bucketPath) {
@@ -777,6 +778,16 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
this.rootPath = rootPath;
return this;
}
+ Options isMajorCompaction(boolean isMajor) {
+ this.isMajorCompaction = isMajor;
+ assert !isDeleteReader;
+ return this;
+ }
+ Options isDeleteReader(boolean isDeleteReader) {
+ this.isDeleteReader = isDeleteReader;
+ assert !isCompacting;
+ return this;
+ }
/**
* 0 means it's the original file, without {@link Utilities#COPY_KEYWORD} suffix
*/
@@ -788,7 +799,6 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
}
/**
* Full path to the data file
- * @return
*/
Path getBucketPath() {
return bucketPath;
@@ -797,6 +807,22 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
* Partition folder (Table folder if not partitioned)
*/
Path getRootPath() { return rootPath; }
+ /**
+     * @return true if this is a major compaction; false for minor compaction or a non-compaction read
+ */
+ boolean isMajorCompaction() {
+ return isMajorCompaction && isCompacting;
+ }
+ boolean isMinorCompaction() {
+ return !isMajorCompaction && isCompacting;
+ }
+ /**
+     * true if this is only processing delete deltas, to load the in-memory table
+     * used by the vectorized reader
+ */
+ boolean isDeleteReader() {
+ return isDeleteReader;
+ }
}
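As a usage illustration (hypothetical paths; not from this patch), the contexts made
explicit above might be configured like so:

    Path partitionDir = new Path("/warehouse/t/ds=2008-04-08");  // hypothetical
    // major compaction: merge base/ + delta(s)/, applying delete_delta(s)/
    Options major = new Options().isCompacting(true).isMajorCompaction(true)
        .rootPath(partitionDir);
    // normal read: one split of one data file, here the 2nd copy of an 'original' file
    Options read = new Options().copyIndex(1)
        .bucketPath(new Path(partitionDir, "000000_0_copy_1")).rootPath(partitionDir);
    // vectorized read path: load only the delete events into memory
    Options deletes = new Options().isDeleteReader(true).rootPath(partitionDir);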
/**
* Create a reader that merge sorts the ACID events together.
@@ -820,6 +846,39 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
this.offset = options.getOffset();
this.length = options.getLength();
this.validTxnList = validTxnList;
+ /**
+ * @since Hive 3.0
+ * With split update (HIVE-14035) we have base/, delta/ and delete_delta/ - the latter only
+ * has Delete events and the others only have Insert events. Thus {@link #baseReader} is
+ * a split of a file in base/ or delta/.
+ *
+ * For Compaction, each split (for now) is a logical bucket, i.e. all files from base/ + delta(s)/
+ * for a given bucket ID and delete_delta(s)/
+ *
+ * For bucketed tables, the data files are named bucket_N and all rows in this file are such
+ * that {@link org.apache.hadoop.hive.ql.io.BucketCodec#decodeWriterId(int)} of
+ * {@link RecordIdentifier#getBucketProperty()} is N. This is currently true for all types of
+     * files but may not be true for delete_delta/ files in the future.
+ *
+ * For un-bucketed tables, the system is designed so that it works when there is no relationship
+ * between delete_delta file name (bucket_N) and the value of {@link RecordIdentifier#getBucketProperty()}.
+     * (Later this may be optimized to take advantage of situations where it is known that
+     * bucket_N matches bucketProperty().) This implies that for a given {@link #baseReader} all
+ * files in delete_delta/ have to be opened ({@link ReaderPair} created). Insert events are
+ * still written such that N in file name (writerId) matches what's in bucketProperty().
+ *
+ * Compactor for un-bucketed tables works exactly the same as for bucketed ones though it
+ * should be optimized (see HIVE-17206). In particular, each split is a set of files
+ * created by a writer with the same writerId, i.e. all bucket_N files across base/ &
+     * delta/ for the same N. Unlike bucketed tables, there is no relationship between
+     * any values in user columns and the file name.
+     * The maximum N is determined by the number of writers the system chose for the "largest"
+ * write into a given partition.
+ *
+ * In both cases, Compactor should be changed so that Minor compaction is run very often and
+ * only compacts delete_delta/. Major compaction can do what it does now.
+ */
+ boolean isBucketed = conf.getInt(hive_metastoreConstants.BUCKET_COUNT, 0) > 0;
TypeDescription typeDescr =
OrcInputFormat.getDesiredRowTypeDescr(conf, true, Integer.MAX_VALUE);
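A short sketch of the invariant just described, using the BucketCodec decoding the comment
references (illustrative, not part of this patch):

    // holds for bucketed tables, and for insert events generally; an unbucketed table's
    // delete_delta/bucket_N makes no such promise
    static void checkBucketInvariant(RecordIdentifier rowId, int bucketIdFromFileName) {
      int bucketProperty = rowId.getBucketProperty();
      int writerId = BucketCodec.determineVersion(bucketProperty).decodeWriterId(bucketProperty);
      assert writerId == bucketIdFromFileName;    // N in bucket_N equals the encoded writerId
    }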
@@ -829,16 +888,26 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
// modify the options to reflect the event instead of the base row
Reader.Options eventOptions = createEventOptions(options);
- if (reader == null) {
+ if((mergerOptions.isCompacting() && mergerOptions.isMinorCompaction()) ||
+ mergerOptions.isDeleteReader()) {
+ //for minor compaction, there is no progress report and we don't filter deltas
baseReader = null;
minKey = maxKey = null;
+ assert reader == null : "unexpected input reader during minor compaction: " +
+ mergerOptions.getRootPath();
} else {
KeyInterval keyInterval;
- // find the min/max based on the offset and length (and more for 'original')
- if (isOriginal) {
- keyInterval = discoverOriginalKeyBounds(reader, bucket, options, conf);
+ if (mergerOptions.isCompacting()) {
+ assert mergerOptions.isMajorCompaction();
+ //compaction doesn't filter deltas but *may* have a reader for 'base'
+ keyInterval = new KeyInterval(null, null);
} else {
- keyInterval = discoverKeyBounds(reader, options);
+ // find the min/max based on the offset and length (and more for 'original')
+ if (isOriginal) {
+ keyInterval = discoverOriginalKeyBounds(reader, bucket, options, conf);
+ } else {
+ keyInterval = discoverKeyBounds(reader, options);
+ }
}
LOG.info("min key = " + keyInterval.getMinKey() + ", max key = " + keyInterval.getMaxKey());
// use the min/max instead of the byte range
@@ -849,8 +918,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
if(mergerOptions.isCompacting()) {
pair = new OriginalReaderPairToCompact(key, bucket, options, mergerOptions,
conf, validTxnList);
- }
- else {
+ } else {
pair = new OriginalReaderPairToRead(key, reader, bucket, keyInterval.getMinKey(),
keyInterval.getMaxKey(), options, mergerOptions, conf, validTxnList);
}
@@ -868,35 +936,31 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
baseReader = pair.getRecordReader();
}
- // we always want to read all of the deltas
- eventOptions.range(0, Long.MAX_VALUE);
if (deltaDirectory != null) {
+      /* whatever SARG may be applicable to the base is not applicable to delete_delta since it has no
+       * user columns.
+ * HIVE-17320: we should compute a SARG to push down min/max key to delete_delta*/
+ Reader.Options deltaEventOptions = eventOptions.clone()
+ .searchArgument(null, null).range(0, Long.MAX_VALUE);
for(Path delta: deltaDirectory) {
if(!mergerOptions.isCompacting() && !AcidUtils.isDeleteDelta(delta)) {
//all inserts should be in baseReader for normal read so this should always be delete delta if not compacting
throw new IllegalStateException(delta + " is not delete delta and is not compacting.");
}
ReaderKey key = new ReaderKey();
- Path deltaFile = AcidUtils.createBucketFile(delta, bucket);
AcidUtils.ParsedDelta deltaDir = AcidUtils.parsedDelta(delta);
- FileSystem fs = deltaFile.getFileSystem(conf);
- long length = OrcAcidUtils.getLastFlushLength(fs, deltaFile);
- if (length != -1 && fs.exists(deltaFile)) {
- Reader deltaReader = OrcFile.createReader(deltaFile,
- OrcFile.readerOptions(conf).maxLength(length));
- Reader.Options deltaEventOptions = null;
- if(eventOptions.getSearchArgument() != null) {
- // Turn off the sarg before pushing it to delta. We never want to push a sarg to a delta as
- // it can produce wrong results (if the latest valid version of the record is filtered out by
- // the sarg) or ArrayOutOfBounds errors (when the sarg is applied to a delete record)
- // unless the delta only has insert events
- AcidStats acidStats = OrcAcidUtils.parseAcidStats(deltaReader);
- if(acidStats.deletes > 0 || acidStats.updates > 0) {
- deltaEventOptions = eventOptions.clone().searchArgument(null, null);
- }
+ for (Path deltaFile : getDeltaFiles(delta, bucket, conf, mergerOptions, isBucketed)) {
+ FileSystem fs = deltaFile.getFileSystem(conf);
+ if(!fs.exists(deltaFile)) {
+ continue;
}
+          /* side files are only created by streaming ingest. If this is a compaction, we may
+           * have an insert delta/ here with side files present because the original writer died. */
+ long length = AcidUtils.getLogicalLength(fs, fs.getFileStatus(deltaFile));
+ assert length >= 0;
+ Reader deltaReader = OrcFile.createReader(deltaFile, OrcFile.readerOptions(conf).maxLength(length));
ReaderPairAcid deltaPair = new ReaderPairAcid(key, deltaReader, minKey, maxKey,
- deltaEventOptions != null ? deltaEventOptions : eventOptions, deltaDir.getStatementId());
+ deltaEventOptions, deltaDir.getStatementId());
if (deltaPair.nextRecord() != null) {
readers.put(key, deltaPair);
}
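The getLogicalLength() call above replaces the unconditional side-file read that was removed.
A simplified sketch of the idea, given the FileSystem fs and delta file from the loop (the
helper methods exist in OrcAcidUtils; the conditional is an illustration of the intended
behavior, not the actual implementation):

    // if bucket_N has a bucket_N_flush_length side file (streaming ingest), the last
    // recorded length is the logical length; otherwise the physical length is used
    Path sideFile = OrcAcidUtils.getSideFile(deltaFile);
    long logicalLength = fs.exists(sideFile)
        ? OrcAcidUtils.getLastFlushLength(fs, deltaFile)
        : fs.getFileStatus(deltaFile).getLen();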
@@ -921,6 +985,59 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
}
}
+ /**
+ * This determines the set of {@link ReaderPairAcid} to create for a given delta/.
+ * For unbucketed tables {@code bucket} can be thought of as a write tranche.
+ */
+ static Path[] getDeltaFiles(Path deltaDirectory, int bucket, Configuration conf,
+ Options mergerOptions, boolean isBucketed) throws IOException {
+ if(isBucketed) {
+ /**
+     * for bucketed tables (for now) we always trust that the N in the bucket_N file name means that
+     * all records have {@link RecordIdentifier#getBucketProperty()} encoding bucketId = N. This
+     * means that a delete event in bucket_N can only modify an insert in another bucket_N file with
+     * the same N. (Down the road we may trust it only in certain delta dirs.)
+ *
+ * Compactor takes all types of deltas for a given bucket. For regular read, any file that
+ * contains (only) insert events is treated as base and only
+ * delete_delta/ are treated as deltas.
+ */
+    assert mergerOptions.isCompacting() ||
+        deltaDirectory.getName().startsWith(AcidUtils.DELETE_DELTA_PREFIX)
+        : "Unexpected delta: " + deltaDirectory;
+ Path deltaFile = AcidUtils.createBucketFile(deltaDirectory, bucket);
+ return new Path[]{deltaFile};
+ }
+ /**
+   * For unbucketed tables insert events are also stored in bucket_N files, but here N is
+   * the writer ID. We can trust that N matches {@link RecordIdentifier#getBucketProperty()} in
+   * delta_x_y/ files, but it's not required, and we can't trust N for delete_delta_x_x/bucket_N.
+   * Thus we always have to take all files in a delete_delta.
+   * For a regular read, any file that has (only) insert events is treated as base, so
+   * {@code deltaDirectory} can only be a delete_delta, and we take all files in it.
+ * For compacting, every split contains base/bN + delta(s)/bN + delete_delta(s){all buckets} for
+ * a given N.
+ */
+ if(deltaDirectory.getName().startsWith(AcidUtils.DELETE_DELTA_PREFIX)) {
+      //for bucketed tables it would not be wrong to take all delete events, but it's more
+      //efficient to take only those that belong to 'bucket', assuming we trust the file name;
+      //for an un-bucketed table we must get all files
+ FileSystem fs = deltaDirectory.getFileSystem(conf);
+ FileStatus[] dataFiles = fs.listStatus(deltaDirectory, AcidUtils.bucketFileFilter);
+ Path[] deltaFiles = new Path[dataFiles.length];
+ int i = 0;
+ for (FileStatus stat : dataFiles) {
+ deltaFiles[i++] = stat.getPath();
+ }//todo: need a test where we actually have more than 1 file
+ return deltaFiles;
+ }
+ //if here it must be delta_x_y - insert events only, so we must be compacting
+ assert mergerOptions.isCompacting() : "Expected to be called as part of compaction";
+ Path deltaFile = AcidUtils.createBucketFile(deltaDirectory, bucket);
+ return new Path[] {deltaFile};
+
+ }
+
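A brief usage sketch of the two cases, given a Configuration conf and merger Options
(hypothetical paths, not from this patch):

    Path deleteDelta = new Path("/warehouse/t/delete_delta_0000005_0000005_0000");
    // unbucketed table: every bucket_N file in the delete_delta is returned
    Path[] all = getDeltaFiles(deleteDelta, 0, conf, mergerOptions, false);
    // bucketed table: exactly one path, bucket_00000, via AcidUtils.createBucketFile()
    Path[] one = getDeltaFiles(deleteDelta, 0, conf, mergerOptions, true);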
@VisibleForTesting
RecordIdentifier getMinKey() {
return minKey;
http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
index 429960b..1e19a91 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
@@ -243,7 +243,8 @@ public class OrcRecordUpdater implements RecordUpdater {
}
if (options.getMinimumTransactionId() != options.getMaximumTransactionId()
&& !options.isWritingBase()){
- flushLengths = fs.create(OrcAcidUtils.getSideFile(this.path), true, 8,
+ //throw if file already exists as that should never happen
+ flushLengths = fs.create(OrcAcidUtils.getSideFile(this.path), false, 8,
options.getReporter());
flushLengths.writeLong(0);
OrcInputFormat.SHIMS.hflush(flushLengths);
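Restated as a sketch of the side-file lifecycle for streaming writes (assembled from this
class and OrcAcidUtils; not a verbatim excerpt):

    // create bucket_N_flush_length exactly once; overwrite=false fails fast if it exists
    FSDataOutputStream flushLengths = fs.create(
        OrcAcidUtils.getSideFile(path), false, 8, options.getReporter());
    flushLengths.writeLong(0);                  // no rows durable yet
    OrcInputFormat.SHIMS.hflush(flushLengths);
    // after each flush the writer appends the new durable length; readers bound their
    // reads with the last long via OrcAcidUtils.getLastFlushLength()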
@@ -349,6 +350,12 @@ public class OrcRecordUpdater implements RecordUpdater {
return newInspector;
}
}
+
+ /**
+   * The INSERT event always uses the {@link #bucket} that this {@link RecordUpdater} was created
+   * with; thus even for unbucketed tables, the N in the bucket_N file name matches the
+   * writerId/bucketId, including after a late split.
+   */
private void addSimpleEvent(int operation, long currentTransaction, long rowId, Object row)
throws IOException {
this.operation.set(operation);
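A conceptual sketch of the contrast with the DELETE/UPDATE path in the next hunk
(simplified from this class; the inspector chain is the one used below):

    // INSERT: the bucketProperty is the one this RecordUpdater was created with
    int insertBucketProperty = bucket.get();
    // DELETE/UPDATE: the bucketProperty is read out of the incoming row's ROW__ID
    int deleteBucketProperty = bucketInspector.get(
        recIdInspector.getStructFieldData(rowValue, bucketField));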
@@ -394,6 +401,11 @@ public class OrcRecordUpdater implements RecordUpdater {
Integer currentBucket = null;
if (operation == DELETE_OPERATION || operation == UPDATE_OPERATION) {
+ /**
+       * make sure the bucketProperty in the delete event comes from the {@code row} rather than
+       * whatever {@link #bucket} is. For bucketed tables, the two must agree on the bucketId
+       * encoded in it, though not necessarily on the whole value. For unbucketed tables there is
+       * no relationship.
+ */
currentBucket = setBucket(bucketInspector.get(
recIdInspector.getStructFieldData(rowValue, bucketField)), operation);
// Initialize a deleteEventWriter if not yet done. (Lazy initialization)