You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by vb...@apache.org on 2020/07/23 04:22:18 UTC

[hudi] branch master updated: [HUDI-1029] In inline compaction mode, previously failed compactions needs to be retried before new compactions (#1857)

This is an automated email from the ASF dual-hosted git repository.

vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new a8bd76c  [HUDI-1029] In inline compaction mode, previously failed compactions needs to be retried before new compactions (#1857)
a8bd76c is described below

commit a8bd76c299b13af93c9eed92309ecb3a86a6734c
Author: vinoth chandar <vi...@users.noreply.github.com>
AuthorDate: Wed Jul 22 21:22:06 2020 -0700

    [HUDI-1029] In inline compaction mode, previously failed compactions needs to be retried before new compactions (#1857)
    
    - Prevents failed compactions from causing issues with future commits
---
 .../org/apache/hudi/client/HoodieWriteClient.java  |   9 +
 .../compact/ScheduleCompactionActionExecutor.java  |   2 +-
 .../table/action/compact/CompactionTestBase.java   | 243 +++++++++++++++++++++
 .../table/action/compact/TestAsyncCompaction.java  | 213 +-----------------
 .../table/action/compact/TestInlineCompaction.java | 116 ++++++++++
 5 files changed, 372 insertions(+), 211 deletions(-)

diff --git a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java b/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
index 30dfecb..2486d91 100644
--- a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
+++ b/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
@@ -341,6 +341,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
 
       // Do an inline compaction if enabled
       if (config.isInlineCompaction()) {
+        runAnyPendingCompactions(table);
         metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT_PROP, "true");
         inlineCompact(extraMetadata);
       } else {
@@ -355,6 +356,14 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
     }
   }
 
+  private void runAnyPendingCompactions(HoodieTable<?> table) {
+    table.getActiveTimeline().getCommitsAndCompactionTimeline().filterPendingCompactionTimeline().getInstants()
+        .forEach(instant -> {
+          LOG.info("Running previously failed inflight compaction at instant " + instant);
+          compact(instant.getTimestamp(), true);
+        });
+  }
+
   /**
    * Handle auto clean during commit.
    * @param instantTime
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java
index 174c64e..6d88e5c 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java
@@ -65,7 +65,7 @@ public class ScheduleCompactionActionExecutor extends BaseActionExecutor<Option<
     int deltaCommitsSinceLastCompaction = table.getActiveTimeline().getDeltaCommitTimeline()
         .findInstantsAfter(deltaCommitsSinceTs, Integer.MAX_VALUE).countInstants();
     if (config.getInlineCompactDeltaCommitMax() > deltaCommitsSinceLastCompaction) {
-      LOG.info("Not running compaction as only " + deltaCommitsSinceLastCompaction
+      LOG.info("Not scheduling compaction as only " + deltaCommitsSinceLastCompaction
           + " delta commits was found since last compaction " + deltaCommitsSinceTs + ". Waiting for "
           + config.getInlineCompactDeltaCommitMax());
       return new HoodieCompactionPlan();
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/action/compact/CompactionTestBase.java b/hudi-client/src/test/java/org/apache/hudi/table/action/compact/CompactionTestBase.java
new file mode 100644
index 0000000..c171255
--- /dev/null
+++ b/hudi-client/src/test/java/org/apache/hudi/table/action/compact/CompactionTestBase.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.compact;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hudi.avro.model.HoodieCompactionOperation;
+import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.client.HoodieReadClient;
+import org.apache.hudi.client.HoodieWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+import org.apache.hudi.common.table.view.FileSystemViewStorageType;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.common.util.CompactionUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieStorageConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.testutils.HoodieClientTestBase;
+import org.apache.hudi.testutils.HoodieClientTestUtils;
+import org.apache.hudi.testutils.HoodieTestDataGenerator;
+import org.apache.spark.api.java.JavaRDD;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
+import static org.apache.hudi.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class CompactionTestBase extends HoodieClientTestBase {
+
+  protected HoodieWriteConfig.Builder getConfigBuilder(Boolean autoCommit) {
+    return HoodieWriteConfig.newBuilder().withPath(basePath)
+        .withSchema(TRIP_EXAMPLE_SCHEMA)
+        .withParallelism(2, 2)
+        .withAutoCommit(autoCommit).withAssumeDatePartitioning(true)
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024 * 1024)
+            .withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1).build())
+        .withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024 * 1024 * 1024).build())
+        .forTable("test-trip-table")
+        .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
+        .withEmbeddedTimelineServerEnabled(true).withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
+            .withStorageType(FileSystemViewStorageType.EMBEDDED_KV_STORE).build());
+  }
+
+  /**
+   * HELPER METHODS FOR TESTING. This one checks every latest file slice against the supplied compaction operations.
+   **/
+  protected void validateDeltaCommit(String latestDeltaCommit, final Map<HoodieFileGroupId, Pair<String, HoodieCompactionOperation>> fgIdToCompactionOperation,
+                                     HoodieWriteConfig cfg) {
+    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+    HoodieTable table = getHoodieTable(metaClient, cfg);
+    List<FileSlice> fileSliceList = getCurrentLatestFileSlices(table);
+    fileSliceList.forEach(fileSlice -> {
+      Pair<String, HoodieCompactionOperation> opPair = fgIdToCompactionOperation.get(fileSlice.getFileGroupId());
+      if (opPair != null) { // this file group has an entry in the supplied compaction-operation map
+        assertEquals(opPair.getKey(), fileSlice.getBaseInstantTime(), "Expect baseInstant to match compaction Instant");
+        assertTrue(fileSlice.getLogFiles().count() > 0,
+            "Expect atleast one log file to be present where the latest delta commit was written");
+        assertFalse(fileSlice.getBaseFile().isPresent(), "Expect no data-file to be present");
+      } else { // no entry for this file group: only bound the slice's base instant by latestDeltaCommit
+        assertTrue(fileSlice.getBaseInstantTime().compareTo(latestDeltaCommit) <= 0,
+            "Expect baseInstant to be less than or equal to latestDeltaCommit");
+      }
+    });
+  }
+
+  protected List<HoodieRecord> runNextDeltaCommits(HoodieWriteClient client, final HoodieReadClient readClient, List<String> deltaInstants,
+                                                   List<HoodieRecord> records, HoodieWriteConfig cfg, boolean insertFirst, List<String> expPendingCompactionInstants)
+      throws Exception {
+
+    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+    List<Pair<String, HoodieCompactionPlan>> pendingCompactions = readClient.getPendingCompactions();
+    List<String> gotPendingCompactionInstants =
+        pendingCompactions.stream().map(pc -> pc.getKey()).sorted().collect(Collectors.toList());
+    assertEquals(expPendingCompactionInstants, gotPendingCompactionInstants);
+
+    Map<HoodieFileGroupId, Pair<String, HoodieCompactionOperation>> fgIdToCompactionOperation =
+        CompactionUtils.getAllPendingCompactionOperations(metaClient);
+
+    if (insertFirst) {
+      // Use first instant for inserting records
+      String firstInstant = deltaInstants.get(0);
+      deltaInstants = deltaInstants.subList(1, deltaInstants.size());
+      JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+      client.startCommitWithTime(firstInstant);
+      JavaRDD<WriteStatus> statuses = client.upsert(writeRecords, firstInstant);
+      List<WriteStatus> statusList = statuses.collect();
+
+      if (!cfg.shouldAutoCommit()) {
+        client.commit(firstInstant, statuses);
+      }
+      assertNoWriteErrors(statusList);
+      metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+      HoodieTable hoodieTable = getHoodieTable(metaClient, cfg);
+      List<HoodieBaseFile> dataFilesToRead = getCurrentLatestDataFiles(hoodieTable, cfg);
+      assertTrue(dataFilesToRead.stream().findAny().isPresent(),
+          "should list the parquet files we wrote in the delta commit");
+      validateDeltaCommit(firstInstant, fgIdToCompactionOperation, cfg);
+    }
+
+    int numRecords = records.size();
+    for (String instantTime : deltaInstants) {
+      records = dataGen.generateUpdates(instantTime, numRecords);
+      metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+      createNextDeltaCommit(instantTime, records, client, metaClient, cfg, false);
+      validateDeltaCommit(instantTime, fgIdToCompactionOperation, cfg);
+    }
+    return records;
+  }
+
+  protected void moveCompactionFromRequestedToInflight(String compactionInstantTime, HoodieWriteConfig cfg) {
+    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+    HoodieInstant compactionInstant = HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime);
+    metaClient.getActiveTimeline().transitionCompactionRequestedToInflight(compactionInstant);
+    HoodieInstant instant = metaClient.getActiveTimeline().reload().filterPendingCompactionTimeline().getInstants()
+        .filter(in -> in.getTimestamp().equals(compactionInstantTime)).findAny().get();
+    assertTrue(instant.isInflight(), "Instant must be marked inflight");
+  }
+
+  protected void scheduleCompaction(String compactionInstantTime, HoodieWriteClient client, HoodieWriteConfig cfg) {
+    client.scheduleCompactionAtInstant(compactionInstantTime, Option.empty());
+    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+    HoodieInstant instant = metaClient.getActiveTimeline().filterPendingCompactionTimeline().lastInstant().get();
+    assertEquals(compactionInstantTime, instant.getTimestamp(), "Last compaction instant must be the one set");
+  }
+
+  protected void scheduleAndExecuteCompaction(String compactionInstantTime, HoodieWriteClient client, HoodieTable table,
+                                            HoodieWriteConfig cfg, int expectedNumRecs, boolean hasDeltaCommitAfterPendingCompaction) throws IOException {
+    scheduleCompaction(compactionInstantTime, client, cfg);
+    executeCompaction(compactionInstantTime, client, table, cfg, expectedNumRecs, hasDeltaCommitAfterPendingCompaction);
+  }
+
+  protected void executeCompaction(String compactionInstantTime, HoodieWriteClient client, HoodieTable table,
+                                   HoodieWriteConfig cfg, int expectedNumRecs, boolean hasDeltaCommitAfterPendingCompaction) throws IOException {
+    // Runs the compaction at compactionInstantTime, then checks file slices, the commit timeline and the record count.
+    client.compact(compactionInstantTime);
+    List<FileSlice> fileSliceList = getCurrentLatestFileSlices(table);
+    assertTrue(fileSliceList.stream().findAny().isPresent(), "Ensure latest file-slices are not empty");
+    assertFalse(fileSliceList.stream()
+            .anyMatch(fs -> !fs.getBaseInstantTime().equals(compactionInstantTime)),
+        "Verify all file-slices have base-instant same as compaction instant");
+    assertFalse(fileSliceList.stream().anyMatch(fs -> !fs.getBaseFile().isPresent()),
+        "Verify all file-slices have data-files");
+
+    if (hasDeltaCommitAfterPendingCompaction) {
+      assertFalse(fileSliceList.stream().anyMatch(fs -> fs.getLogFiles().count() == 0),
+          "Verify all file-slices have atleast one log-file");
+    } else {
+      assertFalse(fileSliceList.stream().anyMatch(fs -> fs.getLogFiles().count() > 0),
+          "Verify all file-slices have no log-files");
+    }
+
+    // verify that there is a commit, reading the timeline through a freshly built meta client
+    table = getHoodieTable(new HoodieTableMetaClient(hadoopConf, cfg.getBasePath(), true), cfg);
+    HoodieTimeline timeline = table.getMetaClient().getCommitTimeline().filterCompletedInstants();
+    String latestCompactionCommitTime = timeline.lastInstant().get().getTimestamp();
+    assertEquals(compactionInstantTime, latestCompactionCommitTime,
+        "Expect compaction instant time to be the latest commit time");
+    assertEquals(expectedNumRecs,
+        HoodieClientTestUtils.readSince(basePath, sqlContext, timeline, "000").count(),
+        "Must contain expected records");
+
+  }
+
+  protected List<WriteStatus> createNextDeltaCommit(String instantTime, List<HoodieRecord> records, HoodieWriteClient client,
+                                                    HoodieTableMetaClient metaClient, HoodieWriteConfig cfg, boolean skipCommit) {
+    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+
+    client.startCommitWithTime(instantTime);
+
+    JavaRDD<WriteStatus> statuses = client.upsert(writeRecords, instantTime);
+    List<WriteStatus> statusList = statuses.collect();
+    assertNoWriteErrors(statusList);
+    if (!cfg.shouldAutoCommit() && !skipCommit) {
+      client.commit(instantTime, statuses);
+    }
+
+    Option<HoodieInstant> deltaCommit =
+        metaClient.getActiveTimeline().reload().getDeltaCommitTimeline().filterCompletedInstants().lastInstant();
+    if (skipCommit && !cfg.shouldAutoCommit()) {
+      assertTrue(deltaCommit.get().getTimestamp().compareTo(instantTime) < 0,
+          "Delta commit should not be latest instant");
+    } else {
+      assertTrue(deltaCommit.isPresent());
+      assertEquals(instantTime, deltaCommit.get().getTimestamp(), "Delta commit should be latest instant");
+    }
+    return statusList;
+  }
+
+  protected List<HoodieBaseFile> getCurrentLatestDataFiles(HoodieTable table, HoodieWriteConfig cfg) throws IOException {
+    FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(table.getMetaClient().getFs(), cfg.getBasePath());
+    HoodieTableFileSystemView view =
+        getHoodieTableFileSystemView(table.getMetaClient(), table.getCompletedCommitsTimeline(), allFiles);
+    return view.getLatestBaseFiles().collect(Collectors.toList());
+  }
+
+  protected List<FileSlice> getCurrentLatestFileSlices(HoodieTable table) {
+    HoodieTableFileSystemView view = new HoodieTableFileSystemView(table.getMetaClient(),
+        table.getMetaClient().getActiveTimeline().reload().getCommitsAndCompactionTimeline());
+    return Arrays.stream(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS)
+        .flatMap(view::getLatestFileSlices).collect(Collectors.toList());
+  }
+
+  protected HoodieTableType getTableType() {
+    return HoodieTableType.MERGE_ON_READ;
+  }
+}
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java b/hudi-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java
index bf37a88..81840b9 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java
@@ -18,77 +18,36 @@
 
 package org.apache.hudi.table.action.compact;
 
