Posted to commits@hive.apache.org by go...@apache.org on 2018/06/22 22:35:40 UTC
hive git commit: HIVE-19890: ACID: Inherit bucket-id from original ROW_ID for delete deltas (Gopal V, reviewed by Eugene Koifman)
Repository: hive
Updated Branches:
refs/heads/master 6d532e7c4 -> 23d2b80b0
HIVE-19890: ACID: Inherit bucket-id from original ROW_ID for delete deltas (Gopal V, reviewed by Eugene Koifman)
Signed-off-by: Gopal V <go...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/23d2b80b
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/23d2b80b
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/23d2b80b
Branch: refs/heads/master
Commit: 23d2b80b0ae246d00613b06ce5ed554efb49d1d4
Parents: 6d532e7
Author: Gopal V <go...@apache.org>
Authored: Fri Jun 22 15:21:10 2018 -0700
Committer: Gopal V <go...@apache.org>
Committed: Fri Jun 22 15:24:00 2018 -0700
----------------------------------------------------------------------
.../hadoop/hive/ql/exec/FileSinkOperator.java | 58 ++++++++++++--------
.../hadoop/hive/ql/parse/SemanticAnalyzer.java | 6 ++
.../apache/hadoop/hive/ql/TestTxnNoBuckets.java | 34 ++++++++----
.../clientpositive/llap/acid_no_buckets.q.out | 6 +-
.../materialized_view_create_rewrite_4.q.out | 2 +
5 files changed, 73 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/23d2b80b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index c2319bb..21f8268 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -23,6 +23,7 @@ import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_TEMPORARY_TABLE
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
@@ -370,6 +371,38 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
public Collection<String> getStoredStats() {
return stat.getStoredStats();
}
+
+ /**
+ * This method is intended for ACID unbucketed tables, where DELETE ops behave as though the
+ * table were bucketed, but without an explicit pre-specified bucket count. The bucketNum is
+ * read out of the middle field of the ROW__ID and written out from a single FileSink, in a
+ * way similar to the multi-file spray, but without knowing the total number of buckets
+ * ahead of time.
+ *
+ * ROW__ID (1,2[0],3) => bucket_00002
+ * ROW__ID (1,3[0],4) => bucket_00003 etc.
+ *
+ * A new FSP is created for each partition, so this only requires the bucket numbering, which
+ * is mapped in directly as an index.
+ */
+ public int createDynamicBucket(int bucketNum) {
+ // this assumes all paths are bucket names (which means no lookup is needed)
+ int writerOffset = bucketNum;
+ if (updaters.length <= writerOffset) {
+ this.updaters = Arrays.copyOf(updaters, writerOffset + 1);
+ this.outPaths = Arrays.copyOf(outPaths, writerOffset + 1);
+ this.finalPaths = Arrays.copyOf(finalPaths, writerOffset + 1);
+ }
+
+ if (this.finalPaths[writerOffset] == null) {
+ // uninitialized bucket
+ String bucketName =
+ Utilities.replaceTaskIdFromFilename(Utilities.getTaskId(hconf), bucketNum);
+ this.finalPaths[writerOffset] = new Path(bDynParts ? buildTmpPath() : parent, bucketName);
+ this.outPaths[writerOffset] = new Path(buildTaskOutputTempPath(), bucketName);
+ }
+ return writerOffset;
+ }
} // class FSPaths
private static final long serialVersionUID = 1L;
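For reference, the bucketNum handed to createDynamicBucket() is the writer id decoded from the
row's ROW__ID bucket property, the "middle field" the javadoc above refers to. A minimal sketch
of that decoding, assuming Hive's org.apache.hadoop.hive.ql.io.BucketCodec API; 536936448 is the
bucket property value that appears in the tests further down:

    import org.apache.hadoop.hive.ql.io.BucketCodec;

    // 536936448 == 0x20010000: version-1 encoding of writer id 1, statement id 0
    int bucketProperty = 536936448;
    BucketCodec codec = BucketCodec.determineVersion(bucketProperty);
    int bucketNum = codec.decodeWriterId(bucketProperty);      // -> 1, i.e. bucket_00001
    int statementId = codec.decodeStatementId(bucketProperty); // -> 0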
@@ -976,31 +1009,12 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
" from data but no mapping in 'bucketMap'." + extraMsg);
}
writerOffset = bucketMap.get(bucketNum);
+ } else if (!isBucketed) {
+ writerOffset = fpaths.createDynamicBucket(bucketNum);
}
if (fpaths.updaters[writerOffset] == null) {
- /*data for delete commands always have ROW__ID which implies that the bucket ID
- * for each row is known. RecordUpdater creates bucket_N file based on 'bucketNum' thus
- * delete events always land in the proper bucket_N file. This could even handle
- * cases where multiple writers are writing bucket_N file for the same N in which case
- * Hive.copyFiles() will make one of them bucket_N_copy_M in the final location. The
- * reset of acid (read path) doesn't know how to handle copy_N files except for 'original'
- * files (HIVE-16177)*/
- int writerId = -1;
- if(!isBucketed) {
- assert !multiFileSpray;
- assert writerOffset == 0;
- /**For un-bucketed tables, Deletes with ROW__IDs with different 'bucketNum' values can
- * be written to the same bucketN file.
- * N in this case is writerId and there is no relationship
- * between the file name and any property of the data in it. Inserts will be written
- * to bucketN file such that all {@link RecordIdentifier#getBucketProperty()} indeed
- * contain writerId=N.
- * Since taskId is unique (at least per statementId and thus
- * per [delete_]delta_x_y_stmtId/) there will not be any copy_N files.*/
- writerId = Integer.parseInt(Utilities.getTaskIdFromFilename(taskId));
- }
fpaths.updaters[writerOffset] = HiveFileFormatUtils.getAcidRecordUpdater(
- jc, conf.getTableInfo(), writerId >= 0 ? writerId : bucketNum, conf,
+ jc, conf.getTableInfo(), bucketNum, conf,
fpaths.outPaths[writerOffset], rowInspector, reporter, 0);
if (LOG.isDebugEnabled()) {
LOG.debug("Created updater for bucket number " + bucketNum + " using file " +
http://git-wip-us.apache.org/repos/asf/hive/blob/23d2b80b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 867ffe4..779ca7d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -6865,6 +6865,12 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
partnCols = getPartitionColsFromBucketCols(dest, qb, dest_tab, table_desc, input, true);
}
}
+ else {
+ if(updating(dest) || deleting(dest)) {
+ partnCols = getPartitionColsFromBucketColsForUpdateDelete(input, true);
+ enforceBucketing = true;
+ }
+ }
if ((dest_tab.getSortCols() != null) &&
(dest_tab.getSortCols().size() > 0)) {
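This new else-branch means UPDATE/DELETE on a table with no bucketing metadata now enforces a
shuffle partitioned on the bucket id embedded in ROW__ID, so all rows destined for the same
bucket_N file reach the same reducer; in the plans below it surfaces as "Map-reduce partition
columns: UDFToInteger(_col0)". One way to observe it, following the style of the TestTxnNoBuckets
change below (assumes the same TxnCommandsBaseForTests harness):

    // hypothetical check in a TxnCommandsBaseForTests subclass
    hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_EXPLAIN_USER, false);
    List<String> plan = runStatementOnDriver(
        "explain update nobuckets set c3 = 17 where c3 in(0,1)");
    // after this patch the Reduce Output Operator should include:
    //   Map-reduce partition columns: UDFToInteger(_col0) (type: int)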
http://git-wip-us.apache.org/repos/asf/hive/blob/23d2b80b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java
index 7ab76b3..cf68d32 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java
@@ -96,6 +96,13 @@ public class TestTxnNoBuckets extends TxnCommandsBaseForTests {
Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"writeid\":1,\"bucketid\":536936448,\"rowid\":1}\t2\t2\t2\t"));
Assert.assertTrue(rs.get(3), rs.get(3).endsWith("nobuckets/delta_0000001_0000001_0000/bucket_00001"));
+ hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_EXPLAIN_USER, false);
+ rs = runStatementOnDriver("explain update nobuckets set c3 = 17 where c3 in(0,1)");
+ LOG.warn("Query Plan: ");
+ for (String s : rs) {
+ LOG.warn(s);
+ }
+
runStatementOnDriver("update nobuckets set c3 = 17 where c3 in(0,1)");
rs = runStatementOnDriver("select ROW__ID, c1, c2, c3, INPUT__FILE__NAME from nobuckets order by INPUT__FILE__NAME, ROW__ID");
LOG.warn("after update");
@@ -106,18 +113,21 @@ public class TestTxnNoBuckets extends TxnCommandsBaseForTests {
Assert.assertTrue(rs.get(0), rs.get(0).endsWith("nobuckets/delta_0000001_0000001_0000/bucket_00000"));
Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"writeid\":1,\"bucketid\":536936448,\"rowid\":1}\t2\t2\t2\t"));
Assert.assertTrue(rs.get(1), rs.get(1).endsWith("nobuckets/delta_0000001_0000001_0000/bucket_00001"));
- //so update has 1 writer which creates bucket0 where both new rows land
+ //so the update still has 1 writer, but it creates a separate bucket file for each bucket where the new rows land
Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t0\t0\t17\t"));
Assert.assertTrue(rs.get(2), rs.get(2).endsWith("nobuckets/delta_0000002_0000002_0000/bucket_00000"));
- Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\t1\t1\t17\t"));
- Assert.assertTrue(rs.get(3), rs.get(3).endsWith("nobuckets/delta_0000002_0000002_0000/bucket_00000"));
+ // update for "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t1\t1\t1\t"
+ Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"writeid\":2,\"bucketid\":536936448,\"rowid\":0}\t1\t1\t17\t"));
+ Assert.assertTrue(rs.get(3), rs.get(3).endsWith("nobuckets/delta_0000002_0000002_0000/bucket_00001"));
Set<String> expectedFiles = new HashSet<>();
- //both delete events land in a single bucket0. Each has a different ROW__ID.bucketId value (even writerId in it is different)
+ //both delete events land in the buckets corresponding to their original ROW__IDs
expectedFiles.add("ts/delete_delta_0000002_0000002_0000/bucket_00000");
+ expectedFiles.add("ts/delete_delta_0000002_0000002_0000/bucket_00001");
expectedFiles.add("nobuckets/delta_0000001_0000001_0000/bucket_00000");
expectedFiles.add("nobuckets/delta_0000001_0000001_0000/bucket_00001");
expectedFiles.add("nobuckets/delta_0000002_0000002_0000/bucket_00000");
+ expectedFiles.add("nobuckets/delta_0000002_0000002_0000/bucket_00001");
//check that we get the right files on disk
assertExpectedFileSet(expectedFiles, getWarehouseDir() + "/nobuckets");
//todo: it would be nice to check the contents of the files... could use orc.FileDump - it has
@@ -136,6 +146,7 @@ public class TestTxnNoBuckets extends TxnCommandsBaseForTests {
│ └── bucket_00001
├── delete_delta_0000002_0000002_0000
│ └── bucket_00000
+│ └── bucket_00001
├── delta_0000001_0000001_0000
│ ├── bucket_00000
│ └── bucket_00001
@@ -146,16 +157,18 @@ public class TestTxnNoBuckets extends TxnCommandsBaseForTests {
Assert.assertTrue(rs.get(0), rs.get(0).endsWith("nobuckets/base_0000002/bucket_00000"));
Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t0\t0\t17\t"));
Assert.assertTrue(rs.get(1), rs.get(1).endsWith("nobuckets/base_0000002/bucket_00000"));
- Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\t1\t1\t17\t"));
- Assert.assertTrue(rs.get(2), rs.get(2).endsWith("nobuckets/base_0000002/bucket_00000"));
- Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"writeid\":1,\"bucketid\":536936448,\"rowid\":1}\t2\t2\t2\t"));
+ Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"writeid\":1,\"bucketid\":536936448,\"rowid\":1}\t2\t2\t2\t"));
+ Assert.assertTrue(rs.get(2), rs.get(2).endsWith("nobuckets/base_0000002/bucket_00001"));
+ Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"writeid\":2,\"bucketid\":536936448,\"rowid\":0}\t1\t1\t17\t"));
Assert.assertTrue(rs.get(3), rs.get(3).endsWith("nobuckets/base_0000002/bucket_00001"));
expectedFiles.clear();
expectedFiles.add("delete_delta_0000002_0000002_0000/bucket_00000");
+ expectedFiles.add("delete_delta_0000002_0000002_0000/bucket_00001");
expectedFiles.add("uckets/delta_0000001_0000001_0000/bucket_00000");
expectedFiles.add("uckets/delta_0000001_0000001_0000/bucket_00001");
expectedFiles.add("uckets/delta_0000002_0000002_0000/bucket_00000");
+ expectedFiles.add("uckets/delta_0000002_0000002_0000/bucket_00001");
expectedFiles.add("/warehouse/nobuckets/base_0000002/bucket_00000");
expectedFiles.add("/warehouse/nobuckets/base_0000002/bucket_00001");
assertExpectedFileSet(expectedFiles, getWarehouseDir() + "/nobuckets");
@@ -393,7 +406,8 @@ ekoifman:apache-hive-3.0.0-SNAPSHOT-bin ekoifman$ tree /Users/ekoifman/dev/hiver
{"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":2}\t12\t12", "warehouse/t/000000_0_copy_1"},
{"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":4}\t20\t40", "warehouse/t/HIVE_UNION_SUBDIR_15/000000_0"},
{"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":5}\t50\t60", "warehouse/t/HIVE_UNION_SUBDIR_16/000000_0"},
- {"{\"writeid\":10000001,\"bucketid\":536870912,\"rowid\":0}\t60\t88", "warehouse/t/delta_10000001_10000001_0000/bucket_00000"},
+ // update for "{\"writeid\":0,\"bucketid\":536936448,\"rowid\":1}\t60\t80"
+ {"{\"writeid\":10000001,\"bucketid\":536936448,\"rowid\":0}\t60\t88", "warehouse/t/delta_10000001_10000001_0000/bucket_00001"},
};
rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from T order by a, b, INPUT__FILE__NAME");
checkExpected(rs, expected3,"after converting to acid (no compaction with updates)");
@@ -421,8 +435,8 @@ ekoifman:apache-hive-3.0.0-SNAPSHOT-bin ekoifman$ tree /Users/ekoifman/dev/hiver
"warehouse/t/base_10000002/bucket_00000"},
{"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":5}\t50\t60",
"warehouse/t/base_10000002/bucket_00000"},
- {"{\"writeid\":10000001,\"bucketid\":536870912,\"rowid\":0}\t60\t88",
- "warehouse/t/base_10000002/bucket_00000"},
+ {"{\"writeid\":10000001,\"bucketid\":536936448,\"rowid\":0}\t60\t88",
+ "warehouse/t/base_10000002/bucket_00001"},
};
checkExpected(rs, expected4,"after major compact");
}
http://git-wip-us.apache.org/repos/asf/hive/blob/23d2b80b/ql/src/test/results/clientpositive/llap/acid_no_buckets.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/acid_no_buckets.q.out b/ql/src/test/results/clientpositive/llap/acid_no_buckets.q.out
index 80bbba4..f9a17a5 100644
--- a/ql/src/test/results/clientpositive/llap/acid_no_buckets.q.out
+++ b/ql/src/test/results/clientpositive/llap/acid_no_buckets.q.out
@@ -130,6 +130,7 @@ STAGE PLANS:
Reduce Output Operator
key expressions: _col0 (type: struct<writeid:bigint,bucketid:int,rowid:bigint>)
sort order: +
+ Map-reduce partition columns: UDFToInteger(_col0) (type: int)
Statistics: Num rows: 500 Data size: 308500 Basic stats: COMPLETE Column stats: PARTIAL
value expressions: _col1 (type: string), _col2 (type: string), _col3 (type: string)
Execution mode: llap
@@ -312,6 +313,7 @@ STAGE PLANS:
Reduce Output Operator
key expressions: _col0 (type: struct<writeid:bigint,bucketid:int,rowid:bigint>)
sort order: +
+ Map-reduce partition columns: UDFToInteger(_col0) (type: int)
Statistics: Num rows: 101 Data size: 44844 Basic stats: COMPLETE Column stats: PARTIAL
value expressions: _col1 (type: string), _col2 (type: string)
Execution mode: llap
@@ -1138,6 +1140,7 @@ STAGE PLANS:
keyColumnNums: [4]
native: true
nativeConditionsMet: hive.vectorized.execution.reducesink.new.enabled IS true, hive.execution.engine tez IN [tez, spark] IS true, No PTF TopN IS true, No DISTINCT columns IS true, BinarySortableSerDe for keys IS true, LazyBinarySerDe for values IS true
+ partitionColumnNums: [5]
valueColumnNums: [0, 6, 2]
Execution mode: vectorized, llap
LLAP IO: may be used (ACID table)
@@ -1336,6 +1339,7 @@ STAGE PLANS:
keyColumnNums: [4]
native: true
nativeConditionsMet: hive.vectorized.execution.reducesink.new.enabled IS true, hive.execution.engine tez IN [tez, spark] IS true, No PTF TopN IS true, No DISTINCT columns IS true, BinarySortableSerDe for keys IS true, LazyBinarySerDe for values IS true
+ partitionColumnNums: [5]
valueColumnNums: [2, 3]
Execution mode: vectorized, llap
LLAP IO: may be used (ACID table)
@@ -1355,7 +1359,7 @@ STAGE PLANS:
neededVirtualColumns: [ROWID]
partitionColumnCount: 2
partitionColumns: ds:string, hr:string
- scratchColumnTypeNames: []
+ scratchColumnTypeNames: [bigint]
Reducer 2
Execution mode: vectorized, llap
Reduce Vectorization:
http://git-wip-us.apache.org/repos/asf/hive/blob/23d2b80b/ql/src/test/results/clientpositive/llap/materialized_view_create_rewrite_4.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/materialized_view_create_rewrite_4.q.out b/ql/src/test/results/clientpositive/llap/materialized_view_create_rewrite_4.q.out
index 2b26eba..8aa6c72 100644
--- a/ql/src/test/results/clientpositive/llap/materialized_view_create_rewrite_4.q.out
+++ b/ql/src/test/results/clientpositive/llap/materialized_view_create_rewrite_4.q.out
@@ -788,6 +788,7 @@ STAGE PLANS:
Reduce Output Operator
key expressions: _col0 (type: struct<writeid:bigint,bucketid:int,rowid:bigint>)
sort order: +
+ Map-reduce partition columns: UDFToInteger(_col0) (type: int)
Statistics: Num rows: 1 Data size: 200 Basic stats: COMPLETE Column stats: COMPLETE
value expressions: _col1 (type: int), _col2 (type: decimal(10,2)), _col3 (type: bigint)
Reducer 3
@@ -1598,6 +1599,7 @@ STAGE PLANS:
Reduce Output Operator
key expressions: _col0 (type: struct<writeid:bigint,bucketid:int,rowid:bigint>)
sort order: +
+ Map-reduce partition columns: UDFToInteger(_col0) (type: int)
Statistics: Num rows: 1 Data size: 200 Basic stats: COMPLETE Column stats: COMPLETE
value expressions: _col1 (type: int), _col2 (type: decimal(10,2)), _col3 (type: bigint)
Reducer 3