You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by go...@apache.org on 2018/06/22 22:35:40 UTC

hive git commit: HIVE-19890: ACID: Inherit bucket-id from original ROW_ID for delete deltas (Gopal V, reviewed by Eugene Koifman)

Repository: hive
Updated Branches:
  refs/heads/master 6d532e7c4 -> 23d2b80b0


HIVE-19890: ACID: Inherit bucket-id from original ROW_ID for delete deltas (Gopal V, reviewed by Eugene Koifman)

Signed-off-by: Gopal V <go...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/23d2b80b
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/23d2b80b
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/23d2b80b

Branch: refs/heads/master
Commit: 23d2b80b0ae246d00613b06ce5ed554efb49d1d4
Parents: 6d532e7
Author: Gopal V <go...@apache.org>
Authored: Fri Jun 22 15:21:10 2018 -0700
Committer: Gopal V <go...@apache.org>
Committed: Fri Jun 22 15:24:00 2018 -0700

----------------------------------------------------------------------
 .../hadoop/hive/ql/exec/FileSinkOperator.java   | 58 ++++++++++++--------
 .../hadoop/hive/ql/parse/SemanticAnalyzer.java  |  6 ++
 .../apache/hadoop/hive/ql/TestTxnNoBuckets.java | 34 ++++++++----
 .../clientpositive/llap/acid_no_buckets.q.out   |  6 +-
 .../materialized_view_create_rewrite_4.q.out    |  2 +
 5 files changed, 73 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/23d2b80b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index c2319bb..21f8268 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -23,6 +23,7 @@ import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_TEMPORARY_TABLE
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -370,6 +371,38 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
     public Collection<String> getStoredStats() {
       return stat.getStoredStats();
     }
+
+    /**
+     * Returns the writer slot for a bucket number, growing the per-bucket arrays on demand.
+     * Intended for ACID unbucketed tables, where the DELETE ops behave as though they are
+     * bucketed, but without a pre-specified bucket count. The bucketNum is read out of the middle
+     * value of ROW__ID and all buckets are written from a single FileSink, similar to the multi
+     * file spray: ROW__ID (1,2[0],3) => bucket_00002, ROW__ID (1,3[0],4) => bucket_00003, etc.
+     *
+     * A new FSP is created for each partition, so this only requires the bucket numbering and
+     * that is mapped in directly as an index.
+     *
+     * @param bucketNum bucket number; used as an array index, so it must not be negative
+     * @return the writer offset for the bucket, which is always equal to bucketNum
+     */
+    public int createDynamicBucket(int bucketNum) {
+      // the bucket number is used directly as the slot index (which means no lookup is needed)
+      int writerOffset = bucketNum;
+      if (updaters.length <= writerOffset) { // NOTE(review): assumes all 3 arrays share a length
+        this.updaters = Arrays.copyOf(updaters, writerOffset + 1);
+        this.outPaths = Arrays.copyOf(outPaths, writerOffset + 1);
+        this.finalPaths = Arrays.copyOf(finalPaths, writerOffset + 1);
+      }
+
+      if (this.finalPaths[writerOffset] == null) {
+        // slot not initialized yet: derive the bucket file name and fill in both paths
+        String bucketName =
+            Utilities.replaceTaskIdFromFilename(Utilities.getTaskId(hconf), bucketNum);
+        this.finalPaths[writerOffset] = new Path(bDynParts ? buildTmpPath() : parent, bucketName);
+        this.outPaths[writerOffset] = new Path(buildTaskOutputTempPath(), bucketName);
+      }
+      return writerOffset;
+    }
   } // class FSPaths
 
   private static final long serialVersionUID = 1L;
@@ -976,31 +1009,12 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
               " from data but no mapping in 'bucketMap'." + extraMsg);
           }
           writerOffset = bucketMap.get(bucketNum);