-import org.apache.hudi.avro.model.HoodieCompactionOperation;
-import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
 import org.apache.hudi.client.HoodieReadClient;
 import org.apache.hudi.client.HoodieWriteClient;
-import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.common.model.FileSlice;
-import org.apache.hudi.common.model.HoodieBaseFile;
-import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieInstant.State;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
-import org.apache.hudi.common.table.view.FileSystemViewStorageType;
-import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
-import org.apache.hudi.common.testutils.HoodieTestUtils;
-import org.apache.hudi.common.util.CompactionUtils;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.config.HoodieCompactionConfig;
-import org.apache.hudi.config.HoodieIndexConfig;
-import org.apache.hudi.config.HoodieStorageConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.table.HoodieTable;
-import org.apache.hudi.testutils.HoodieClientTestBase;
-import org.apache.hudi.testutils.HoodieClientTestUtils;
-import org.apache.hudi.testutils.HoodieTestDataGenerator;
-
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.spark.api.java.JavaRDD;
 import org.junit.jupiter.api.Test;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
 
-import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
-import static org.apache.hudi.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /**
  * Test Cases for Async Compaction and Ingestion interaction.
  */
-public class TestAsyncCompaction extends HoodieClientTestBase {
+public class TestAsyncCompaction extends CompactionTestBase {
 
   private HoodieWriteConfig getConfig(Boolean autoCommit) {
     return getConfigBuilder(autoCommit).build();
   }
 
-  private HoodieWriteConfig.Builder getConfigBuilder(Boolean autoCommit) {
-    return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
-        .withAutoCommit(autoCommit).withAssumeDatePartitioning(true)
-        .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024 * 1024)
-            .withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1).build())
-        .withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024 * 1024 * 1024).build())
-        .forTable("test-trip-table")
-        .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
-        .withEmbeddedTimelineServerEnabled(true).withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
-            .withStorageType(FileSystemViewStorageType.EMBEDDED_KV_STORE).build());
-  }
-
   @Test
   public void testRollbackForInflightCompaction() throws Exception {
     // Rollback inflight compaction
@@ -372,170 +331,4 @@ public class TestAsyncCompaction extends HoodieClientTestBase {
       executeCompaction(compactionInstantTime, client, hoodieTable, cfg, numRecs, true);
     }
   }
