You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by vb...@apache.org on 2020/08/10 00:51:50 UTC

[hudi] branch master updated: [HUDI-1098] Adding OptimisticConsistencyGuard to be used during FinalizeWrite (#1912)

This is an automated email from the ASF dual-hosted git repository.

vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 858eda8  [HUDI-1098] Adding OptimisticConsistencyGuard to be used during FinalizeWrite (#1912)
858eda8 is described below

commit 858eda85d7a5e4ee3be9aa28e7e7311bc8fb8983
Author: Sivabalan Narayanan <si...@uber.com>
AuthorDate: Sun Aug 9 20:51:37 2020 -0400

    [HUDI-1098] Adding OptimisticConsistencyGuard to be used during FinalizeWrite (#1912)
---
 .../java/org/apache/hudi/table/HoodieTable.java    |  18 +++-
 .../TestHoodieClientOnCopyOnWriteStorage.java      | 115 +++++++++++++++------
 .../apache/hudi/table/TestConsistencyGuard.java    |  64 ++++++++++--
 .../hudi/common/fs/ConsistencyGuardConfig.java     |  40 ++++++-
 .../hudi/common/fs/FailSafeConsistencyGuard.java   | 104 +++++++++++--------
 .../hudi/common/fs/OptimisticConsistencyGuard.java |  90 ++++++++++++++++
 6 files changed, 340 insertions(+), 91 deletions(-)

diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
index 565e046..4ed32b4 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -32,7 +32,9 @@ import org.apache.hudi.client.SparkTaskContextSupplier;
 import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.fs.ConsistencyGuard;
 import org.apache.hudi.common.fs.ConsistencyGuard.FileVisibility;
+import org.apache.hudi.common.fs.ConsistencyGuardConfig;
 import org.apache.hudi.common.fs.FailSafeConsistencyGuard;
+import org.apache.hudi.common.fs.OptimisticConsistencyGuard;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -505,7 +507,7 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
     final FileSystem fileSystem = metaClient.getRawFs();
     List<String> fileList = partitionFilePaths.map(Pair::getValue).collect(Collectors.toList());
     try {
-      getFailSafeConsistencyGuard(fileSystem).waitTill(partitionPath, fileList, visibility);
+      getConsistencyGuard(fileSystem, config.getConsistencyGuardConfig()).waitTill(partitionPath, fileList, visibility);
     } catch (IOException | TimeoutException ioe) {
       LOG.error("Got exception while waiting for files to show up", ioe);
       return false;
@@ -513,8 +515,18 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
     return true;
   }
 
-  private ConsistencyGuard getFailSafeConsistencyGuard(FileSystem fileSystem) {
-    return new FailSafeConsistencyGuard(fileSystem, config.getConsistencyGuardConfig());
+  /**
+   * Instantiate {@link ConsistencyGuard} based on configs.
+   * <p>
+   * Default consistencyGuard class is {@link OptimisticConsistencyGuard}.
+   */
+  public static ConsistencyGuard getConsistencyGuard(FileSystem fs, ConsistencyGuardConfig consistencyGuardConfig) throws IOException {
+    try {
+      return consistencyGuardConfig.shouldEnableOptimisticConsistencyGuard()
+          ? new OptimisticConsistencyGuard(fs, consistencyGuardConfig) : new FailSafeConsistencyGuard(fs, consistencyGuardConfig);
+    } catch (Throwable e) {
+      throw new IOException("Could not load ConsistencyGuard ", e);
+    }
   }
 
   public SparkTaskContextSupplier getSparkTaskContextSupplier() {
diff --git a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
index 7eea3ae..51d8a6a 100644
--- a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
+++ b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
@@ -44,6 +44,7 @@ import org.apache.hudi.config.HoodieStorageConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieCommitException;
 import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieRollbackException;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.index.HoodieIndex.IndexType;
 import org.apache.hudi.io.IOType;
@@ -63,6 +64,7 @@ import org.apache.spark.sql.Row;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.FileInputStream;
 import java.io.IOException;
@@ -91,6 +93,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -1081,54 +1084,97 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
   /**
    * Tests behavior of committing only when consistency is verified.
    */
-  @Test
-  public void testConsistencyCheckDuringFinalize() throws Exception {
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testConsistencyCheckDuringFinalize(boolean enableOptimisticConsistencyGuard) throws Exception {
     HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath);
     String instantTime = "000";
-    HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false).build();
+    HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false).withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder()
+        .withEnableOptimisticConsistencyGuard(enableOptimisticConsistencyGuard).build()).build();
     HoodieWriteClient client = getHoodieWriteClient(cfg);
-    Pair<Path, JavaRDD<WriteStatus>> result = testConsistencyCheck(metaClient, instantTime);
+    Pair<Path, JavaRDD<WriteStatus>> result = testConsistencyCheck(metaClient, instantTime, enableOptimisticConsistencyGuard);
 
     // Delete orphan marker and commit should succeed
     metaClient.getFs().delete(result.getKey(), false);
-    assertTrue(client.commit(instantTime, result.getRight()), "Commit should succeed");
-    assertTrue(HoodieTestUtils.doesCommitExist(basePath, instantTime),
-        "After explicit commit, commit file should be created");
-    // Marker directory must be removed
-    assertFalse(metaClient.getFs().exists(new Path(metaClient.getMarkerFolderPath(instantTime))));
+    if (!enableOptimisticConsistencyGuard) {
+      assertTrue(client.commit(instantTime, result.getRight()), "Commit should succeed");
+      assertTrue(HoodieTestUtils.doesCommitExist(basePath, instantTime),
+          "After explicit commit, commit file should be created");
+      // Marker directory must be removed
+      assertFalse(metaClient.getFs().exists(new Path(metaClient.getMarkerFolderPath(instantTime))));
+    } else {
+      // with optimistic, first client.commit should have succeeded.
+      assertTrue(HoodieTestUtils.doesCommitExist(basePath, instantTime),
+          "After explicit commit, commit file should be created");
+      // Marker directory must be removed
+      assertFalse(metaClient.getFs().exists(new Path(metaClient.getMarkerFolderPath(instantTime))));
+    }
   }
 
-  private void testRollbackAfterConsistencyCheckFailureUsingFileList(boolean rollbackUsingMarkers) throws Exception {
+  private void testRollbackAfterConsistencyCheckFailureUsingFileList(boolean rollbackUsingMarkers, boolean enableOptimisticConsistencyGuard) throws Exception {
     String instantTime = "000";
     HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath);
-    HoodieWriteConfig cfg = getConfigBuilder().withRollbackUsingMarkers(rollbackUsingMarkers).withAutoCommit(false).build();
+    HoodieWriteConfig cfg = !enableOptimisticConsistencyGuard ? getConfigBuilder().withRollbackUsingMarkers(rollbackUsingMarkers).withAutoCommit(false)
+        .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true)
+            .withMaxConsistencyCheckIntervalMs(1).withInitialConsistencyCheckIntervalMs(1).withEnableOptimisticConsistencyGuard(enableOptimisticConsistencyGuard).build()).build() :
+        getConfigBuilder().withRollbackUsingMarkers(rollbackUsingMarkers).withAutoCommit(false)
+            .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder()
+                .withConsistencyCheckEnabled(true)
+                .withOptimisticConsistencyGuardSleepTimeMs(1).build()).build();
     HoodieWriteClient client = getHoodieWriteClient(cfg);