+        } else if (!isBucketed) {
+          writerOffset = fpaths.createDynamicBucket(bucketNum);
         }
         if (fpaths.updaters[writerOffset] == null) {
-          /*data for delete commands always have ROW__ID which implies that the bucket ID
-          * for each row is known.  RecordUpdater creates bucket_N file based on 'bucketNum' thus
-          * delete events always land in the proper bucket_N file.  This could even handle
-          * cases where multiple writers are writing bucket_N file for the same N in which case
-          * Hive.copyFiles() will make one of them bucket_N_copy_M in the final location.  The
-          * reset of acid (read path) doesn't know how to handle copy_N files except for 'original'
-          * files (HIVE-16177)*/
-          int writerId = -1;
-          if(!isBucketed) {
-            assert !multiFileSpray;
-            assert writerOffset == 0;
-            /**For un-bucketed tables, Deletes with ROW__IDs with different 'bucketNum' values can
-            * be written to the same bucketN file.
-            * N in this case is writerId and there is no relationship
-            * between the file name and any property of the data in it.  Inserts will be written
-            * to bucketN file such that all {@link RecordIdentifier#getBucketProperty()} indeed
-            * contain writerId=N.
-            * Since taskId is unique (at least per statementId and thus
-            * per [delete_]delta_x_y_stmtId/) there will not be any copy_N files.*/
-            writerId = Integer.parseInt(Utilities.getTaskIdFromFilename(taskId));
-          }
           fpaths.updaters[writerOffset] = HiveFileFormatUtils.getAcidRecordUpdater(
-            jc, conf.getTableInfo(), writerId >= 0 ? writerId : bucketNum, conf,
+            jc, conf.getTableInfo(), bucketNum, conf,
             fpaths.outPaths[writerOffset], rowInspector, reporter, 0);
           if (LOG.isDebugEnabled()) {
             LOG.debug("Created updater for bucket number " + bucketNum + " using file " +

http://git-wip-us.apache.org/repos/asf/hive/blob/23d2b80b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 867ffe4..779ca7d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -6865,6 +6865,12 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
         partnCols = getPartitionColsFromBucketCols(dest, qb, dest_tab, table_desc, input, true);
       }
     }
+    else {
+      if(updating(dest) || deleting(dest)) {
+        partnCols = getPartitionColsFromBucketColsForUpdateDelete(input, true);
+        enforceBucketing = true;
+      }
+    }
 
     if ((dest_tab.getSortCols() != null) &&
         (dest_tab.getSortCols().size() > 0)) {

http://git-wip-us.apache.org/repos/asf/hive/blob/23d2b80b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java
index 7ab76b3..cf68d32 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java
@@ -96,6 +96,13 @@ public class TestTxnNoBuckets extends TxnCommandsBaseForTests {
     Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"writeid\":1,\"bucketid\":536936448,\"rowid\":1}\t2\t2\t2\t"));
     Assert.assertTrue(rs.get(3), rs.get(3).endsWith("nobuckets/delta_0000001_0000001_0000/bucket_00001"));
 
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_EXPLAIN_USER, false);
+    rs = runStatementOnDriver("explain  update nobuckets set c3 = 17 where c3 in(0,1)");
+    LOG.warn("Query Plan: ");
+    for (String s : rs) {
+      LOG.warn(s);
+    }
+
     runStatementOnDriver("update nobuckets set c3 = 17 where c3 in(0,1)");
     rs = runStatementOnDriver("select ROW__ID, c1, c2, c3, INPUT__FILE__NAME from nobuckets order by INPUT__FILE__NAME, ROW__ID");
     LOG.warn("after update");
@@ -106,18 +113,21 @@ public class TestTxnNoBuckets extends TxnCommandsBaseForTests {
     Assert.assertTrue(rs.get(0), rs.get(0).endsWith("nobuckets/delta_0000001_0000001_0000/bucket_00000"));
     Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"writeid\":1,\"bucketid\":536936448,\"rowid\":1}\t2\t2\t2\t"));
     Assert.assertTrue(rs.get(1), rs.get(1).endsWith("nobuckets/delta_0000001_0000001_0000/bucket_00001"));
-    //so update has 1 writer which creates bucket0 where both new rows land
+    //so update has 1 writer, but it creates one bucket file per original bucket for the new rows
     Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t0\t0\t17\t"));
     Assert.assertTrue(rs.get(2), rs.get(2).endsWith("nobuckets/delta_0000002_0000002_0000/bucket_00000"));
-    Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\t1\t1\t17\t"));
-    Assert.assertTrue(rs.get(3), rs.get(3).endsWith("nobuckets/delta_0000002_0000002_0000/bucket_00000"));
+    // update for "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t1\t1\t1\t"
+    Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"writeid\":2,\"bucketid\":536936448,\"rowid\":0}\t1\t1\t17\t"));
+    Assert.assertTrue(rs.get(3), rs.get(3).endsWith("nobuckets/delta_0000002_0000002_0000/bucket_00001"));
 
     Set<String> expectedFiles = new HashSet<>();
-    //both delete events land in a single bucket0.  Each has a different ROW__ID.bucketId value (even writerId in it is different)
+    //each delete event lands in the bucket that matches its original ROW__ID
     expectedFiles.add("ts/delete_delta_0000002_0000002_0000/bucket_00000");
+    expectedFiles.add("ts/delete_delta_0000002_0000002_0000/bucket_00001");
     expectedFiles.add("nobuckets/delta_0000001_0000001_0000/bucket_00000");
     expectedFiles.add("nobuckets/delta_0000001_0000001_0000/bucket_00001");
     expectedFiles.add("nobuckets/delta_0000002_0000002_0000/bucket_00000");
