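You are viewing a plain text version of this content. The canonical version is the original message in the commits@hive.apache.org mailing-list archive (the hyperlink to it was lost in this plain-text copy).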
Posted to commits@hive.apache.org by ay...@apache.org on 2022/09/02 19:04:20 UTC
[hive] branch master updated: HIVE-26510: Minor compaction creates empty delta files when no prior delta file exists (#3568). (Sourabh Badhya, reviewed by Denys Kuzmenko and Ayush Saxena)
This is an automated email from the ASF dual-hosted git repository.
ayushsaxena pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 6ada437afc6 HIVE-26510: Minor compaction creates empty delta files when no prior delta file exists (#3568). (Sourabh Badhya, reviewed by Denys Kuzmenko and Ayush Saxena)
6ada437afc6 is described below
commit 6ada437afc641cd44edf4ea0ccdd905a37385005
Author: Sourabh Badhya <42...@users.noreply.github.com>
AuthorDate: Sat Sep 3 00:34:11 2022 +0530
HIVE-26510: Minor compaction creates empty delta files when no prior delta file exists (#3568). (Sourabh Badhya, reviewed by Denys Kuzmenko and Ayush Saxena)
---
.../hadoop/hive/ql/txn/compactor/CompactorMR.java | 54 +++++++++++-----------
.../apache/hadoop/hive/ql/TestTxnCommands2.java | 27 +++++++++++
2 files changed, 54 insertions(+), 27 deletions(-)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index d83586d7ffa..5f0650f403f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -801,7 +801,6 @@ public class CompactorMR {
writeIdList, split.getBaseDir(), split.getDeltaDirs(), split.getDeltasToAttemptId());
RecordIdentifier identifier = reader.createKey();
V value = reader.createValue();
- getWriter(reporter, reader.getObjectInspector(), split.getBucket());
AcidUtils.AcidOperationalProperties acidOperationalProperties = AcidUtils.getAcidOperationalProperties(jobConf);
@@ -811,13 +810,12 @@ public class CompactorMR {
continue;
}
if (sawDeleteRecord && acidOperationalProperties.isSplitUpdate()) {
- if (deleteEventWriter == null) {
- getDeleteEventWriter(reporter, reader.getObjectInspector(), split.getBucket());
- }
- deleteEventWriter.write(value);
+ getDeleteEventWriter(reporter, reader.getObjectInspector(), split.getBucket())
+ .write(value);
reporter.progress();
} else {
- writer.write(value);
+ getWriter(reporter, reader.getObjectInspector(), split.getBucket())
+ .write(value);
reporter.progress();
}
}
@@ -848,7 +846,7 @@ public class CompactorMR {
//this is id of the current (compactor) txn
return validTxnList.getHighWatermark();
}
- private void getWriter(Reporter reporter, ObjectInspector inspector,
+ private RecordWriter getWriter(Reporter reporter, ObjectInspector inspector,
int bucket) throws IOException {
if (writer == null) {
AcidOutputFormat.Options options = new AcidOutputFormat.Options(jobConf);
@@ -873,6 +871,7 @@ public class CompactorMR {
cleanupTmpLocationOnTaskRetry(options, rootDir);
writer = aof.getRawRecordWriter(rootDir, options);
}
+ return writer;
}
@VisibleForTesting
@@ -887,29 +886,30 @@ public class CompactorMR {
}
}
- private void getDeleteEventWriter(Reporter reporter, ObjectInspector inspector,
+ private RecordWriter getDeleteEventWriter(Reporter reporter, ObjectInspector inspector,
int bucket) throws IOException {
+ if (deleteEventWriter == null) {
+ AcidOutputFormat.Options options = new AcidOutputFormat.Options(jobConf);
+ options.inspector(inspector).writingBase(false)
+ .writingDeleteDelta(true) // this is the option which will make it a delete writer
+ .isCompressed(jobConf.getBoolean(IS_COMPRESSED, false))
+ .tableProperties(new StringableMap(jobConf.get(TABLE_PROPS)).toProperties()).reporter(reporter)
+ .minimumWriteId(jobConf.getLong(MIN_TXN, Long.MAX_VALUE))
+ .maximumWriteId(jobConf.getLong(MAX_TXN, Long.MIN_VALUE)).bucket(bucket)
+ .statementId(-1)//setting statementId == -1 makes compacted delta files use
+ // delta_xxxx_yyyy format
+ .visibilityTxnId(getCompactorTxnId(jobConf));
- AcidOutputFormat.Options options = new AcidOutputFormat.Options(jobConf);
- options.inspector(inspector).writingBase(false)
- .writingDeleteDelta(true) // this is the option which will make it a delete writer
- .isCompressed(jobConf.getBoolean(IS_COMPRESSED, false))
- .tableProperties(new StringableMap(jobConf.get(TABLE_PROPS)).toProperties()).reporter(reporter)
- .minimumWriteId(jobConf.getLong(MIN_TXN, Long.MAX_VALUE))
- .maximumWriteId(jobConf.getLong(MAX_TXN, Long.MIN_VALUE)).bucket(bucket)
- .statementId(-1)//setting statementId == -1 makes compacted delta files use
- // delta_xxxx_yyyy format
- .visibilityTxnId(getCompactorTxnId(jobConf));
-
- // Instantiate the underlying output format
- @SuppressWarnings("unchecked")//since there is no way to parametrize instance of Class
- AcidOutputFormat<WritableComparable, V> aof =
- instantiate(AcidOutputFormat.class, jobConf.get(OUTPUT_FORMAT_CLASS_NAME));
-
- Path rootDir = new Path(jobConf.get(TMP_LOCATION));
- cleanupTmpLocationOnTaskRetry(options, rootDir);
- deleteEventWriter = aof.getRawRecordWriter(rootDir, options);
+ // Instantiate the underlying output format
+ @SuppressWarnings("unchecked")//since there is no way to parametrize instance of Class
+ AcidOutputFormat<WritableComparable, V> aof =
+ instantiate(AcidOutputFormat.class, jobConf.get(OUTPUT_FORMAT_CLASS_NAME));
+ Path rootDir = new Path(jobConf.get(TMP_LOCATION));
+ cleanupTmpLocationOnTaskRetry(options, rootDir);
+ deleteEventWriter = aof.getRawRecordWriter(rootDir, options);
+ }
+ return deleteEventWriter;
}
}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index 2366dbe0309..0e1edcd9d86 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -3191,6 +3191,33 @@ public class TestTxnCommands2 extends TxnCommandsBaseForTests {
}
}
+ @Test
+ public void testNoDeltaAfterDeleteAndMinorCompaction() throws Exception {
+ String tableName = "test_major_delete_minor";
+
+ runStatementOnDriver("drop table if exists " + tableName);
+ runStatementOnDriver("create table " + tableName + " (name VARCHAR(50), age TINYINT, num_clicks BIGINT) stored as orc" +
+ " tblproperties ('transactional'='true')");
+ runStatementOnDriver("insert into " + tableName + " values ('amy', 35, 12341234)");
+ runStatementOnDriver("insert into " + tableName + " values ('bob', 66, 1234712348712)");
+ runStatementOnDriver("insert into " + tableName + " values ('cal', 21, 431)");
+ runStatementOnDriver("insert into " + tableName + " values ('fse', 28, 8456)");
+ runStatementOnDriver("alter table " + tableName + " compact 'major'");
+ runWorker(hiveConf);
+ runCleaner(hiveConf);
+ runStatementOnDriver("delete from " + tableName + " where name='bob'");
+ runStatementOnDriver("delete from " + tableName + " WHERE name='fse'");
+ runStatementOnDriver("alter table " + tableName + " compact 'minor'");
+ runWorker(hiveConf);
+ runCleaner(hiveConf);
+
+ FileSystem fs = FileSystem.get(hiveConf);
+ FileStatus[] fileStatuses = fs.globStatus(new Path(getWarehouseDir() + "/" + tableName + "/*"));
+ for(FileStatus fileStatus : fileStatuses) {
+ Assert.assertFalse(fileStatus.getPath().getName().startsWith(AcidUtils.DELTA_PREFIX));
+ }
+ }
+
@Test
public void testNoTxnComponentsForScheduledQueries() throws Exception {
String tableName = "scheduledquerytable";