-    testConsistencyCheck(metaClient, instantTime);
-
-    // Rollback of this commit should succeed
-    client.rollback(instantTime);
-    assertFalse(HoodieTestUtils.doesCommitExist(basePath, instantTime),
-        "After explicit rollback, commit file should not be present");
-    // Marker directory must be removed after rollback
-    assertFalse(metaClient.getFs().exists(new Path(metaClient.getMarkerFolderPath(instantTime))));
+    testConsistencyCheck(metaClient, instantTime, enableOptimisticConsistencyGuard);
+
+    if (!enableOptimisticConsistencyGuard) {
+      // Rollback of this commit should succeed with FailSafeCG
+      client.rollback(instantTime);
+      assertFalse(HoodieTestUtils.doesCommitExist(basePath, instantTime),
+          "After explicit rollback, commit file should not be present");
+      // Marker directory must be removed after rollback
+      assertFalse(metaClient.getFs().exists(new Path(metaClient.getMarkerFolderPath(instantTime))));
+    } else {
+      // if optimistic CG is enabled, commit should have succeeded.
+      assertTrue(HoodieTestUtils.doesCommitExist(basePath, instantTime),
+          "With optimistic CG, first commit should succeed. commit file should be present");
+      // Marker directory must be removed after the successful commit
+      assertFalse(metaClient.getFs().exists(new Path(metaClient.getMarkerFolderPath(instantTime))));
+      if (rollbackUsingMarkers) {
+        // rollback of a completed commit should fail if marker based rollback is used.
+        try {
+          client.rollback(instantTime);
+          fail("Rollback of completed commit should throw exception");
+        } catch (HoodieRollbackException e) {
+          // ignore
+        }
+      } else {
+        // rollback of a completed commit should succeed if using list based rollback
+        client.rollback(instantTime);
+        assertFalse(HoodieTestUtils.doesCommitExist(basePath, instantTime),
+            "After explicit rollback, commit file should not be present");
+      }
+    }
   }
 
