Posted to commits@hive.apache.org by ay...@apache.org on 2022/09/02 19:04:20 UTC

[hive] branch master updated: HIVE-26510: Minor compaction creates empty delta files when no prior delta file exists (#3568). (Sourabh Badhya, reviewed by Denys Kuzmenko and Ayush Saxena)

This is an automated email from the ASF dual-hosted git repository.

ayushsaxena pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 6ada437afc6 HIVE-26510: Minor compaction creates empty delta files when no prior delta file exists (#3568). (Sourabh Badhya, reviewed by Denys Kuzmenko and Ayush Saxena)
6ada437afc6 is described below

commit 6ada437afc641cd44edf4ea0ccdd905a37385005
Author: Sourabh Badhya <42...@users.noreply.github.com>
AuthorDate: Sat Sep 3 00:34:11 2022 +0530

    HIVE-26510: Minor compaction creates empty delta files when no prior delta file exists (#3568). (Sourabh Badhya, reviewed by Denys Kuzmenko and Ayush Saxena)
---
 .../hadoop/hive/ql/txn/compactor/CompactorMR.java  | 54 +++++++++++-----------
 .../apache/hadoop/hive/ql/TestTxnCommands2.java    | 27 +++++++++++
 2 files changed, 54 insertions(+), 27 deletions(-)
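
For context, the core of the fix is that CompactorMR's map task no longer creates its writers up front: getWriter() and getDeleteEventWriter() are now lazy getters that instantiate a RecordWriter only when the first record is actually written, so a minor compaction that has nothing to write also creates no delta directory. The snippet below is a self-contained illustration of that lazy-initialization pattern using plain java.io; LazyDeltaWriter and the delta_0000001_0000002 file name are invented for the example, whereas Hive itself goes through AcidOutputFormat.getRawRecordWriter as shown in the diff.

    // Illustration only: create the output lazily, on the first write, so that an
    // empty input produces no output file at all (the behaviour this patch restores
    // for minor compaction). Uses plain java.io instead of Hive's AcidOutputFormat.
    import java.io.BufferedWriter;
    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.List;

    class LazyDeltaWriter implements AutoCloseable {
      private final Path target;
      private BufferedWriter writer;          // stays null until the first record arrives

      LazyDeltaWriter(Path target) {
        this.target = target;
      }

      private BufferedWriter getWriter() throws IOException {
        if (writer == null) {                 // lazy init: the file is created here, not in the constructor
          writer = Files.newBufferedWriter(target);
        }
        return writer;
      }

      void write(String record) throws IOException {
        getWriter().write(record);
        getWriter().newLine();
      }

      @Override
      public void close() throws IOException {
        if (writer != null) {                 // nothing written => nothing to close, no file on disk
          writer.close();
        }
      }

      public static void main(String[] args) throws IOException {
        List<String> records = List.of();     // an input with no records, like a delete-only compaction
        try (LazyDeltaWriter w = new LazyDeltaWriter(Path.of("delta_0000001_0000002"))) {
          for (String r : records) {
            w.write(r);
          }
        }
        // With no records written, the file was never created.
        System.out.println(Files.exists(Path.of("delta_0000001_0000002")));  // prints: false
      }
    }
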

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index d83586d7ffa..5f0650f403f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -801,7 +801,6 @@ public class CompactorMR {
                   writeIdList, split.getBaseDir(), split.getDeltaDirs(), split.getDeltasToAttemptId());
       RecordIdentifier identifier = reader.createKey();
       V value = reader.createValue();
-      getWriter(reporter, reader.getObjectInspector(), split.getBucket());
 
       AcidUtils.AcidOperationalProperties acidOperationalProperties = AcidUtils.getAcidOperationalProperties(jobConf);
 
@@ -811,13 +810,12 @@ public class CompactorMR {
           continue;
         }
         if (sawDeleteRecord && acidOperationalProperties.isSplitUpdate()) {
-          if (deleteEventWriter == null) {
-            getDeleteEventWriter(reporter, reader.getObjectInspector(), split.getBucket());
-          }
-          deleteEventWriter.write(value);
+          getDeleteEventWriter(reporter, reader.getObjectInspector(), split.getBucket())
+                  .write(value);
           reporter.progress();
         } else {
-          writer.write(value);
+          getWriter(reporter, reader.getObjectInspector(), split.getBucket())
+                  .write(value);
           reporter.progress();
         }
       }
@@ -848,7 +846,7 @@ public class CompactorMR {
       //this is id of the current (compactor) txn
       return validTxnList.getHighWatermark();
     }
-    private void getWriter(Reporter reporter, ObjectInspector inspector,
+    private RecordWriter getWriter(Reporter reporter, ObjectInspector inspector,
                            int bucket) throws IOException {
       if (writer == null) {
         AcidOutputFormat.Options options = new AcidOutputFormat.Options(jobConf);
@@ -873,6 +871,7 @@ public class CompactorMR {
         cleanupTmpLocationOnTaskRetry(options, rootDir);
         writer = aof.getRawRecordWriter(rootDir, options);
       }
+      return writer;
     }
 
     @VisibleForTesting
@@ -887,29 +886,30 @@ public class CompactorMR {
       }
     }
 
-    private void getDeleteEventWriter(Reporter reporter, ObjectInspector inspector,
+    private RecordWriter getDeleteEventWriter(Reporter reporter, ObjectInspector inspector,
         int bucket) throws IOException {
+      if (deleteEventWriter == null) {
+        AcidOutputFormat.Options options = new AcidOutputFormat.Options(jobConf);
+        options.inspector(inspector).writingBase(false)
+                .writingDeleteDelta(true)   // this is the option which will make it a delete writer
+                .isCompressed(jobConf.getBoolean(IS_COMPRESSED, false))
+                .tableProperties(new StringableMap(jobConf.get(TABLE_PROPS)).toProperties()).reporter(reporter)
+                .minimumWriteId(jobConf.getLong(MIN_TXN, Long.MAX_VALUE))
+                .maximumWriteId(jobConf.getLong(MAX_TXN, Long.MIN_VALUE)).bucket(bucket)
+                .statementId(-1)//setting statementId == -1 makes compacted delta files use
+                // delta_xxxx_yyyy format
+                .visibilityTxnId(getCompactorTxnId(jobConf));
 
-      AcidOutputFormat.Options options = new AcidOutputFormat.Options(jobConf);
-      options.inspector(inspector).writingBase(false)
-          .writingDeleteDelta(true)   // this is the option which will make it a delete writer
-          .isCompressed(jobConf.getBoolean(IS_COMPRESSED, false))
-          .tableProperties(new StringableMap(jobConf.get(TABLE_PROPS)).toProperties()).reporter(reporter)
-          .minimumWriteId(jobConf.getLong(MIN_TXN, Long.MAX_VALUE))
-          .maximumWriteId(jobConf.getLong(MAX_TXN, Long.MIN_VALUE)).bucket(bucket)
-          .statementId(-1)//setting statementId == -1 makes compacted delta files use
-          // delta_xxxx_yyyy format
-          .visibilityTxnId(getCompactorTxnId(jobConf));
-
-      // Instantiate the underlying output format
-      @SuppressWarnings("unchecked")//since there is no way to parametrize instance of Class
-          AcidOutputFormat<WritableComparable, V> aof =
-          instantiate(AcidOutputFormat.class, jobConf.get(OUTPUT_FORMAT_CLASS_NAME));
-
-      Path rootDir = new Path(jobConf.get(TMP_LOCATION));
-      cleanupTmpLocationOnTaskRetry(options, rootDir);
-      deleteEventWriter = aof.getRawRecordWriter(rootDir, options);
+        // Instantiate the underlying output format
+        @SuppressWarnings("unchecked")//since there is no way to parametrize instance of Class
+        AcidOutputFormat<WritableComparable, V> aof =
+                instantiate(AcidOutputFormat.class, jobConf.get(OUTPUT_FORMAT_CLASS_NAME));
 
+        Path rootDir = new Path(jobConf.get(TMP_LOCATION));
+        cleanupTmpLocationOnTaskRetry(options, rootDir);
+        deleteEventWriter = aof.getRawRecordWriter(rootDir, options);
+      }
+      return deleteEventWriter;
     }
   }
 
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index 2366dbe0309..0e1edcd9d86 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -3191,6 +3191,33 @@ public class TestTxnCommands2 extends TxnCommandsBaseForTests {
     }
   }
 
+  @Test
+  public void testNoDeltaAfterDeleteAndMinorCompaction() throws Exception {
+    String tableName = "test_major_delete_minor";
+
+    runStatementOnDriver("drop table if exists " + tableName);
+    runStatementOnDriver("create table " + tableName + " (name VARCHAR(50), age TINYINT, num_clicks BIGINT) stored as orc" +
+            " tblproperties ('transactional'='true')");
+    runStatementOnDriver("insert into " + tableName + " values ('amy', 35, 12341234)");
+    runStatementOnDriver("insert into " + tableName + " values ('bob', 66, 1234712348712)");
+    runStatementOnDriver("insert into " + tableName + " values ('cal', 21, 431)");
+    runStatementOnDriver("insert into " + tableName + " values ('fse', 28, 8456)");
+    runStatementOnDriver("alter table " + tableName + " compact 'major'");
+    runWorker(hiveConf);
+    runCleaner(hiveConf);
+    runStatementOnDriver("delete from " + tableName + " where name='bob'");
+    runStatementOnDriver("delete from " + tableName + " WHERE name='fse'");
+    runStatementOnDriver("alter table " + tableName + " compact 'minor'");
+    runWorker(hiveConf);
+    runCleaner(hiveConf);
+
+    FileSystem fs = FileSystem.get(hiveConf);
+    FileStatus[] fileStatuses = fs.globStatus(new Path(getWarehouseDir() + "/" + tableName + "/*"));
+    for(FileStatus fileStatus : fileStatuses) {
+      Assert.assertFalse(fileStatus.getPath().getName().startsWith(AcidUtils.DELTA_PREFIX));
+    }
+  }
+
   @Test
   public void testNoTxnComponentsForScheduledQueries() throws Exception {
     String tableName = "scheduledquerytable";
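
The regression test added above (testNoDeltaAfterDeleteAndMinorCompaction) walks through the reported scenario: insert a few rows, run a major compaction and the cleaner, delete two rows so that only delete deltas remain outstanding, run a minor compaction and the cleaner again, and assert that nothing under the table directory starts with AcidUtils.DELTA_PREFIX. Before this patch that sequence left an empty insert delta next to the compacted delete delta, roughly like this (illustrative directory names, not captured from a real run):

    test_major_delete_minor/
      base_x/                    <- produced by the major compaction
      delete_delta_y_z_v.../     <- delete events compacted by the minor compaction
      delta_y_z_v.../            <- empty; the directory this change stops creating

Because the writers are now created lazily, the insert-side writer is never instantiated when the mapper sees only delete events, so the empty delta directory never appears and the new assertion holds.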