You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ek...@apache.org on 2018/10/16 18:23:41 UTC

hive git commit: HIVE-17231: ColumnizedDeleteEventRegistry.DeleteReaderValue optimization (Eugene Koifman, reviewed by Gopal V)

Repository: hive
Updated Branches:
  refs/heads/master 0f2f999bb -> d7be4b9f2


HIVE-17231: ColumnizedDeleteEventRegistry.DeleteReaderValue optimization (Eugene Koifman, reviewed by Gopal V)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/d7be4b9f
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/d7be4b9f
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/d7be4b9f

Branch: refs/heads/master
Commit: d7be4b9f26345439c472969461d3d2c81f7e5057
Parents: 0f2f999
Author: Eugene Koifman <ek...@apache.org>
Authored: Tue Oct 16 11:23:11 2018 -0700
Committer: Eugene Koifman <ek...@apache.org>
Committed: Tue Oct 16 11:23:11 2018 -0700

----------------------------------------------------------------------
 .../hadoop/hive/ql/exec/FileSinkOperator.java   |  4 ++
 .../hive/ql/io/orc/OrcRawRecordMerger.java      | 55 +++-----------------
 .../io/orc/VectorizedOrcAcidRowBatchReader.java | 13 +----
 3 files changed, 12 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/d7be4b9f/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index 8c7a78b..79e41d9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -388,6 +388,10 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
      *
      * A new FSP is created for each partition, so this only requires the bucket numbering and that
      * is mapped in directly as an index.
+     *
+     * This relies on ReduceSinkOperator to shuffle update/delete rows by
+     * UDFToInteger(RecordIdentifier), i.e. by writerId in ROW__ID.
+     * {@link org.apache.hadoop.hive.ql.parse.SemanticAnalyzer#getPartitionColsFromBucketColsForUpdateDelete(Operator, boolean)}
      */
     public int createDynamicBucket(int bucketNum) {
       // this assumes all paths are bucket names (which means no lookup is needed)

http://git-wip-us.apache.org/repos/asf/hive/blob/d7be4b9f/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
index 6be0c74..8cabf96 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
@@ -1122,7 +1122,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
           }
           continue;
         }