-  @Test
-  public void testRollbackAfterConsistencyCheckFailureUsingFileList() throws Exception {
-    testRollbackAfterConsistencyCheckFailureUsingFileList(false);
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testRollbackAfterConsistencyCheckFailureUsingFileList(boolean enableOptimisticConsistencyGuard) throws Exception {
+    testRollbackAfterConsistencyCheckFailureUsingFileList(false, enableOptimisticConsistencyGuard);
   }
 
-  @Test
-  public void testRollbackAfterConsistencyCheckFailureUsingMarkers() throws Exception {
-    testRollbackAfterConsistencyCheckFailureUsingFileList(true);
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testRollbackAfterConsistencyCheckFailureUsingMarkers(boolean enableOptimisticConsistencyGuard) throws Exception {
+    testRollbackAfterConsistencyCheckFailureUsingFileList(true, enableOptimisticConsistencyGuard);
   }
 
-  private Pair<Path, JavaRDD<WriteStatus>> testConsistencyCheck(HoodieTableMetaClient metaClient, String instantTime)
+  private Pair<Path, JavaRDD<WriteStatus>> testConsistencyCheck(HoodieTableMetaClient metaClient, String instantTime, boolean enableOptimisticConsistencyGuard)
       throws Exception {
-    HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false)
+    HoodieWriteConfig cfg = !enableOptimisticConsistencyGuard ? (getConfigBuilder().withAutoCommit(false)
         .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true)
-            .withMaxConsistencyCheckIntervalMs(1).withInitialConsistencyCheckIntervalMs(1).build())
-        .build();
+            .withMaxConsistencyCheckIntervalMs(1).withInitialConsistencyCheckIntervalMs(1).withEnableOptimisticConsistencyGuard(enableOptimisticConsistencyGuard).build())
+        .build()) : (getConfigBuilder().withAutoCommit(false)
+        .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true)
+            .withOptimisticConsistencyGuardSleepTimeMs(1).build())
+        .build());
     HoodieWriteClient client = getHoodieWriteClient(cfg);
 
     client.startCommitWithTime(instantTime);
@@ -1149,10 +1195,15 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
             IOType.MERGE);
     LOG.info("Created a dummy marker path=" + markerFilePath);
 
-    Exception e = assertThrows(HoodieCommitException.class, () -> {
+    if (!enableOptimisticConsistencyGuard) {
+      Exception e = assertThrows(HoodieCommitException.class, () -> {
+        client.commit(instantTime, result);
+      }, "Commit should fail due to consistency check");
+      assertTrue(e.getCause() instanceof HoodieIOException);
+    } else {
+      // with optimistic CG, commit should succeed
       client.commit(instantTime, result);
-    }, "Commit should fail due to consistency check");
-    assertTrue(e.getCause() instanceof HoodieIOException);
+    }
     return Pair.of(markerFilePath, result);
   }
 
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestConsistencyGuard.java b/hudi-client/src/test/java/org/apache/hudi/table/TestConsistencyGuard.java
index 2406d85..da4224a 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/TestConsistencyGuard.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/TestConsistencyGuard.java
@@ -21,6 +21,7 @@ package org.apache.hudi.table;
 import org.apache.hudi.common.fs.ConsistencyGuard;
 import org.apache.hudi.common.fs.ConsistencyGuardConfig;
 import org.apache.hudi.common.fs.FailSafeConsistencyGuard;
+import org.apache.hudi.common.fs.OptimisticConsistencyGuard;
 import org.apache.hudi.testutils.HoodieClientTestHarness;
 import org.apache.hudi.testutils.HoodieClientTestUtils;
 
@@ -28,14 +29,29 @@ import org.apache.hadoop.fs.Path;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 
 import java.util.Arrays;
