You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by co...@apache.org on 2022/09/24 07:34:18 UTC
[hudi] branch master updated: [HUDI-4412] Fix multi writer INSERT_OVERWRITE NPE bug (#6130)
This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 7cc614c604 [HUDI-4412] Fix multi writer INSERT_OVERWRITE NPE bug (#6130)
7cc614c604 is described below
commit 7cc614c6049812202b7fc427e9f399d3a792f5bb
Author: liujinhui <96...@qq.com>
AuthorDate: Sat Sep 24 15:34:09 2022 +0800
[HUDI-4412] Fix multi writer INSERT_OVERWRITE NPE bug (#6130)
There are two minor issues fixed here:
1. When an insert_overwrite operation is performed, the
clusteringPlan in the requestedReplaceMetadata is
null, so calling getFileIdsFromRequestedReplaceMetadata causes an NPE.
2. During an insert_overwrite operation, when inflightCommitMetadata != null,
the operation type should be obtained from getHoodieInflightReplaceMetadata;
the original code dereferenced a null pointer instead.
---
.../client/transaction/ConcurrentOperation.java | 5 ++-
...urrentFileWritesConflictResolutionStrategy.java | 49 ++++++++++++++++++++++
.../hudi/common/testutils/HoodieTestTable.java | 6 +++
3 files changed, 58 insertions(+), 2 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConcurrentOperation.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConcurrentOperation.java
index 40da7dca7f..35580229e3 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConcurrentOperation.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConcurrentOperation.java
@@ -124,14 +124,15 @@ public class ConcurrentOperation {
HoodieRequestedReplaceMetadata requestedReplaceMetadata = this.metadataWrapper.getMetadataFromTimeline().getHoodieRequestedReplaceMetadata();
org.apache.hudi.avro.model.HoodieCommitMetadata inflightCommitMetadata = this.metadataWrapper.getMetadataFromTimeline().getHoodieInflightReplaceMetadata();
if (instant.isRequested()) {
- if (requestedReplaceMetadata != null) {
+ // for insert_overwrite/insert_overwrite_table clusteringPlan will be empty
+ if (requestedReplaceMetadata != null && requestedReplaceMetadata.getClusteringPlan() != null) {
this.mutatedFileIds = getFileIdsFromRequestedReplaceMetadata(requestedReplaceMetadata);
this.operationType = WriteOperationType.CLUSTER;
}
} else {
if (inflightCommitMetadata != null) {
this.mutatedFileIds = getFileIdWithoutSuffixAndRelativePathsFromSpecificRecord(inflightCommitMetadata.getPartitionToWriteStats()).keySet();
- this.operationType = WriteOperationType.fromValue(this.metadataWrapper.getMetadataFromTimeline().getHoodieCommitMetadata().getOperationType());
+ this.operationType = WriteOperationType.fromValue(this.metadataWrapper.getMetadataFromTimeline().getHoodieInflightReplaceMetadata().getOperationType());
} else if (requestedReplaceMetadata != null) {
// inflight replacecommit metadata is empty due to clustering, read fileIds from requested replacecommit
this.mutatedFileIds = getFileIdsFromRequestedReplaceMetadata(requestedReplaceMetadata);
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestSimpleConcurrentFileWritesConflictResolutionStrategy.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestSimpleConcurrentFileWritesConflictResolutionStrategy.java
index e7cc296ff6..39b9e1e6dc 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestSimpleConcurrentFileWritesConflictResolutionStrategy.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestSimpleConcurrentFileWritesConflictResolutionStrategy.java
@@ -123,6 +123,41 @@ public class TestSimpleConcurrentFileWritesConflictResolutionStrategy extends Ho
}
}
+ @Test
+ public void testConcurrentWritesWithReplaceInflightCommit() throws Exception {
+ createReplaceInflight(HoodieActiveTimeline.createNewInstantTime());
+ HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
+ Option<HoodieInstant> lastSuccessfulInstant = Option.empty();
+
+ // writer 1 starts
+ String currentWriterInstant = HoodieActiveTimeline.createNewInstantTime();
+ createInflightCommit(currentWriterInstant);
+ Option<HoodieInstant> currentInstant = Option.of(new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, currentWriterInstant));
+
+ // writer 2 starts and finishes
+ String newInstantTime = HoodieActiveTimeline.createNewInstantTime();
+ createReplaceInflight(newInstantTime);
+
+ SimpleConcurrentFileWritesConflictResolutionStrategy strategy = new SimpleConcurrentFileWritesConflictResolutionStrategy();
+ HoodieCommitMetadata currentMetadata = createCommitMetadata(currentWriterInstant);
+ timeline = timeline.reload();
+
+ List<HoodieInstant> candidateInstants = strategy.getCandidateInstants(timeline, currentInstant.get(), lastSuccessfulInstant).collect(
+ Collectors.toList());
+
+ // writer 1 conflicts with writer 2
+ Assertions.assertTrue(candidateInstants.size() == 1);
+ ConcurrentOperation thatCommitOperation = new ConcurrentOperation(candidateInstants.get(0), metaClient);
+ ConcurrentOperation thisCommitOperation = new ConcurrentOperation(currentInstant.get(), currentMetadata);
+ Assertions.assertTrue(strategy.hasConflict(thisCommitOperation, thatCommitOperation));
+ try {
+ strategy.resolveConflict(null, thisCommitOperation, thatCommitOperation);
+ Assertions.fail("Cannot reach here, writer 1 and writer 2 should have thrown a conflict");
+ } catch (HoodieWriteConflictException e) {
+ // expected
+ }
+ }
+
@Test
public void testConcurrentWritesWithInterleavingScheduledCompaction() throws Exception {
createCommit(HoodieActiveTimeline.createNewInstantTime());
@@ -394,6 +429,20 @@ public class TestSimpleConcurrentFileWritesConflictResolutionStrategy extends Ho
.withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, fileId1, fileId2);
}
+ private void createReplaceInflight(String instantTime) throws Exception {
+ String fileId1 = "file-1";
+ String fileId2 = "file-2";
+
+ HoodieCommitMetadata inflightReplaceMetadata = new HoodieCommitMetadata();
+ inflightReplaceMetadata.setOperationType(WriteOperationType.INSERT_OVERWRITE);
+ HoodieWriteStat writeStat = new HoodieWriteStat();
+ writeStat.setFileId("file-1");
+ inflightReplaceMetadata.addWriteStat(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, writeStat);
+ HoodieTestTable.of(metaClient)
+ .addInflightReplace(instantTime, Option.of(inflightReplaceMetadata))
+ .withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, fileId1, fileId2);
+ }
+
private void createReplace(String instantTime, WriteOperationType writeOperationType) throws Exception {
String fileId1 = "file-1";
String fileId2 = "file-2";
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
index 31de82b12c..c2531d47c1 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
@@ -284,6 +284,12 @@ public class HoodieTestTable {
return this;
}
+ public HoodieTestTable addInflightReplace(String instantTime, Option<HoodieCommitMetadata> inflightReplaceMetadata) throws Exception {
+ createInflightReplaceCommit(basePath, instantTime, inflightReplaceMetadata);
+ currentInstantTime = instantTime;
+ return this;
+ }
+
public HoodieTestTable addInflightClean(String instantTime, HoodieCleanerPlan cleanerPlan) throws IOException {
createRequestedCleanFile(basePath, instantTime, cleanerPlan);
createInflightCleanFile(basePath, instantTime, cleanerPlan);