-        for (Path deltaFile : getDeltaFiles(delta, bucket, conf, mergerOptions, isBucketed)) {
+        for (Path deltaFile : getDeltaFiles(delta, bucket, mergerOptions)) {
           FileSystem fs = deltaFile.getFileSystem(conf);
           if(!fs.exists(deltaFile)) {
             /**
@@ -1262,53 +1262,12 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
    * This determines the set of {@link ReaderPairAcid} to create for a given delta/.
    * For unbucketed tables {@code bucket} can be thought of as a write tranche.
    */
-  static Path[] getDeltaFiles(Path deltaDirectory, int bucket, Configuration conf,
-                              Options mergerOptions, boolean isBucketed) throws IOException {
-    if(isBucketed) {
-      /**
-       * for bucketed tables (for now) we always trust that the N in bucketN file name means that
-       * all records have {@link RecordIdentifier#getBucketProperty()} encoding bucketId = N.  This
-       * means that a delete event in bucketN can only modify an insert in another bucketN file for
-       * the same N. (Down the road we may trust it only in certain delta dirs)
-       *
-       * Compactor takes all types of deltas for a given bucket.  For regular read, any file that
-       * contains (only) insert events is treated as base and only
-       * delete_delta/ are treated as deltas.
-       */
-        assert (!mergerOptions.isCompacting &&
-          deltaDirectory.getName().startsWith(AcidUtils.DELETE_DELTA_PREFIX)
-        ) || mergerOptions.isCompacting : "Unexpected delta: " + deltaDirectory;
-      Path deltaFile = AcidUtils.createBucketFile(deltaDirectory, bucket);
-      return new Path[]{deltaFile};
-    }
-    /**
-     * For unbucketed tables insert events are also stored in bucketN files but here N is
-     * the writer ID.  We can trust that N matches info in {@link RecordIdentifier#getBucketProperty()}
-     * delta_x_y but it's not required since we can't trust N for delete_delta_x_x/bucketN.
-     * Thus we always have to take all files in a delete_delta.
-     * For regular read, any file that has (only) insert events is treated as base so
-     * {@link deltaDirectory} can only be delete_delta and so we take all files in it.
-     * For compacting, every split contains base/bN + delta(s)/bN + delete_delta(s){all buckets} for
-     * a given N.
-     */
-    if(deltaDirectory.getName().startsWith(AcidUtils.DELETE_DELTA_PREFIX)) {
-      //it's not wrong to take all delete events for bucketed tables but it's more efficient
-      //to only take those that belong to the 'bucket' assuming we trust the file name
-      //un-bucketed table - get all files
-      FileSystem fs = deltaDirectory.getFileSystem(conf);
-      FileStatus[] dataFiles = fs.listStatus(deltaDirectory, AcidUtils.bucketFileFilter);
-      Path[] deltaFiles = new Path[dataFiles.length];
-      int i = 0;
-      for (FileStatus stat : dataFiles) {
-        deltaFiles[i++] = stat.getPath();
-      }//todo: need a test where we actually have more than 1 file
-      return deltaFiles;
-    }
-    //if here it must be delta_x_y - insert events only, so we must be compacting
-    assert mergerOptions.isCompacting() : "Expected to be called as part of compaction";
-    Path deltaFile = AcidUtils.createBucketFile(deltaDirectory, bucket);
-    return new Path[] {deltaFile};
-
+  static Path[] getDeltaFiles(Path deltaDirectory, int bucket, Options mergerOptions) {
+    assert (!mergerOptions.isCompacting &&
+        deltaDirectory.getName().startsWith(AcidUtils.DELETE_DELTA_PREFIX)
+    ) || mergerOptions.isCompacting : "Unexpected delta: " + deltaDirectory +
+        "(isCompacting=" + mergerOptions.isCompacting() + ")";
+    return new Path[] {AcidUtils.createBucketFile(deltaDirectory, bucket)};
   }
   
   @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/hive/blob/d7be4b9f/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
index 66280b2..2f809de 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
@@ -1420,19 +1420,8 @@ public class VectorizedOrcAcidRowBatchReader
       /**
        * Whenever we are reading a batch, we must ensure that all the records in the batch
        * have the same bucket id as the bucket id of the split. If not, throw exception.
-       * NOTE: this assertion might not hold, once virtual bucketing is in place. However,
-       * it should be simple to fix that case. Just replace check for bucket equality with
-       * a check for valid bucket mapping. Until virtual bucketing is added, it means
-       * either the split computation got messed up or we found some corrupted records.
        */
       private void checkBucketId(int bucketPropertyFromRecord) throws IOException {
-        if(!isBucketedTable) {
-          /**
-           * in this case a file inside a delete_delta_x_y/bucketN may contain any value for
-           * bucketId in {@link RecordIdentifier#getBucketProperty()}
-           */
-          return;
-        }
         int bucketIdFromRecord = BucketCodec.determineVersion(bucketPropertyFromRecord)
           .decodeWriterId(bucketPropertyFromRecord);
         if(bucketIdFromRecord != bucketForSplit) {
@@ -1534,7 +1523,7 @@ public class VectorizedOrcAcidRowBatchReader
           for (Path deleteDeltaDir : deleteDeltaDirs) {
             FileSystem fs = deleteDeltaDir.getFileSystem(conf);
             Path[] deleteDeltaFiles = OrcRawRecordMerger.getDeltaFiles(deleteDeltaDir, bucket,
-                conf, new OrcRawRecordMerger.Options().isCompacting(false), isBucketedTable);
+                new OrcRawRecordMerger.Options().isCompacting(false));
             for (Path deleteDeltaFile : deleteDeltaFiles) {
               // NOTE: Calling last flush length below is more for future-proofing when we have
               // streaming deletes. But currently we don't support streaming deletes, and this can