+import java.util.List;
 import java.util.concurrent.TimeoutException;
 
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
+/**
+ * Unit tests {@link ConsistencyGuard}s.
+ */
 public class TestConsistencyGuard extends HoodieClientTestHarness {
 
+  // Supplies the ConsistencyGuard class names used as arguments for the parameterized tests.
+  public static List<Arguments> consistencyGuardType() {
+    return Arrays.asList(
+        Arguments.of(FailSafeConsistencyGuard.class.getName()),
+        Arguments.of(OptimisticConsistencyGuard.class.getName())
+    );
+  }
+
   @BeforeEach
   public void setup() {
     initPath();
@@ -47,13 +63,16 @@ public class TestConsistencyGuard extends HoodieClientTestHarness {
     cleanupResources();
   }
 
-  @Test
-  public void testCheckPassingAppearAndDisAppear() throws Exception {
+  @ParameterizedTest
+  @MethodSource("consistencyGuardType")
+  public void testCheckPassingAppearAndDisAppear(String consistencyGuardType) throws Exception {
     HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1");
     HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f2");
     HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f3");
 
-    ConsistencyGuard passing = new FailSafeConsistencyGuard(fs, getConsistencyGuardConfig(1, 1000, 1000));
+    ConsistencyGuardConfig config = getConsistencyGuardConfig(1, 1000, 1000);
+    ConsistencyGuard passing = consistencyGuardType.equals(FailSafeConsistencyGuard.class.getName())
+        ? new FailSafeConsistencyGuard(fs, config) : new OptimisticConsistencyGuard(fs, config);
     passing.waitTillFileAppears(new Path(basePath + "/partition/path/f1_1-0-1_000.parquet"));
     passing.waitTillFileAppears(new Path(basePath + "/partition/path/f2_1-0-1_000.parquet"));
     passing.waitTillAllFilesAppear(basePath + "/partition/path", Arrays
@@ -68,7 +87,7 @@ public class TestConsistencyGuard extends HoodieClientTestHarness {
   }
 
   @Test
-  public void testCheckFailingAppear() throws Exception {
+  public void testCheckFailingAppearFailSafe() throws Exception {
     HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1");
     ConsistencyGuard passing = new FailSafeConsistencyGuard(fs, getConsistencyGuardConfig());
     assertThrows(TimeoutException.class, () -> {
@@ -78,7 +97,15 @@ public class TestConsistencyGuard extends HoodieClientTestHarness {
   }
 
   @Test
-  public void testCheckFailingAppears() throws Exception {
+  public void testCheckFailingAppearTimedWait() throws Exception {
+    HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1");
+    ConsistencyGuard passing = new OptimisticConsistencyGuard(fs, getConsistencyGuardConfig());
+    passing.waitTillAllFilesAppear(basePath + "/partition/path", Arrays
+          .asList(basePath + "/partition/path/f1_1-0-2_000.parquet", basePath + "/partition/path/f2_1-0-2_000.parquet"));
+  }
+
+  @Test
+  public void testCheckFailingAppearsFailSafe() throws Exception {
     HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1");
     ConsistencyGuard passing = new FailSafeConsistencyGuard(fs, getConsistencyGuardConfig());
     assertThrows(TimeoutException.class, () -> {
@@ -87,7 +114,14 @@ public class TestConsistencyGuard extends HoodieClientTestHarness {
   }
 
   @Test
-  public void testCheckFailingDisappear() throws Exception {
+  public void testCheckFailingAppearsTimedWait() throws Exception {
+    HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1");
+    ConsistencyGuard passing = new OptimisticConsistencyGuard(fs, getConsistencyGuardConfig());
+    passing.waitTillFileAppears(new Path(basePath + "/partition/path/f1_1-0-2_000.parquet"));
+  }
+
+  @Test
+  public void testCheckFailingDisappearFailSafe() throws Exception {
     HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1");
     ConsistencyGuard passing = new FailSafeConsistencyGuard(fs, getConsistencyGuardConfig());
     assertThrows(TimeoutException.class, () -> {
@@ -97,7 +131,15 @@ public class TestConsistencyGuard extends HoodieClientTestHarness {
   }
 
   @Test
-  public void testCheckFailingDisappears() throws Exception {
+  public void testCheckFailingDisappearTimedWait() throws Exception {
+    HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1");
+    ConsistencyGuard passing = new OptimisticConsistencyGuard(fs, getConsistencyGuardConfig());
+    passing.waitTillAllFilesDisappear(basePath + "/partition/path", Arrays
+          .asList(basePath + "/partition/path/f1_1-0-1_000.parquet", basePath + "/partition/path/f2_1-0-2_000.parquet"));
+  }
+
+  @Test
+  public void testCheckFailingDisappearsFailSafe() throws Exception {
     HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1");
     HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1");
     ConsistencyGuard passing = new FailSafeConsistencyGuard(fs, getConsistencyGuardConfig());
@@ -106,6 +148,14 @@ public class TestConsistencyGuard extends HoodieClientTestHarness {
     });
   }
 
+  @Test
+  public void testCheckFailingDisappearsTimedWait() throws Exception {
+    HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1");
+    HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1");
+    ConsistencyGuard passing = new OptimisticConsistencyGuard(fs, getConsistencyGuardConfig());
+    passing.waitTillFileDisappears(new Path(basePath + "/partition/path/f1_1-0-1_000.parquet"));
+  }
+
   private ConsistencyGuardConfig getConsistencyGuardConfig() {
     return getConsistencyGuardConfig(3, 10, 10);
   }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/ConsistencyGuardConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/ConsistencyGuardConfig.java
index 55d80df..e55fb24 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/fs/ConsistencyGuardConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/ConsistencyGuardConfig.java
@@ -36,15 +36,23 @@ public class ConsistencyGuardConfig extends DefaultHoodieConfig {
   // time between successive attempts to ensure written data's metadata is consistent on storage
   private static final String INITIAL_CONSISTENCY_CHECK_INTERVAL_MS_PROP =
       "hoodie.consistency.check.initial_interval_ms";
-  private static long DEFAULT_INITIAL_CONSISTENCY_CHECK_INTERVAL_MS = 2000L;
+  private static long DEFAULT_INITIAL_CONSISTENCY_CHECK_INTERVAL_MS = 400L;
 
   // max interval time
   private static final String MAX_CONSISTENCY_CHECK_INTERVAL_MS_PROP = "hoodie.consistency.check.max_interval_ms";
-  private static long DEFAULT_MAX_CONSISTENCY_CHECK_INTERVAL_MS = 300000L;
+  private static long DEFAULT_MAX_CONSISTENCY_CHECK_INTERVAL_MS = 20000L;
 
-  // maximum number of checks, for consistency of written data. Will wait upto 256 Secs
+  // maximum number of checks, for consistency of written data. Will wait up to 140 Secs
   private static final String MAX_CONSISTENCY_CHECKS_PROP = "hoodie.consistency.check.max_checks";
-  private static int DEFAULT_MAX_CONSISTENCY_CHECKS = 7;
+  private static int DEFAULT_MAX_CONSISTENCY_CHECKS = 6;
+
+  // sleep time for OptimisticConsistencyGuard
+  private static final String OPTIMISTIC_CONSISTENCY_GUARD_SLEEP_TIME_MS_PROP = "hoodie.optimistic.consistency.guard.sleep_time_ms";
+  private static long DEFAULT_OPTIMISTIC_CONSISTENCY_GUARD_SLEEP_TIME_MS_PROP = 500L;
+
+  // config to enable OptimisticConsistencyGuard in finalizeWrite instead of FailSafeConsistencyGuard
+  private static final String ENABLE_OPTIMISTIC_CONSISTENCY_GUARD = "_hoodie.optimistic.consistency.guard.enable";
+  private static boolean DEFAULT_ENABLE_OPTIMISTIC_CONSISTENCY_GUARD = true;
 
   public ConsistencyGuardConfig(Properties props) {
     super(props);
@@ -70,6 +78,14 @@ public class ConsistencyGuardConfig extends DefaultHoodieConfig {
     return Integer.parseInt(props.getProperty(MAX_CONSISTENCY_CHECK_INTERVAL_MS_PROP));
   }
 
+  public long getOptimisticConsistencyGuardSleepTimeMs() {
+    return Long.parseLong(props.getProperty(OPTIMISTIC_CONSISTENCY_GUARD_SLEEP_TIME_MS_PROP));
+  }
+
+  public boolean shouldEnableOptimisticConsistencyGuard() {
+    return Boolean.parseBoolean(props.getProperty(ENABLE_OPTIMISTIC_CONSISTENCY_GUARD));
+  }
+
   /**
    * The builder used to build consistency configurations.
    */
@@ -109,6 +125,16 @@ public class ConsistencyGuardConfig extends DefaultHoodieConfig {
       return this;
     }
 
+    public Builder withOptimisticConsistencyGuardSleepTimeMs(long sleepTimeMs) {
+      props.setProperty(OPTIMISTIC_CONSISTENCY_GUARD_SLEEP_TIME_MS_PROP, String.valueOf(sleepTimeMs));
+      return this;
+    }
+
+    public Builder withEnableOptimisticConsistencyGuard(boolean enableOptimisticConsistencyGuard) {
+      props.setProperty(ENABLE_OPTIMISTIC_CONSISTENCY_GUARD, String.valueOf(enableOptimisticConsistencyGuard));
+      return this;
+    }
+
     public ConsistencyGuardConfig build() {
       setDefaultOnCondition(props, !props.containsKey(CONSISTENCY_CHECK_ENABLED_PROP), CONSISTENCY_CHECK_ENABLED_PROP,
           DEFAULT_CONSISTENCY_CHECK_ENABLED);
@@ -118,7 +144,11 @@ public class ConsistencyGuardConfig extends DefaultHoodieConfig {
           MAX_CONSISTENCY_CHECK_INTERVAL_MS_PROP, String.valueOf(DEFAULT_MAX_CONSISTENCY_CHECK_INTERVAL_MS));
       setDefaultOnCondition(props, !props.containsKey(MAX_CONSISTENCY_CHECKS_PROP), MAX_CONSISTENCY_CHECKS_PROP,
           String.valueOf(DEFAULT_MAX_CONSISTENCY_CHECKS));
-
+      setDefaultOnCondition(props, !props.containsKey(OPTIMISTIC_CONSISTENCY_GUARD_SLEEP_TIME_MS_PROP),
+          OPTIMISTIC_CONSISTENCY_GUARD_SLEEP_TIME_MS_PROP,  String.valueOf(DEFAULT_OPTIMISTIC_CONSISTENCY_GUARD_SLEEP_TIME_MS_PROP));
+      setDefaultOnCondition(props, !props.containsKey(ENABLE_OPTIMISTIC_CONSISTENCY_GUARD),
+          ENABLE_OPTIMISTIC_CONSISTENCY_GUARD,
+          String.valueOf(DEFAULT_ENABLE_OPTIMISTIC_CONSISTENCY_GUARD));
       return new ConsistencyGuardConfig(props);
     }
   }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FailSafeConsistencyGuard.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/FailSafeConsistencyGuard.java
index ac3a479..2247b92 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FailSafeConsistencyGuard.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FailSafeConsistencyGuard.java
@@ -32,7 +32,6 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.TimeoutException;
-import java.util.function.Function;
 import java.util.stream.Collectors;
 
 /**
@@ -42,8 +41,8 @@ public class FailSafeConsistencyGuard implements ConsistencyGuard {
 
   private static final Logger LOG = LogManager.getLogger(FailSafeConsistencyGuard.class);
 
-  private final FileSystem fs;
-  private final ConsistencyGuardConfig consistencyGuardConfig;
+  protected final FileSystem fs;
+  protected final ConsistencyGuardConfig consistencyGuardConfig;
 
   public FailSafeConsistencyGuard(FileSystem fs, ConsistencyGuardConfig consistencyGuardConfig) {
     this.fs = fs;
@@ -73,7 +72,7 @@ public class FailSafeConsistencyGuard implements ConsistencyGuard {
 
   /**
    * Helper function to wait for all files belonging to single directory to appear.
-   * 
+   *
    * @param dirPath Dir Path
    * @param files Files to appear/disappear
    * @param event Appear/Disappear
@@ -81,45 +80,19 @@ public class FailSafeConsistencyGuard implements ConsistencyGuard {
    */
   public void waitForFilesVisibility(String dirPath, List<String> files, FileVisibility event) throws TimeoutException {
     Path dir = new Path(dirPath);
-    List<String> filesWithoutSchemeAndAuthority =
-        files.stream().map(f -> Path.getPathWithoutSchemeAndAuthority(new Path(f))).map(Path::toString)
-            .collect(Collectors.toList());
-
-    retryTillSuccess((retryNum) -> {
-      try {
-        LOG.info("Trying " + retryNum);
-        FileStatus[] entries = fs.listStatus(dir);
-        List<String> gotFiles = Arrays.stream(entries).map(e -> Path.getPathWithoutSchemeAndAuthority(e.getPath()))
-            .map(Path::toString).collect(Collectors.toList());
-        List<String> candidateFiles = new ArrayList<>(filesWithoutSchemeAndAuthority);
-        boolean altered = candidateFiles.removeAll(gotFiles);
-
-        switch (event) {
-          case DISAPPEAR:
-            LOG.info("Following files are visible" + candidateFiles);
-            // If no candidate files gets removed, it means all of them have disappeared
-            return !altered;
-          case APPEAR:
-          default:
-            // if all files appear, the list is empty
-            return candidateFiles.isEmpty();
-        }
-      } catch (IOException ioe) {
-        LOG.warn("Got IOException waiting for file event. Have tried " + retryNum + " time(s)", ioe);
-      }
-      return false;
-    }, "Timed out waiting for files to become visible");
+    List<String> filesWithoutSchemeAndAuthority = getFilesWithoutSchemeAndAuthority(files);
+    retryTillSuccess(dir, filesWithoutSchemeAndAuthority, event);
   }
 
   /**
    * Helper to check of file visibility.
-   * 
+   *
    * @param filePath File Path
    * @param visibility Visibility
    * @return true (if file visible in Path), false (otherwise)
    * @throws IOException -
    */
-  private boolean checkFileVisibility(Path filePath, FileVisibility visibility) throws IOException {
+  protected boolean checkFileVisibility(Path filePath, FileVisibility visibility) throws IOException {
     try {
       FileStatus status = fs.getFileStatus(filePath);
       switch (visibility) {
@@ -142,10 +115,8 @@ public class FailSafeConsistencyGuard implements ConsistencyGuard {
 
   /**
    * Helper function to wait till file either appears/disappears.
-   * 
+   *
    * @param filePath File Path
-   * @param visibility
-   * @throws TimeoutException
    */
   private void waitForFileVisibility(Path filePath, FileVisibility visibility) throws TimeoutException {
     long waitMs = consistencyGuardConfig.getInitialConsistencyCheckIntervalMs();
@@ -169,17 +140,18 @@ public class FailSafeConsistencyGuard implements ConsistencyGuard {
 
   /**
    * Retries the predicate for condfigurable number of times till we the predicate returns success.
-   * 
-   * @param predicate Predicate Function
-   * @param timedOutMessage Timed-Out message for logging
+   *
+   * @param dir directory of interest in which list of files are checked for visibility
+   * @param files List of files to check for visibility
+   * @param event {@link org.apache.hudi.common.fs.ConsistencyGuard.FileVisibility} event of interest.
    * @throws TimeoutException when retries are exhausted
    */
-  private void retryTillSuccess(Function<Integer, Boolean> predicate, String timedOutMessage) throws TimeoutException {
+  private void retryTillSuccess(Path dir, List<String> files, FileVisibility event) throws TimeoutException {
     long waitMs = consistencyGuardConfig.getInitialConsistencyCheckIntervalMs();
     int attempt = 0;
     LOG.info("Max Attempts=" + consistencyGuardConfig.getMaxConsistencyChecks());
     while (attempt < consistencyGuardConfig.getMaxConsistencyChecks()) {
-      boolean success = predicate.apply(attempt);
+      boolean success = checkFilesVisibility(attempt, dir, files, event);
       if (success) {
         return;
       }
@@ -188,11 +160,55 @@ public class FailSafeConsistencyGuard implements ConsistencyGuard {
       waitMs = Math.min(waitMs, consistencyGuardConfig.getMaxConsistencyCheckIntervalMs());
       attempt++;
     }
-    throw new TimeoutException(timedOutMessage);
+    throw new TimeoutException("Timed out waiting for files to adhere to event " + event.name());
+  }
 
+  /**
+   * Helper to check for file visibility based on {@link org.apache.hudi.common.fs.ConsistencyGuard.FileVisibility} event.
+   *
+   * @param retryNum retry attempt count.
+   * @param dir directory of interest in which list of files are checked for visibility
+   * @param files List of files to check for visibility
+   * @param event {@link org.apache.hudi.common.fs.ConsistencyGuard.FileVisibility} event of interest.
+   * @return {@code true} if the condition succeeded, else {@code false}.
+   */
+  protected boolean checkFilesVisibility(int retryNum, Path dir, List<String> files, FileVisibility event) {
+    try {
+      LOG.info("Trying " + retryNum);
+      FileStatus[] entries = fs.listStatus(dir);
+      List<String> gotFiles = Arrays.stream(entries).map(e -> Path.getPathWithoutSchemeAndAuthority(e.getPath()))
+          .map(Path::toString).collect(Collectors.toList());
+      List<String> candidateFiles = new ArrayList<>(files);
+      boolean altered = candidateFiles.removeAll(gotFiles);
+
+      switch (event) {
+        case DISAPPEAR:
+          LOG.info("Following files are visible" + candidateFiles);
+          // If no candidate file gets removed, it means all of them have disappeared
+          return !altered;
+        case APPEAR:
+        default:
+          // if all files appear, the list is empty
+          return candidateFiles.isEmpty();
+      }
+    } catch (IOException ioe) {
+      LOG.warn("Got IOException waiting for file event. Have tried " + retryNum + " time(s)", ioe);
+    }
+    return false;
+  }
+
+  /**
+   * Generate file names without scheme and authority.
+   *
+   * @param files list of files of interest.
+   * @return the filenames without scheme and authority.
+   */
+  protected List<String> getFilesWithoutSchemeAndAuthority(List<String> files) {
+    return files.stream().map(f -> Path.getPathWithoutSchemeAndAuthority(new Path(f))).map(Path::toString)
+        .collect(Collectors.toList());
   }
 
-  void sleepSafe(long waitMs) {
+  private void sleepSafe(long waitMs) {
     try {
       Thread.sleep(waitMs);
     } catch (InterruptedException e) {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/OptimisticConsistencyGuard.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/OptimisticConsistencyGuard.java
new file mode 100644
index 0000000..ded7ccc
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/OptimisticConsistencyGuard.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.fs;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * A consistency guard which sleeps for a configured period of time only on APPEAR. It is a no-op for DISAPPEAR.
+ * This is specifically for the S3A filesystem, and here is the rationale.
+ * This guard is used when deleting data files corresponding to marker files that need to be deleted.
+ * There are two tricky cases that need to be considered. Case 1: A data file creation is eventually consistent and hence
+ * when issuing deletes, it may not be found. Case 2: a data file was never created in the first place since the process crashed.
+ * In S3A, GET and LIST are eventually consistent, and delete() implementation internally does a LIST/EXISTS.
+ * Prior to this patch, hudi was leveraging {@link FailSafeConsistencyGuard} which was doing the following to delete data files.
+ * Step1: wait for all files to appear with linear backoff.
+ * Step2: issue deletes
+ * Step3: wait for all files to disappear with linear backoff.
+ * Step1 and Step2 are handled by {@link FailSafeConsistencyGuard}.
+ *
+ * We are simplifying these steps with {@link OptimisticConsistencyGuard}.
+ * Step1: Check if all files adhere to visibility event. If yes, proceed to Step3.
+ * Step2: If not, sleep for a configured threshold and then proceed to next step.
+ * Step3: issue deletes.
+ *
+ * With this, any file that was created should be available within the configured threshold (eventual consistency).
+ * Delete() will return false if FileNotFound. So, both cases are taken care of by this {@link ConsistencyGuard}.
+ */
+public class OptimisticConsistencyGuard extends FailSafeConsistencyGuard {
+
+  private static final Logger LOG = LogManager.getLogger(OptimisticConsistencyGuard.class);
+
+  public OptimisticConsistencyGuard(FileSystem fs, ConsistencyGuardConfig consistencyGuardConfig) {
+    super(fs, consistencyGuardConfig);
+  }
+
+  @Override
+  public void waitTillFileAppears(Path filePath) throws TimeoutException {
+    try {
+      if (!checkFileVisibility(filePath, FileVisibility.APPEAR)) {
+        Thread.sleep(consistencyGuardConfig.getOptimisticConsistencyGuardSleepTimeMs());
+      }
+    } catch (IOException | InterruptedException ioe) {
+      LOG.warn("Got IOException or InterruptedException waiting for file visibility. Ignoring", ioe);
+    }
+  }
+
+  @Override
+  public void waitTillFileDisappears(Path filePath) throws TimeoutException {
+    // no op
+  }
+
+  @Override
+  public void waitTillAllFilesAppear(String dirPath, List<String> files) throws TimeoutException {
+    try {
+      if (!checkFilesVisibility(1, new Path(dirPath), getFilesWithoutSchemeAndAuthority(files), FileVisibility.APPEAR)) {
+        Thread.sleep(consistencyGuardConfig.getOptimisticConsistencyGuardSleepTimeMs());
+      }
+    } catch (InterruptedException ie) {
+      LOG.warn("Got InterruptedException waiting for file visibility. Ignoring", ie);
+    }
+  }
+
+  @Override
+  public void waitTillAllFilesDisappear(String dirPath, List<String> files) throws  TimeoutException {
+    // no op
+  }
+}