You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by co...@apache.org on 2022/09/24 07:34:18 UTC

[hudi] branch master updated: [HUDI-4412] Fix multi writer INSERT_OVERWRITE NPE bug (#6130)

This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 7cc614c604 [HUDI-4412] Fix multi writer INSERT_OVERWRITE NPE bug (#6130)
7cc614c604 is described below

commit 7cc614c6049812202b7fc427e9f399d3a792f5bb
Author: liujinhui <96...@qq.com>
AuthorDate: Sat Sep 24 15:34:09 2022 +0800

    [HUDI-4412] Fix multi writer INSERT_OVERWRITE NPE bug (#6130)
    
    This commit fixes two minor issues:
    
    1. When an insert_overwrite operation is performed, the
        clusteringPlan in the requestedReplaceMetadata is null, so
        calling getFileIdsFromRequestedReplaceMetadata causes an NPE.
    
    2. For an insert_overwrite operation where inflightCommitMetadata != null,
        the operation type must be obtained from getHoodieInflightReplaceMetadata;
        the original code read it from getHoodieCommitMetadata instead, which
        resulted in a NullPointerException.
---
 .../client/transaction/ConcurrentOperation.java    |  5 ++-
 ...urrentFileWritesConflictResolutionStrategy.java | 49 ++++++++++++++++++++++
 .../hudi/common/testutils/HoodieTestTable.java     |  6 +++
 3 files changed, 58 insertions(+), 2 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConcurrentOperation.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConcurrentOperation.java
index 40da7dca7f..35580229e3 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConcurrentOperation.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConcurrentOperation.java
@@ -124,14 +124,15 @@ public class ConcurrentOperation {
             HoodieRequestedReplaceMetadata requestedReplaceMetadata = this.metadataWrapper.getMetadataFromTimeline().getHoodieRequestedReplaceMetadata();
             org.apache.hudi.avro.model.HoodieCommitMetadata inflightCommitMetadata = this.metadataWrapper.getMetadataFromTimeline().getHoodieInflightReplaceMetadata();
             if (instant.isRequested()) {
-              if (requestedReplaceMetadata != null) {
+              // for insert_overwrite/insert_overwrite_table clusteringPlan will be empty
+              if (requestedReplaceMetadata != null && requestedReplaceMetadata.getClusteringPlan() != null) {
                 this.mutatedFileIds = getFileIdsFromRequestedReplaceMetadata(requestedReplaceMetadata);
                 this.operationType = WriteOperationType.CLUSTER;
               }
             } else {
               if (inflightCommitMetadata != null) {
                 this.mutatedFileIds = getFileIdWithoutSuffixAndRelativePathsFromSpecificRecord(inflightCommitMetadata.getPartitionToWriteStats()).keySet();
-                this.operationType = WriteOperationType.fromValue(this.metadataWrapper.getMetadataFromTimeline().getHoodieCommitMetadata().getOperationType());
+                this.operationType = WriteOperationType.fromValue(this.metadataWrapper.getMetadataFromTimeline().getHoodieInflightReplaceMetadata().getOperationType());
               } else if (requestedReplaceMetadata != null) {
                 // inflight replacecommit metadata is empty due to clustering, read fileIds from requested replacecommit
                 this.mutatedFileIds = getFileIdsFromRequestedReplaceMetadata(requestedReplaceMetadata);
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestSimpleConcurrentFileWritesConflictResolutionStrategy.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestSimpleConcurrentFileWritesConflictResolutionStrategy.java
index e7cc296ff6..39b9e1e6dc 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestSimpleConcurrentFileWritesConflictResolutionStrategy.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestSimpleConcurrentFileWritesConflictResolutionStrategy.java
@@ -123,6 +123,41 @@ public class TestSimpleConcurrentFileWritesConflictResolutionStrategy extends Ho
     }
   }
 
+  @Test
+  public void testConcurrentWritesWithReplaceInflightCommit() throws Exception {
+    createReplaceInflight(HoodieActiveTimeline.createNewInstantTime());
+    HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
+    Option<HoodieInstant> lastSuccessfulInstant = Option.empty();
+
+    // writer 1 starts
+    String currentWriterInstant = HoodieActiveTimeline.createNewInstantTime();
+    createInflightCommit(currentWriterInstant);
+    Option<HoodieInstant> currentInstant = Option.of(new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, currentWriterInstant));
+
+    // writer 2 starts and finishes
+    String newInstantTime = HoodieActiveTimeline.createNewInstantTime();
+    createReplaceInflight(newInstantTime);
+
+    SimpleConcurrentFileWritesConflictResolutionStrategy strategy = new SimpleConcurrentFileWritesConflictResolutionStrategy();
+    HoodieCommitMetadata currentMetadata = createCommitMetadata(currentWriterInstant);
+    timeline = timeline.reload();
+
+    List<HoodieInstant> candidateInstants = strategy.getCandidateInstants(timeline, currentInstant.get(), lastSuccessfulInstant).collect(
+        Collectors.toList());
+
+    // writer 1 conflicts with writer 2
+    Assertions.assertTrue(candidateInstants.size() == 1);
+    ConcurrentOperation thatCommitOperation = new ConcurrentOperation(candidateInstants.get(0), metaClient);
+    ConcurrentOperation thisCommitOperation = new ConcurrentOperation(currentInstant.get(), currentMetadata);
+    Assertions.assertTrue(strategy.hasConflict(thisCommitOperation, thatCommitOperation));
+    try {
+      strategy.resolveConflict(null, thisCommitOperation, thatCommitOperation);
+      Assertions.fail("Cannot reach here, writer 1 and writer 2 should have thrown a conflict");
+    } catch (HoodieWriteConflictException e) {
+      // expected
+    }
+  }
+
   @Test
   public void testConcurrentWritesWithInterleavingScheduledCompaction() throws Exception {
     createCommit(HoodieActiveTimeline.createNewInstantTime());
@@ -394,6 +429,20 @@ public class TestSimpleConcurrentFileWritesConflictResolutionStrategy extends Ho
         .withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, fileId1, fileId2);
   }
 
+  private void createReplaceInflight(String instantTime) throws Exception {
+    String fileId1 = "file-1";
+    String fileId2 = "file-2";
+
+    HoodieCommitMetadata inflightReplaceMetadata = new HoodieCommitMetadata();
+    inflightReplaceMetadata.setOperationType(WriteOperationType.INSERT_OVERWRITE);
+    HoodieWriteStat writeStat = new HoodieWriteStat();
+    writeStat.setFileId("file-1");
+    inflightReplaceMetadata.addWriteStat(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, writeStat);
+    HoodieTestTable.of(metaClient)
+        .addInflightReplace(instantTime, Option.of(inflightReplaceMetadata))
+        .withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, fileId1, fileId2);
+  }
+
   private void createReplace(String instantTime, WriteOperationType writeOperationType) throws Exception {
     String fileId1 = "file-1";
     String fileId2 = "file-2";
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
index 31de82b12c..c2531d47c1 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
@@ -284,6 +284,12 @@ public class HoodieTestTable {
     return this;
   }
 
+  public HoodieTestTable addInflightReplace(String instantTime, Option<HoodieCommitMetadata> inflightReplaceMetadata) throws Exception {
+    createInflightReplaceCommit(basePath, instantTime, inflightReplaceMetadata);
+    currentInstantTime = instantTime;
+    return this;
+  }
+
   public HoodieTestTable addInflightClean(String instantTime, HoodieCleanerPlan cleanerPlan) throws IOException {
     createRequestedCleanFile(basePath, instantTime, cleanerPlan);
     createInflightCleanFile(basePath, instantTime, cleanerPlan);