You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by yi...@apache.org on 2023/02/04 05:53:09 UTC

[hudi] 04/07: [HUDI-5682] Bucket index does not work correctly for multi-writer scenarios (#7838)

This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-0.13.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit e0ef594ed2e70dbc60f03b0d83794d1ecf7ecd69
Author: Danny Chan <yu...@gmail.com>
AuthorDate: Fri Feb 3 19:03:16 2023 +0800

    [HUDI-5682] Bucket index does not work correctly for multi-writer scenarios (#7838)
    
    Resolve conflicts by bucket ID instead of by whole file ID: the last 24 characters of a file ID are randomly generated, so two writers touching the same bucket produce different file IDs.
    
    We do not make the last 24 characters of the file group ID a constant, mainly because a replace commit currently shadows all replaced files by file group ID: if the same file ID were used for both the new file and the replaced file, none of the data in the new file could be queried.
---
 ...urrentFileWritesConflictResolutionStrategy.java |  55 ++++++++
 .../org/apache/hudi/config/HoodieLockConfig.java   |   9 ++
 .../apache/hudi/index/bucket/BucketIdentifier.java |   6 +-
 ...urrentFileWritesConflictResolutionStrategy.java | 150 +++++++++++++++++++++
 4 files changed, 219 insertions(+), 1 deletion(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/BucketIndexConcurrentFileWritesConflictResolutionStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/BucketIndexConcurrentFileWritesConflictResolutionStrategy.java
new file mode 100644
index 00000000000..503f1c42185
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/BucketIndexConcurrentFileWritesConflictResolutionStrategy.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.transaction;
+
+import org.apache.hudi.index.bucket.BucketIdentifier;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * {@link ConflictResolutionStrategy} for concurrent writes on tables using the bucket index: two operations
+ * conflict when they mutate file ids sharing the same bucket id, whatever their randomly generated suffixes.
+ */
+public class BucketIndexConcurrentFileWritesConflictResolutionStrategy
+    extends SimpleConcurrentFileWritesConflictResolutionStrategy {
+  private static final Logger LOG = LoggerFactory.getLogger(BucketIndexConcurrentFileWritesConflictResolutionStrategy.class);
+
+  @Override
+  public boolean hasConflict(ConcurrentOperation thisOperation, ConcurrentOperation otherOperation) {
+    // NOTE(review): bucket ids carry no partition path; assuming bucket numbers repeat per partition, writes to the
+    // same bucket number in different partitions are also reported as conflicts (conservative) -- confirm.
+    Set<String> intersection = extractBucketIds(thisOperation.getMutatedFileIds());
+    intersection.retainAll(extractBucketIds(otherOperation.getMutatedFileIds()));
+    if (intersection.isEmpty()) {
+      return false;
+    }
+    LOG.info("Found conflicting writes between first operation = {}, second operation = {} , intersecting bucket ids {}",
+        thisOperation, otherOperation, intersection);
+    return true;
+  }
+
+  /** Maps file ids to their bucket id prefixes; the returned set is mutable so callers can intersect it in place. */
+  private static Set<String> extractBucketIds(Set<String> fileIds) {
+    return fileIds.stream().map(BucketIdentifier::bucketIdStrFromFileId).collect(Collectors.toCollection(HashSet::new));
+  }
+}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java
index f4aeef09b5c..3c932756685 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java
@@ -17,6 +17,7 @@
 
 package org.apache.hudi.config;
 
+import org.apache.hudi.client.transaction.BucketIndexConcurrentFileWritesConflictResolutionStrategy;
 import org.apache.hudi.client.transaction.ConflictResolutionStrategy;
 import org.apache.hudi.client.transaction.SimpleConcurrentFileWritesConflictResolutionStrategy;
 import org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider;
@@ -26,6 +27,7 @@ import org.apache.hudi.common.config.ConfigProperty;
 import org.apache.hudi.common.config.HoodieConfig;
 import org.apache.hudi.common.lock.LockProvider;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.index.HoodieIndex;
 
 import java.io.File;
 import java.io.FileReader;
@@ -185,6 +187,13 @@ public class HoodieLockConfig extends HoodieConfig {
   public static final ConfigProperty<String> WRITE_CONFLICT_RESOLUTION_STRATEGY_CLASS_NAME = ConfigProperty
       .key(LOCK_PREFIX + "conflict.resolution.strategy")
       .defaultValue(SimpleConcurrentFileWritesConflictResolutionStrategy.class.getName())
+      .withInferFunction(hoodieConfig -> {
+        // Bucket index file ids end in a randomly generated suffix, so concurrent writers have to be compared by
+        // bucket id instead of whole file id; other index types keep the default file id based strategy.
+        boolean isBucketIndex = HoodieIndex.IndexType.BUCKET.name().equalsIgnoreCase(hoodieConfig.getStringOrDefault(HoodieIndexConfig.INDEX_TYPE, null));
+        return Option.of(isBucketIndex ? BucketIndexConcurrentFileWritesConflictResolutionStrategy.class.getName()
+            : SimpleConcurrentFileWritesConflictResolutionStrategy.class.getName());
+      })
       .sinceVersion("0.8.0")
-      .withDocumentation("Lock provider class name, this should be subclass of "
+      .withDocumentation("Conflict resolution strategy class name, this should be subclass of "
           + "org.apache.hudi.client.transaction.ConflictResolutionStrategy");
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java
index 35f9205a8e5..5264c8b39f1 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java
@@ -76,7 +76,23 @@ public class BucketIdentifier implements Serializable {
   }
 
+  /**
+   * Parses the numeric bucket id encoded in the first 8 characters of a bucket index file id.
+   */
   public static int bucketIdFromFileId(String fileId) {
-    return Integer.parseInt(fileId.substring(0, 8));
+    return Integer.parseInt(bucketIdStrFromFileId(fileId));
+  }
+
+  /**
+   * Returns the 8-character bucket id prefix of a bucket index file id. The rest of the file id is randomly
+   * generated and may differ between writers of the same bucket, so only this prefix is comparable across them.
+   *
+   * @throws IllegalArgumentException if {@code fileId} is shorter than 8 characters
+   */
+  public static String bucketIdStrFromFileId(String fileId) {
+    if (fileId.length() < 8) {
+      throw new IllegalArgumentException("File id " + fileId + " is too short to carry an 8-character bucket id prefix");
+    }
+    return fileId.substring(0, 8);
   }
 
   public static String bucketIdStr(int n) {
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestBucketIndexConcurrentFileWritesConflictResolutionStrategy.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestBucketIndexConcurrentFileWritesConflictResolutionStrategy.java
new file mode 100644
index 00000000000..24c578606d4
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestBucketIndexConcurrentFileWritesConflictResolutionStrategy.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.transaction;
+
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieInstant.State;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.testutils.HoodieTestTable;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieWriteConflictException;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class TestBucketIndexConcurrentFileWritesConflictResolutionStrategy extends HoodieCommonTestHarness {
+
+  @BeforeEach
+  public void init() throws IOException {
+    initMetaClient();
+  }
+
+  @Test
+  public void testNoConcurrentWrites() throws Exception {
+    String newInstantTime = HoodieTestTable.makeNewCommitTime();
+    createCommit(newInstantTime);
+    // consider commits before this are all successful
+
+    Option<HoodieInstant> lastSuccessfulInstant = metaClient.getCommitsTimeline().filterCompletedInstants().lastInstant();
+    newInstantTime = HoodieTestTable.makeNewCommitTime();
+    Option<HoodieInstant> currentInstant = Option.of(new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, newInstantTime));
+
+    SimpleConcurrentFileWritesConflictResolutionStrategy strategy = new BucketIndexConcurrentFileWritesConflictResolutionStrategy();
+    Stream<HoodieInstant> candidateInstants = strategy.getCandidateInstants(metaClient.getActiveTimeline(), currentInstant.get(), lastSuccessfulInstant);
+    Assertions.assertEquals(0, candidateInstants.count());
+  }
+
+  @Test
+  public void testConcurrentWrites() throws Exception {
+    String newInstantTime = HoodieTestTable.makeNewCommitTime();
+    createCommit(newInstantTime);
+    // consider commits before this are all successful
+    // writer 1
+    createInflightCommit(HoodieTestTable.makeNewCommitTime());
+    // writer 2
+    createInflightCommit(HoodieTestTable.makeNewCommitTime());
+    Option<HoodieInstant> lastSuccessfulInstant = metaClient.getCommitsTimeline().filterCompletedInstants().lastInstant();
+    newInstantTime = HoodieTestTable.makeNewCommitTime();
+    Option<HoodieInstant> currentInstant = Option.of(new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, newInstantTime));
+    SimpleConcurrentFileWritesConflictResolutionStrategy strategy = new BucketIndexConcurrentFileWritesConflictResolutionStrategy();
+    Stream<HoodieInstant> candidateInstants = strategy.getCandidateInstants(metaClient.getActiveTimeline(), currentInstant.get(), lastSuccessfulInstant);
+    Assertions.assertEquals(0, candidateInstants.count());
+  }
+
+  @Test
+  public void testConcurrentWritesWithInterleavingSuccessfulCommit() throws Exception {
+    createCommit(HoodieActiveTimeline.createNewInstantTime());
+    HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
+    // consider commits before this are all successful
+    Option<HoodieInstant> lastSuccessfulInstant = timeline.getCommitsTimeline().filterCompletedInstants().lastInstant();
+    // writer 1 starts
+    String currentWriterInstant = HoodieActiveTimeline.createNewInstantTime();
+    createInflightCommit(currentWriterInstant);
+    // writer 2 starts and finishes
+    String newInstantTime = HoodieActiveTimeline.createNewInstantTime();
+    createCommit(newInstantTime);
+
+    Option<HoodieInstant> currentInstant = Option.of(new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, currentWriterInstant));
+    SimpleConcurrentFileWritesConflictResolutionStrategy strategy = new BucketIndexConcurrentFileWritesConflictResolutionStrategy();
+    HoodieCommitMetadata currentMetadata = createCommitMetadata(currentWriterInstant);
+    timeline = timeline.reload();
+    List<HoodieInstant> candidateInstants = strategy.getCandidateInstants(timeline, currentInstant.get(), lastSuccessfulInstant).collect(
+        Collectors.toList());
+    // writer 1 conflicts with writer 2
+    Assertions.assertEquals(1, candidateInstants.size());
+    ConcurrentOperation thatCommitOperation = new ConcurrentOperation(candidateInstants.get(0), metaClient);
+    ConcurrentOperation thisCommitOperation = new ConcurrentOperation(currentInstant.get(), currentMetadata);
+    Assertions.assertTrue(strategy.hasConflict(thisCommitOperation, thatCommitOperation));
+    try {
+      strategy.resolveConflict(null, thisCommitOperation, thatCommitOperation);
+      Assertions.fail("Cannot reach here, writer 1 and writer 2 should have thrown a conflict");
+    } catch (HoodieWriteConflictException e) {
+      // expected
+    }
+  }
+
+  private void createCommit(String instantTime) throws Exception {
+    String fileId1 = "00000001-file-" + instantTime + "-1";
+    String fileId2 = "00000002-file-"  + instantTime + "-2";
+
+    HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
+    commitMetadata.addMetadata("test", "test");
+    HoodieWriteStat writeStat = new HoodieWriteStat();
+    writeStat.setFileId(fileId1);
+    commitMetadata.addWriteStat(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, writeStat);
+    commitMetadata.setOperationType(WriteOperationType.INSERT);
+    HoodieTestTable.of(metaClient)
+        .addCommit(instantTime, Option.of(commitMetadata))
+        .withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, fileId1, fileId2);
+  }
+
+  private HoodieCommitMetadata createCommitMetadata(String instantTime, String writeFileName) {
+    HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
+    commitMetadata.addMetadata("test", "test");
+    HoodieWriteStat writeStat = new HoodieWriteStat();
+    writeStat.setFileId(writeFileName);
+    commitMetadata.addWriteStat(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, writeStat);
+    commitMetadata.setOperationType(WriteOperationType.INSERT);
+    return commitMetadata;
+  }
+
+  private HoodieCommitMetadata createCommitMetadata(String instantTime) {
+    return createCommitMetadata(instantTime, "00000001-file-" + instantTime + "-1");
+  }
+
+  private void createInflightCommit(String instantTime) throws Exception {
+    String fileId1 = "00000001-file-" + instantTime + "-1";
+    String fileId2 = "00000002-file-" + instantTime + "-2";
+    HoodieTestTable.of(metaClient)
+        .addInflightCommit(instantTime)
+        .withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, fileId1, fileId2);
+  }
+}