+    expectedFiles.add("nobuckets/delta_0000002_0000002_0000/bucket_00001");
     //check that we get the right files on disk
     assertExpectedFileSet(expectedFiles, getWarehouseDir() + "/nobuckets");
     //todo: it would be nice to check the contents of the files... could use orc.FileDump - it has
@@ -136,6 +146,7 @@ public class TestTxnNoBuckets extends TxnCommandsBaseForTests {
 │   └── bucket_00001
 ├── delete_delta_0000002_0000002_0000
 │   └── bucket_00000
+│   └── bucket_00001
 ├── delta_0000001_0000001_0000
 │   ├── bucket_00000
 │   └── bucket_00001
@@ -146,16 +157,18 @@ public class TestTxnNoBuckets extends TxnCommandsBaseForTests {
     Assert.assertTrue(rs.get(0), rs.get(0).endsWith("nobuckets/base_0000002/bucket_00000"));
     Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t0\t0\t17\t"));
     Assert.assertTrue(rs.get(1), rs.get(1).endsWith("nobuckets/base_0000002/bucket_00000"));
-    Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\t1\t1\t17\t"));
-    Assert.assertTrue(rs.get(2), rs.get(2).endsWith("nobuckets/base_0000002/bucket_00000"));
-    Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"writeid\":1,\"bucketid\":536936448,\"rowid\":1}\t2\t2\t2\t"));
+    Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"writeid\":1,\"bucketid\":536936448,\"rowid\":1}\t2\t2\t2\t"));
+    Assert.assertTrue(rs.get(2), rs.get(2).endsWith("nobuckets/base_0000002/bucket_00001"));
+    Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"writeid\":2,\"bucketid\":536936448,\"rowid\":0}\t1\t1\t17\t"));
     Assert.assertTrue(rs.get(3), rs.get(3).endsWith("nobuckets/base_0000002/bucket_00001"));
 
     expectedFiles.clear();
     expectedFiles.add("delete_delta_0000002_0000002_0000/bucket_00000");
+    expectedFiles.add("delete_delta_0000002_0000002_0000/bucket_00001");
     expectedFiles.add("uckets/delta_0000001_0000001_0000/bucket_00000");
     expectedFiles.add("uckets/delta_0000001_0000001_0000/bucket_00001");
     expectedFiles.add("uckets/delta_0000002_0000002_0000/bucket_00000");
+    expectedFiles.add("uckets/delta_0000002_0000002_0000/bucket_00001");
     expectedFiles.add("/warehouse/nobuckets/base_0000002/bucket_00000");
     expectedFiles.add("/warehouse/nobuckets/base_0000002/bucket_00001");
     assertExpectedFileSet(expectedFiles, getWarehouseDir() + "/nobuckets");
@@ -393,7 +406,8 @@ ekoifman:apache-hive-3.0.0-SNAPSHOT-bin ekoifman$ tree /Users/ekoifman/dev/hiver
         {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":2}\t12\t12", "warehouse/t/000000_0_copy_1"},
         {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":4}\t20\t40", "warehouse/t/HIVE_UNION_SUBDIR_15/000000_0"},
         {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":5}\t50\t60", "warehouse/t/HIVE_UNION_SUBDIR_16/000000_0"},
-        {"{\"writeid\":10000001,\"bucketid\":536870912,\"rowid\":0}\t60\t88", "warehouse/t/delta_10000001_10000001_0000/bucket_00000"},
+        // update for "{\"writeid\":0,\"bucketid\":536936448,\"rowid\":1}\t60\t80"
+        {"{\"writeid\":10000001,\"bucketid\":536936448,\"rowid\":0}\t60\t88", "warehouse/t/delta_10000001_10000001_0000/bucket_00001"}, 
     };
     rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from T order by a, b, INPUT__FILE__NAME");
     checkExpected(rs, expected3,"after converting to acid (no compaction with updates)");
@@ -421,8 +435,8 @@ ekoifman:apache-hive-3.0.0-SNAPSHOT-bin ekoifman$ tree /Users/ekoifman/dev/hiver
             "warehouse/t/base_10000002/bucket_00000"},
         {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":5}\t50\t60",
             "warehouse/t/base_10000002/bucket_00000"},