-
-  /**
-   * HELPER METHODS FOR TESTING.
-   **/
-
-  private void validateDeltaCommit(String latestDeltaCommit,
-      final Map<HoodieFileGroupId, Pair<String, HoodieCompactionOperation>> fgIdToCompactionOperation,
-      HoodieWriteConfig cfg) {
-    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
-    HoodieTable table = getHoodieTable(metaClient, cfg);
-    List<FileSlice> fileSliceList = getCurrentLatestFileSlices(table);
-    fileSliceList.forEach(fileSlice -> {
-      Pair<String, HoodieCompactionOperation> opPair = fgIdToCompactionOperation.get(fileSlice.getFileGroupId());
-      if (opPair != null) {
-        assertEquals(fileSlice.getBaseInstantTime(), opPair.getKey(), "Expect baseInstant to match compaction Instant");
-        assertTrue(fileSlice.getLogFiles().count() > 0,
-            "Expect atleast one log file to be present where the latest delta commit was written");
-        assertFalse(fileSlice.getBaseFile().isPresent(), "Expect no data-file to be present");
-      } else {
-        assertTrue(fileSlice.getBaseInstantTime().compareTo(latestDeltaCommit) <= 0,
-            "Expect baseInstant to be less than or equal to latestDeltaCommit");
-      }
-    });
-  }
-
-  private List<HoodieRecord> runNextDeltaCommits(HoodieWriteClient client, final HoodieReadClient readClient, List<String> deltaInstants,
-                                                 List<HoodieRecord> records, HoodieWriteConfig cfg, boolean insertFirst, List<String> expPendingCompactionInstants)
-      throws Exception {
-
-    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
-    List<Pair<String, HoodieCompactionPlan>> pendingCompactions = readClient.getPendingCompactions();
-    List<String> gotPendingCompactionInstants =
-        pendingCompactions.stream().map(pc -> pc.getKey()).sorted().collect(Collectors.toList());
-    assertEquals(expPendingCompactionInstants, gotPendingCompactionInstants);
-
-    Map<HoodieFileGroupId, Pair<String, HoodieCompactionOperation>> fgIdToCompactionOperation =
-        CompactionUtils.getAllPendingCompactionOperations(metaClient);
-
-    if (insertFirst) {
-      // Use first instant for inserting records
-      String firstInstant = deltaInstants.get(0);
-      deltaInstants = deltaInstants.subList(1, deltaInstants.size());
-      JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
-      client.startCommitWithTime(firstInstant);
-      JavaRDD<WriteStatus> statuses = client.upsert(writeRecords, firstInstant);
-      List<WriteStatus> statusList = statuses.collect();
-
-      if (!cfg.shouldAutoCommit()) {
-        client.commit(firstInstant, statuses);
-      }
-      assertNoWriteErrors(statusList);
-      metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
-      HoodieTable hoodieTable = getHoodieTable(metaClient, cfg);
-      List<HoodieBaseFile> dataFilesToRead = getCurrentLatestDataFiles(hoodieTable, cfg);
-      assertTrue(dataFilesToRead.stream().findAny().isPresent(),
-          "should list the parquet files we wrote in the delta commit");
-      validateDeltaCommit(firstInstant, fgIdToCompactionOperation, cfg);
-    }
-
-    int numRecords = records.size();
-    for (String instantTime : deltaInstants) {
-      records = dataGen.generateUpdates(instantTime, numRecords);
-      metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
-      createNextDeltaCommit(instantTime, records, client, metaClient, cfg, false);
-      validateDeltaCommit(instantTime, fgIdToCompactionOperation, cfg);
-    }
-    return records;
-  }
-
-  private void moveCompactionFromRequestedToInflight(String compactionInstantTime, HoodieWriteConfig cfg) {
-    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
-    HoodieInstant compactionInstant = HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime);
-    metaClient.getActiveTimeline().transitionCompactionRequestedToInflight(compactionInstant);
-    HoodieInstant instant = metaClient.getActiveTimeline().reload().filterPendingCompactionTimeline().getInstants()
-        .filter(in -> in.getTimestamp().equals(compactionInstantTime)).findAny().get();
-    assertTrue(instant.isInflight(), "Instant must be marked inflight");
-  }
-
-  private void scheduleCompaction(String compactionInstantTime, HoodieWriteClient client, HoodieWriteConfig cfg)
-      throws IOException {
-    client.scheduleCompactionAtInstant(compactionInstantTime, Option.empty());
-    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
-    HoodieInstant instant = metaClient.getActiveTimeline().filterPendingCompactionTimeline().lastInstant().get();
-    assertEquals(compactionInstantTime, instant.getTimestamp(), "Last compaction instant must be the one set");
-  }
-
-  private void scheduleAndExecuteCompaction(String compactionInstantTime, HoodieWriteClient client, HoodieTable table,
-      HoodieWriteConfig cfg, int expectedNumRecs, boolean hasDeltaCommitAfterPendingCompaction) throws IOException {
-    scheduleCompaction(compactionInstantTime, client, cfg);
-    executeCompaction(compactionInstantTime, client, table, cfg, expectedNumRecs, hasDeltaCommitAfterPendingCompaction);
-  }
-
-  private void executeCompaction(String compactionInstantTime, HoodieWriteClient client, HoodieTable table,
-      HoodieWriteConfig cfg, int expectedNumRecs, boolean hasDeltaCommitAfterPendingCompaction) throws IOException {
-
-    client.compact(compactionInstantTime);
-    List<FileSlice> fileSliceList = getCurrentLatestFileSlices(table);
-    assertTrue(fileSliceList.stream().findAny().isPresent(), "Ensure latest file-slices are not empty");
-    assertFalse(fileSliceList.stream()
-            .anyMatch(fs -> !fs.getBaseInstantTime().equals(compactionInstantTime)),
-        "Verify all file-slices have base-instant same as compaction instant");
-    assertFalse(fileSliceList.stream().anyMatch(fs -> !fs.getBaseFile().isPresent()),
-        "Verify all file-slices have data-files");
-
-    if (hasDeltaCommitAfterPendingCompaction) {
-      assertFalse(fileSliceList.stream().anyMatch(fs -> fs.getLogFiles().count() == 0),
-          "Verify all file-slices have atleast one log-file");
-    } else {
-      assertFalse(fileSliceList.stream().anyMatch(fs -> fs.getLogFiles().count() > 0),
-          "Verify all file-slices have no log-files");
-    }
-
-    // verify that there is a commit
-    table = getHoodieTable(new HoodieTableMetaClient(hadoopConf, cfg.getBasePath(), true), cfg);
-    HoodieTimeline timeline = table.getMetaClient().getCommitTimeline().filterCompletedInstants();
-    String latestCompactionCommitTime = timeline.lastInstant().get().getTimestamp();
-    assertEquals(latestCompactionCommitTime, compactionInstantTime,
-        "Expect compaction instant time to be the latest commit time");
-    assertEquals(expectedNumRecs,
-        HoodieClientTestUtils.readSince(basePath, sqlContext, timeline, "000").count(),
-        "Must contain expected records");
-
-  }
-
-  private List<WriteStatus> createNextDeltaCommit(String instantTime, List<HoodieRecord> records,
-      HoodieWriteClient client, HoodieTableMetaClient metaClient, HoodieWriteConfig cfg, boolean skipCommit) {
-    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
-
-    client.startCommitWithTime(instantTime);
-
-    JavaRDD<WriteStatus> statuses = client.upsert(writeRecords, instantTime);
-    List<WriteStatus> statusList = statuses.collect();
-    assertNoWriteErrors(statusList);
-    if (!cfg.shouldAutoCommit() && !skipCommit) {
-      client.commit(instantTime, statuses);
-    }
-
-    Option<HoodieInstant> deltaCommit =
-        metaClient.getActiveTimeline().reload().getDeltaCommitTimeline().filterCompletedInstants().lastInstant();
-    if (skipCommit && !cfg.shouldAutoCommit()) {
-      assertTrue(deltaCommit.get().getTimestamp().compareTo(instantTime) < 0,
-          "Delta commit should not be latest instant");
-    } else {
-      assertTrue(deltaCommit.isPresent());
-      assertEquals(instantTime, deltaCommit.get().getTimestamp(), "Delta commit should be latest instant");
-    }
-    return statusList;
-  }
-
-  private List<HoodieBaseFile> getCurrentLatestDataFiles(HoodieTable table, HoodieWriteConfig cfg) throws IOException {
-    FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(table.getMetaClient().getFs(), cfg.getBasePath());
-    HoodieTableFileSystemView view =
-        getHoodieTableFileSystemView(table.getMetaClient(), table.getCompletedCommitsTimeline(), allFiles);
-    return view.getLatestBaseFiles().collect(Collectors.toList());
-  }
-
-  private List<FileSlice> getCurrentLatestFileSlices(HoodieTable table) {
-    HoodieTableFileSystemView view = new HoodieTableFileSystemView(table.getMetaClient(),
-        table.getMetaClient().getActiveTimeline().reload().getCommitsAndCompactionTimeline());
-    return Arrays.stream(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS)
-        .flatMap(view::getLatestFileSlices).collect(Collectors.toList());
-  }
-
-  protected HoodieTableType getTableType() {
-    return HoodieTableType.MERGE_ON_READ;
-  }
 }
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java b/hudi-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java
new file mode 100644
index 0000000..4cbb461
--- /dev/null
+++ b/hudi-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.compact;
+
+import org.apache.hudi.client.HoodieReadClient;
+import org.apache.hudi.client.HoodieWriteClient;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.CollectionUtils;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TestInlineCompaction extends CompactionTestBase {
+
+  private HoodieWriteConfig getConfigForInlineCompaction(int maxDeltaCommits) {
+    return getConfigBuilder(false)
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+            .withInlineCompaction(true).withMaxNumDeltaCommitsBeforeCompaction(maxDeltaCommits).build())
+        .build();
+  }
+
+  @Test
+  public void testCompactionIsNotScheduledEarly() throws Exception {
+    // Given: make two commits
+    HoodieWriteConfig cfg = getConfigForInlineCompaction(3);
+    try (HoodieWriteClient<?> writeClient = getHoodieWriteClient(cfg)) {
+      List<HoodieRecord> records = dataGen.generateInserts("000", 100);
+      HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
+      runNextDeltaCommits(writeClient, readClient, Arrays.asList("000", "001"), records, cfg, true, new ArrayList<>());
+      HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+
+      // Then: ensure no compaction is executed, since there are only 2 delta commits
+      assertEquals(2, metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().countInstants());
+    }
+  }
+
+  @Test
+  public void testSuccessfulCompaction() throws Exception {
+    // Given: make two delta commits; a third one below triggers the inline compaction
+    HoodieWriteConfig cfg = getConfigForInlineCompaction(3);
+    List<String> instants = IntStream.range(0, 2).mapToObj(i -> HoodieActiveTimeline.createNewInstantTime()).collect(Collectors.toList());
+
+    try (HoodieWriteClient<?> writeClient = getHoodieWriteClient(cfg)) {
+      List<HoodieRecord> records = dataGen.generateInserts(instants.get(0), 100);
+      HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
+      runNextDeltaCommits(writeClient, readClient, instants, records, cfg, true, new ArrayList<>());
+
+      // third commit, that will trigger compaction
+      HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+      String finalInstant = HoodieActiveTimeline.createNewInstantTime();
+      createNextDeltaCommit(finalInstant, dataGen.generateUpdates(finalInstant, 100), writeClient, metaClient, cfg, false);
+
+      // Then: ensure the file slices are compacted as per policy
+      metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+      assertEquals(4, metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().countInstants());
+      assertEquals(HoodieTimeline.COMMIT_ACTION, metaClient.getActiveTimeline().lastInstant().get().getAction());
+    }
+  }
+
+  @Test
+  public void testCompactionRetryOnFailure() throws Exception {
+    // Given: two commits, then a scheduled compaction that is left failed/in-flight
+    HoodieWriteConfig cfg = getConfigBuilder(false)
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+            .withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1).build())
+        .build();
+    List<String> instants = CollectionUtils.createImmutableList("000", "001");
+    try (HoodieWriteClient<?> writeClient = getHoodieWriteClient(cfg)) {
+      List<HoodieRecord> records = dataGen.generateInserts(instants.get(0), 100);
+      HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
+      runNextDeltaCommits(writeClient, readClient, instants, records, cfg, true, new ArrayList<>());
+      // Schedule compaction 002, make it in-flight (simulates inline compaction failing)
+      scheduleCompaction("002", writeClient, cfg);
+      moveCompactionFromRequestedToInflight("002", cfg);
+    }
+
+    // When: a third commit happens
+    HoodieWriteConfig inlineCfg = getConfigForInlineCompaction(2);
+    try (HoodieWriteClient<?> writeClient = getHoodieWriteClient(inlineCfg)) {
+      HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+      createNextDeltaCommit("003", dataGen.generateUpdates("003", 100), writeClient, metaClient, inlineCfg, false);
+    }
+
+    // Then: 1 delta commit is done, the failed compaction is retried
+    metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+    assertEquals(4, metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().countInstants());
+    assertEquals("002", metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants().firstInstant().get().getTimestamp());
+  }
+}