Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/10/22 07:20:11 UTC

[GitHub] [hudi] codope commented on a diff in pull request #6854: [HUDI-4631] Adding retries to spark datasource writes on conflict failures:

codope commented on code in PR #6854:
URL: https://github.com/apache/hudi/pull/6854#discussion_r1002384510


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java:
##########
@@ -189,6 +189,19 @@ public class HoodieLockConfig extends HoodieConfig {
       .withDocumentation("Lock provider class name, this should be subclass of "
           + "org.apache.hudi.client.transaction.ConflictResolutionStrategy");
 
+  public static final ConfigProperty<Boolean> RETRY_ON_CONFLICT_FAILURES = ConfigProperty
+      .key(LOCK_PREFIX + "retry.on.conflict.failures")
+      .defaultValue(false)
+      .sinceVersion("0.13.0")
+      .withDocumentation("Whenever there is a conflict, one of the commits will get aborted. By enabling this conflict, hudi will retry the batch again.");
+
+  public static final ConfigProperty<Integer> NUM_RETRIES_ON_CONFLICT_FAILURES = ConfigProperty

Review Comment:
   There is already a retry config for acquiring the lock. Why can't we reuse that? Is the intention of this config to retry on any kind of failure, not just lock acquisition? If so, would it be better to add it to the write configs instead of the lock configs?
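
   For reference, the behaviour described in the new config's documentation (retry the batch when a commit is aborted because of a conflict) could be sketched roughly as below. This is a minimal, self-contained illustration, not the PR's implementation: `RetryOnConflictSketch`, `ConflictException`, `retryOnConflict`, `retryEnabled` and `maxRetries` are all hypothetical names, and the loop only retries on the conflict exception, which is the distinction the question above is about.

```java
import java.util.function.Supplier;

class RetryOnConflictSketch {

    /** Hypothetical exception standing in for a commit aborted by a write conflict. */
    static class ConflictException extends RuntimeException {
        ConflictException(String message) {
            super(message);
        }
    }

    /**
     * Runs the batch once, and re-runs it on conflict failures when retries are enabled.
     * Any other exception propagates immediately without a retry.
     *
     * @param batch        the write to attempt (assumed safe to re-run)
     * @param retryEnabled corresponds to a boolean "retry on conflict failures" switch
     * @param maxRetries   number of additional attempts after the first one
     */
    static <T> T retryOnConflict(Supplier<T> batch, boolean retryEnabled, int maxRetries) {
        // Always allow at least one attempt; negative retry counts are treated as zero.
        int attemptsAllowed = retryEnabled ? Math.max(0, maxRetries) + 1 : 1;
        ConflictException lastConflict = null;
        for (int attempt = 1; attempt <= attemptsAllowed; attempt++) {
            try {
                return batch.get();
            } catch (ConflictException e) {
                // Remember the conflict and fall through to the next attempt, if any.
                lastConflict = e;
            }
        }
        // All attempts ended in a conflict: surface the last one to the caller.
        throw lastConflict;
    }
}
```

   With retries disabled the first conflict is rethrown as-is, which matches the `defaultValue(false)` in the diff.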



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala:
##########
@@ -274,6 +275,69 @@ class TestCOWDataSource extends HoodieClientTestBase {
     assertEquals(snapshotDF2.count(), 80)
   }
 
+  /**
+   * Test retries on conflict failures.
+   * @param enableRetries
+   */
+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def testCopyOnWriteConcurrentUpdates(enableRetries: Boolean): Unit = {
+    initTestDataGenerator()
+    val records1 = recordsToStrings(dataGen.generateInserts("000", 1000)).toList
+    val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
+    inputDF1.write.format("org.apache.hudi")
+      .options(commonOpts)
+      .option("hoodie.write.concurrency.mode","optimistic_concurrency_control")

Review Comment:
   Let's use the config constant and the enum here instead of string literals.
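
   As a rough illustration of the pattern, the sketch below builds the writer option from a key constant and an enum value rather than two string literals. It is self-contained, so `ConcurrencyOptionsSketch`, `WRITE_CONCURRENCY_MODE_KEY` and `ConcurrencyMode` are hypothetical stand-ins defined locally; in Hudi the real counterparts would presumably be `HoodieWriteConfig.WRITE_CONCURRENCY_MODE` and the `WriteConcurrencyMode` enum, but that mapping is an assumption and is not spelled out in this comment.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class ConcurrencyOptionsSketch {

    // Hypothetical stand-in for a config-key constant; the key string is the one from the test above.
    static final String WRITE_CONCURRENCY_MODE_KEY = "hoodie.write.concurrency.mode";

    // Hypothetical stand-in for a concurrency-mode enum that carries its config value.
    enum ConcurrencyMode {
        SINGLE_WRITER("single_writer"),
        OPTIMISTIC_CONCURRENCY_CONTROL("optimistic_concurrency_control");

        private final String value;

        ConcurrencyMode(String value) {
            this.value = value;
        }

        String value() {
            return value;
        }
    }

    /** Builds writer options from the constant and the enum, so a typo fails at compile time. */
    static Map<String, String> writerOptions(ConcurrencyMode mode) {
        Map<String, String> options = new LinkedHashMap<>();
        options.put(WRITE_CONCURRENCY_MODE_KEY, mode.value());
        return options;
    }

    public static void main(String[] args) {
        System.out.println(writerOptions(ConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL));
    }
}
```

   In the Scala test this would replace the literal pair passed to `.option(...)` with the constant's key and the enum's value.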


