You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/01/17 08:54:42 UTC

[GitHub] [hudi] codope commented on a change in pull request #4588: [HUDI-3072] Fixing conflict resolution in transaction management code path for auto commit code path

codope commented on a change in pull request #4588:
URL: https://github.com/apache/hudi/pull/4588#discussion_r785749484



##########
File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
##########
@@ -441,6 +445,135 @@ public void testHoodieClientMultiWriterWithClustering(HoodieTableType tableType)
     }
   }
 
+  @Test
+  public void testHoodieClientMultiWriterAutoCommitForConflict() throws Exception {
+    Properties properties = new Properties();
+    properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + "/.hoodie/.locks");
+    properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "3000");
+    properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, "100");
+    HoodieWriteConfig.Builder writeConfigBuilder = getConfigBuilder()
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder().withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
+            .withAutoClean(false).build())
+        .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())

Review comment:
       Why is the metadata table disabled in this test?

##########
File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
##########
@@ -441,6 +445,135 @@ public void testHoodieClientMultiWriterWithClustering(HoodieTableType tableType)
     }
   }
 
+  @Test
+  public void testHoodieClientMultiWriterAutoCommitForConflict() throws Exception {
+    Properties properties = new Properties();
+    properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + "/.hoodie/.locks");
+    properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "3000");
+    properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, "100");
+    HoodieWriteConfig.Builder writeConfigBuilder = getConfigBuilder()
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder().withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
+            .withAutoClean(false).build())
+        .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
+        // Timeline-server-based markers are not used for multi-writer tests
+        .withMarkersType(MarkerType.DIRECT.name())
+        .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class)
+            .build()).withAutoCommit(true).withProperties(properties);
+    HoodieWriteConfig cfg = writeConfigBuilder.build();
+    HoodieWriteConfig cfg2 = writeConfigBuilder.build();
+
+    // Create the first commit
+    createCommitWithInserts(cfg, getHoodieWriteClient(cfg), "000", "001", 5000, false);
+    // Start another inflight commit
+    String newCommitTime1 = "003";
+    String newCommitTime2 = "004";
+    SparkRDDWriteClient client1 = getHoodieWriteClient(cfg);
+    SparkRDDWriteClient client2 = getHoodieWriteClient(cfg2);
+
+    List<HoodieRecord> updates1 = dataGen.generateUpdates(newCommitTime1, 5000);
+    List<HoodieRecord> updates2 = dataGen.generateUpdates(newCommitTime2, 5000);
+
+    JavaRDD<HoodieRecord> writeRecords1 = jsc.parallelize(updates1, 4);
+    JavaRDD<HoodieRecord> writeRecords2 = jsc.parallelize(updates2, 4);
+
+    runConcurrentAndAssert(writeRecords1, writeRecords2, client1, client2, SparkRDDWriteClient::upsert, true);
+  }
+
+  private void runConcurrentAndAssert(JavaRDD<HoodieRecord> writeRecords1, JavaRDD<HoodieRecord> writeRecords2,
+                                      SparkRDDWriteClient client1, SparkRDDWriteClient client2,
+                                      Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> writeFn,
+                                      boolean assertForConflict) throws ExecutionException, InterruptedException {
+
+    CountDownLatch runCountDownLatch = new CountDownLatch(2);
+    final ExecutorService executors = Executors.newFixedThreadPool(2);
+    String newCommitTime1 = "003";
+    String newCommitTime2 = "004";
+
+    AtomicBoolean client1Succeeded = new AtomicBoolean(true);
+    AtomicBoolean client2Succeeded = new AtomicBoolean(true);
+
+    Future future1 = executors.submit(() -> {
+          try {
+            ingestBatch(writeFn, client1, newCommitTime1, writeRecords1, runCountDownLatch);
+          } catch (IOException e) {
+            e.printStackTrace();

Review comment:
       Consider logging the error instead of calling printStackTrace().

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java
##########
@@ -60,9 +60,31 @@
       final Option<HoodieCommitMetadata> thisCommitMetadata,
       final HoodieWriteConfig config,
       Option<HoodieInstant> lastCompletedTxnOwnerInstant) throws HoodieWriteConflictException {
+    return resolveWriteConflictIfAny(table, currentTxnOwnerInstant, thisCommitMetadata, config, lastCompletedTxnOwnerInstant, false);
+  }
+
+  /**
+   * Resolve any write conflicts when committing data.
+   *
+   * @param table
+   * @param currentTxnOwnerInstant
+   * @param thisCommitMetadata
+   * @param config
+   * @param lastCompletedTxnOwnerInstant
+   * @return
+   * @throws HoodieWriteConflictException
+   */
+  public static Option<HoodieCommitMetadata> resolveWriteConflictIfAny(
+      final HoodieTable table,
+      final Option<HoodieInstant> currentTxnOwnerInstant,
+      final Option<HoodieCommitMetadata> thisCommitMetadata,
+      final HoodieWriteConfig config,
+      Option<HoodieInstant> lastCompletedTxnOwnerInstant,
+      boolean reloadActiveTimeline) throws HoodieWriteConflictException {

Review comment:
       Why do we need a boolean flag here? Can we not reload the active timeline regardless of whether auto-commit is enabled?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org