-        {"{\"writeid\":10000001,\"bucketid\":536870912,\"rowid\":0}\t60\t88",
-            "warehouse/t/base_10000002/bucket_00000"},
+        {"{\"writeid\":10000001,\"bucketid\":536936448,\"rowid\":0}\t60\t88",
+            "warehouse/t/base_10000002/bucket_00001"},
     };
     checkExpected(rs, expected4,"after major compact");
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/23d2b80b/ql/src/test/results/clientpositive/llap/acid_no_buckets.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/acid_no_buckets.q.out b/ql/src/test/results/clientpositive/llap/acid_no_buckets.q.out
index 80bbba4..f9a17a5 100644
--- a/ql/src/test/results/clientpositive/llap/acid_no_buckets.q.out
+++ b/ql/src/test/results/clientpositive/llap/acid_no_buckets.q.out
@@ -130,6 +130,7 @@ STAGE PLANS:
                       Reduce Output Operator
                         key expressions: _col0 (type: struct<writeid:bigint,bucketid:int,rowid:bigint>)
                         sort order: +
+                        Map-reduce partition columns: UDFToInteger(_col0) (type: int)
                         Statistics: Num rows: 500 Data size: 308500 Basic stats: COMPLETE Column stats: PARTIAL
                         value expressions: _col1 (type: string), _col2 (type: string), _col3 (type: string)
             Execution mode: llap
@@ -312,6 +313,7 @@ STAGE PLANS:
                       Reduce Output Operator
                         key expressions: _col0 (type: struct<writeid:bigint,bucketid:int,rowid:bigint>)
                         sort order: +
+                        Map-reduce partition columns: UDFToInteger(_col0) (type: int)
                         Statistics: Num rows: 101 Data size: 44844 Basic stats: COMPLETE Column stats: PARTIAL
                         value expressions: _col1 (type: string), _col2 (type: string)
             Execution mode: llap
@@ -1138,6 +1140,7 @@ STAGE PLANS:
                             keyColumnNums: [4]
                             native: true
                             nativeConditionsMet: hive.vectorized.execution.reducesink.new.enabled IS true, hive.execution.engine tez IN [tez, spark] IS true, No PTF TopN IS true, No DISTINCT columns IS true, BinarySortableSerDe for keys IS true, LazyBinarySerDe for values IS true
+                            partitionColumnNums: [5]
                             valueColumnNums: [0, 6, 2]
             Execution mode: vectorized, llap
             LLAP IO: may be used (ACID table)
@@ -1336,6 +1339,7 @@ STAGE PLANS:
                             keyColumnNums: [4]
                             native: true
                             nativeConditionsMet: hive.vectorized.execution.reducesink.new.enabled IS true, hive.execution.engine tez IN [tez, spark] IS true, No PTF TopN IS true, No DISTINCT columns IS true, BinarySortableSerDe for keys IS true, LazyBinarySerDe for values IS true
+                            partitionColumnNums: [5]
                             valueColumnNums: [2, 3]
             Execution mode: vectorized, llap
             LLAP IO: may be used (ACID table)
@@ -1355,7 +1359,7 @@ STAGE PLANS:
                     neededVirtualColumns: [ROWID]
                     partitionColumnCount: 2
                     partitionColumns: ds:string, hr:string
-                    scratchColumnTypeNames: []
+                    scratchColumnTypeNames: [bigint]
         Reducer 2 
             Execution mode: vectorized, llap
             Reduce Vectorization:

http://git-wip-us.apache.org/repos/asf/hive/blob/23d2b80b/ql/src/test/results/clientpositive/llap/materialized_view_create_rewrite_4.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/materialized_view_create_rewrite_4.q.out b/ql/src/test/results/clientpositive/llap/materialized_view_create_rewrite_4.q.out
index 2b26eba..8aa6c72 100644
--- a/ql/src/test/results/clientpositive/llap/materialized_view_create_rewrite_4.q.out
+++ b/ql/src/test/results/clientpositive/llap/materialized_view_create_rewrite_4.q.out
@@ -788,6 +788,7 @@ STAGE PLANS:
                     Reduce Output Operator
                       key expressions: _col0 (type: struct<writeid:bigint,bucketid:int,rowid:bigint>)
                       sort order: +
+                      Map-reduce partition columns: UDFToInteger(_col0) (type: int)
                       Statistics: Num rows: 1 Data size: 200 Basic stats: COMPLETE Column stats: COMPLETE
                       value expressions: _col1 (type: int), _col2 (type: decimal(10,2)), _col3 (type: bigint)
         Reducer 3 
@@ -1598,6 +1599,7 @@ STAGE PLANS:
                     Reduce Output Operator
                       key expressions: _col0 (type: struct<writeid:bigint,bucketid:int,rowid:bigint>)
                       sort order: +
+                      Map-reduce partition columns: UDFToInteger(_col0) (type: int)
                       Statistics: Num rows: 1 Data size: 200 Basic stats: COMPLETE Column stats: COMPLETE
                       value expressions: _col1 (type: int), _col2 (type: decimal(10,2)), _col3 (type: bigint)
         Reducer 3