Posted to commits@hudi.apache.org by si...@apache.org on 2022/02/22 02:53:35 UTC

[hudi] branch master updated: [HUDI-2925] Fix duplicate cleaning of same files when unfinished clean operations are present using a config. (#4212)

This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 0dee8ed  [HUDI-2925] Fix duplicate cleaning of same files when unfinished clean operations are present using a config. (#4212)
0dee8ed is described below

commit 0dee8edc9741ee99e1e2bf98efd9673003fcb1e7
Author: Prashant Wason <pw...@uber.com>
AuthorDate: Mon Feb 21 18:53:03 2022 -0800

    [HUDI-2925] Fix duplicate cleaning of same files when unfinished clean operations are present using a config. (#4212)
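
    As a usage note, the new flag can be turned off through the compaction config
    builder added in this patch. The snippet below is an illustrative sketch only;
    the table base path and retention values are placeholders, not part of this change.

        HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
            .withPath("/tmp/hudi_table")                  // placeholder base path
            .withCompactionConfig(HoodieCompactionConfig.newBuilder()
                .allowMultipleCleans(false)               // maps to hoodie.clean.allow.multiple
                .retainCommits(1)
                .build())
            .build();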
    
    
    Co-authored-by: sivabalan <n....@gmail.com>
---
 .../apache/hudi/client/BaseHoodieWriteClient.java  | 31 ++++++----
 .../apache/hudi/config/HoodieCompactionConfig.java | 12 ++++
 .../org/apache/hudi/config/HoodieWriteConfig.java  |  4 ++
 .../hudi/table/action/BaseActionExecutor.java      |  2 +-
 .../table/action/clean/CleanActionExecutor.java    |  8 ++-
 .../java/org/apache/hudi/table/TestCleaner.java    | 68 ++++++++++++++++++++++
 .../org/apache/hudi/common/util/CleanerUtils.java  |  8 +++
 7 files changed, 118 insertions(+), 15 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index f3dc53b..7b67ff5 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -767,21 +767,28 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
     if (!tableServicesEnabled(config)) {
       return null;
     }
-    if (scheduleInline) {
-      scheduleTableServiceInternal(cleanInstantTime, Option.empty(), TableServiceType.CLEAN);
-    }
-    LOG.info("Cleaner started");
     final Timer.Context timerContext = metrics.getCleanCtx();
-    LOG.info("Cleaned failed attempts if any");
     CleanerUtils.rollbackFailedWrites(config.getFailedWritesCleanPolicy(),
         HoodieTimeline.CLEAN_ACTION, () -> rollbackFailedWrites(skipLocking));
-    HoodieCleanMetadata metadata = createTable(config, hadoopConf).clean(context, cleanInstantTime, skipLocking);
-    if (timerContext != null && metadata != null) {
-      long durationMs = metrics.getDurationInMs(timerContext.stop());
-      metrics.updateCleanMetrics(durationMs, metadata.getTotalFilesDeleted());
-      LOG.info("Cleaned " + metadata.getTotalFilesDeleted() + " files"
-          + " Earliest Retained Instant :" + metadata.getEarliestCommitToRetain()
-          + " cleanerElapsedMs" + durationMs);
+
+    HoodieCleanMetadata metadata = null;
+    HoodieTable table = createTable(config, hadoopConf);
+    if (config.allowMultipleCleans() || !table.getActiveTimeline().getCleanerTimeline().filterInflightsAndRequested().firstInstant().isPresent()) {
+      LOG.info("Cleaner started");
+      // proceed only if multiple clean schedules are enabled or if there are no pending cleans.
+      if (scheduleInline) {
+        scheduleTableServiceInternal(cleanInstantTime, Option.empty(), TableServiceType.CLEAN);
+        table.getMetaClient().reloadActiveTimeline();
+      }
+
+      metadata = table.clean(context, cleanInstantTime, skipLocking);
+      if (timerContext != null && metadata != null) {
+        long durationMs = metrics.getDurationInMs(timerContext.stop());
+        metrics.updateCleanMetrics(durationMs, metadata.getTotalFilesDeleted());
+        LOG.info("Cleaned " + metadata.getTotalFilesDeleted() + " files"
+            + " Earliest Retained Instant :" + metadata.getEarliestCommitToRetain()
+            + " cleanerElapsedMs" + durationMs);
+      }
     }
     return metadata;
   }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
index 130d379..0aac930 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
@@ -278,6 +278,13 @@ public class HoodieCompactionConfig extends HoodieConfig {
       .withDocumentation("The average record size. If not explicitly specified, hudi will compute the "
           + "record size estimate compute dynamically based on commit metadata. "
           + " This is critical in computing the insert parallelism and bin-packing inserts into small files.");
+  
+  public static final ConfigProperty<Boolean> ALLOW_MULTIPLE_CLEANS = ConfigProperty
+      .key("hoodie.clean.allow.multiple")
+      .defaultValue(true)
+      .sinceVersion("0.11.0")
+      .withDocumentation("Allows scheduling/executing multiple cleans by enabling this config. If users prefer to strictly ensure clean requests should be mutually exclusive, "
+          + ".i.e. a 2nd clean will not be scheduled if another clean is not yet completed to avoid repeat cleaning of same files, they might want to disable this config.");
 
   public static final ConfigProperty<Integer> ARCHIVE_MERGE_FILES_BATCH_SIZE = ConfigProperty
       .key("hoodie.archive.merge.files.batch.size")
@@ -642,6 +649,11 @@ public class HoodieCompactionConfig extends HoodieConfig {
       return this;
     }
 
+    public Builder allowMultipleCleans(boolean allowMultipleCleanSchedules) {
+      compactionConfig.setValue(ALLOW_MULTIPLE_CLEANS, String.valueOf(allowMultipleCleanSchedules));
+      return this;
+    }
+
     public Builder withCleanerParallelism(int cleanerParallelism) {
       compactionConfig.setValue(CLEANER_PARALLELISM_VALUE, String.valueOf(cleanerParallelism));
       return this;
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index b41da7f..c7f2c45 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -1117,6 +1117,10 @@ public class HoodieWriteConfig extends HoodieConfig {
     return getInt(HoodieCompactionConfig.COPY_ON_WRITE_RECORD_SIZE_ESTIMATE);
   }
 
+  public boolean allowMultipleCleans() {
+    return getBoolean(HoodieCompactionConfig.ALLOW_MULTIPLE_CLEANS);
+  }
+
   public boolean shouldAutoTuneInsertSplits() {
     return getBoolean(HoodieCompactionConfig.COPY_ON_WRITE_AUTO_SPLIT_INSERTS);
   }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
index 221f970..f893b4c 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
@@ -65,7 +65,7 @@ public abstract class BaseActionExecutor<T extends HoodieRecordPayload, I, K, O,
    * Writes clean metadata to table metadata.
    * @param metadata clean metadata of interest.
    */
-  protected final void writeTableMetadata(HoodieCleanMetadata metadata) {
+  protected final void writeTableMetadata(HoodieCleanMetadata metadata, String instantTime) {
     table.getMetadataWriter(instantTime).ifPresent(w -> w.update(metadata, instantTime));
   }
 
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
index 9813b2b..4ae8009 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
@@ -208,7 +208,7 @@ public class CleanActionExecutor<T extends HoodieRecordPayload, I, K, O> extends
       if (!skipLocking) {
         this.txnManager.beginTransaction(Option.empty(), Option.empty());
       }
-      writeTableMetadata(metadata);
+      writeTableMetadata(metadata, inflightInstant.getTimestamp());
       table.getActiveTimeline().transitionCleanInflightToComplete(inflightInstant,
           TimelineMetadataUtils.serializeCleanMetadata(metadata));
       LOG.info("Marked clean started on " + inflightInstant.getTimestamp() + " as complete");
@@ -240,9 +240,13 @@ public class CleanActionExecutor<T extends HoodieRecordPayload, I, K, O> extends
             LOG.warn("Failed to perform previous clean operation, instant: " + hoodieInstant, e);
           }
         }
+        table.getMetaClient().reloadActiveTimeline();
+        if (config.isMetadataTableEnabled()) {
+          table.getHoodieView().sync();
+        }
       });
-      table.getMetaClient().reloadActiveTimeline();
     }
+
     // return the last clean metadata for now
     // TODO (NA) : Clean only the earliest pending clean just like how we do for other table services
     // This requires the CleanActionExecutor to be refactored as BaseCommitActionExecutor
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
index bec0b08..f51a169 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
@@ -122,6 +122,7 @@ import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
 import static org.awaitility.Awaitility.await;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -254,6 +255,73 @@ public class TestCleaner extends HoodieClientTestBase {
         SparkRDDWriteClient::upsertPreppedRecords, true);
   }
 
+
+  /**
+   * Tests that no more than one clean is scheduled/executed if HoodieCompactionConfig.allowMultipleCleans is disabled.
+   */
+  @Test
+  public void testMultiClean() {
+    HoodieWriteConfig writeConfig = getConfigBuilder()
+        .withFileSystemViewConfig(new FileSystemViewStorageConfig.Builder()
+            .withEnableBackupForRemoteFileSystemView(false).build())
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024 * 1024)
+            .withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1)
+            .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.EAGER)
+            .allowMultipleCleans(false)
+            .withAutoClean(false).retainCommits(1).retainFileVersions(1).build())
+        .withEmbeddedTimelineServerEnabled(false).build();
+
+    int index = 0;
+    String cleanInstantTime;
+    final String partition = "2015/03/16";
+    try (SparkRDDWriteClient client = new SparkRDDWriteClient(context, writeConfig)) {
+      // Three writes so we can initiate a clean
+      for (; index < 3; ++index) {
+        String newCommitTime = "00" + index;
+        List<HoodieRecord> records = dataGen.generateInsertsForPartition(newCommitTime, 1, partition);
+        client.startCommitWithTime(newCommitTime);
+        client.insert(jsc.parallelize(records, 1), newCommitTime).collect();
+      }
+    }
+
+    // mimic failed/leftover clean by scheduling a clean but not performing it
+    cleanInstantTime = "00" + index++;
+    HoodieTable table = HoodieSparkTable.create(writeConfig, context);
+    Option<HoodieCleanerPlan> cleanPlan = table.scheduleCleaning(context, cleanInstantTime, Option.empty());
+    assertEquals(cleanPlan.get().getFilePathsToBeDeletedPerPartition().get(partition).size(), 1);
+    assertEquals(metaClient.reloadActiveTimeline().getCleanerTimeline().filterInflightsAndRequested().countInstants(), 1);
+
+    try (SparkRDDWriteClient client = new SparkRDDWriteClient(context, writeConfig)) {
+      // Next commit. This is required so that there is an additional file version to clean.
+      String newCommitTime = "00" + index++;
+      List<HoodieRecord> records = dataGen.generateInsertsForPartition(newCommitTime, 1, partition);
+      client.startCommitWithTime(newCommitTime);
+      client.insert(jsc.parallelize(records, 1), newCommitTime).collect();
+
+      // Initiate another clean. The previous leftover clean will be attempted first, followed by another clean
+      // due to the commit above.
+      String newCleanInstantTime = "00" + index++;
+      HoodieCleanMetadata cleanMetadata = client.clean(newCleanInstantTime);
+      // the new clean should not be triggered since allowMultipleCleans is set to false and a clean is still pending
+      assertNull(cleanMetadata);
+
+      // let the old clean complete
+      table = HoodieSparkTable.create(writeConfig, context);
+      cleanMetadata = table.clean(context, cleanInstantTime, false);
+      assertNotNull(cleanMetadata);
+
+      // any new clean should go ahead
+      cleanMetadata = client.clean(newCleanInstantTime);
+      // since the pending clean has completed, this clean is expected to succeed even with allowMultipleCleans disabled
+      assertNotNull(cleanMetadata);
+
+      // 1 file cleaned
+      assertEquals(cleanMetadata.getPartitionMetadata().get(partition).getSuccessDeleteFiles().size(), 1);
+      assertEquals(cleanMetadata.getPartitionMetadata().get(partition).getFailedDeleteFiles().size(), 0);
+      assertEquals(cleanMetadata.getPartitionMetadata().get(partition).getDeletePathPatterns().size(), 1);
+    }
+  }
+
   /**
    * Test Helper for Cleaning by versions logic from HoodieWriteClient API perspective.
    *
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java
index 9f63bfa..a3a1305 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java
@@ -35,6 +35,9 @@ import org.apache.hudi.common.table.timeline.versioning.clean.CleanMetadataV1Mig
 import org.apache.hudi.common.table.timeline.versioning.clean.CleanMetadataV2MigrationHandler;
 import org.apache.hudi.common.table.timeline.versioning.clean.CleanPlanMigrator;
 
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.List;
@@ -43,6 +46,9 @@ import java.util.Map;
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION;
 
 public class CleanerUtils {
+
+  private static final Logger LOG = LogManager.getLogger(CleanerUtils.class);
+
   public static final Integer CLEAN_METADATA_VERSION_1 = CleanMetadataV1MigrationHandler.VERSION;
   public static final Integer CLEAN_METADATA_VERSION_2 = CleanMetadataV2MigrationHandler.VERSION;
   public static final Integer LATEST_CLEAN_METADATA_VERSION = CLEAN_METADATA_VERSION_2;
@@ -131,6 +137,7 @@ public class CleanerUtils {
           // No need to do any special cleanup for failed operations during clean
           return;
         } else if (cleaningPolicy.isLazy()) {
+          LOG.info("Cleaned failed attempts if any");
           // Perform rollback of failed operations for all types of actions during clean
           rollbackFailedWritesFunc.apply();
           return;
@@ -140,6 +147,7 @@ public class CleanerUtils {
       case COMMIT_ACTION:
         // For any other actions, perform rollback of failed writes
         if (cleaningPolicy.isEager()) {
+          LOG.info("Cleaned failed attempts if any");
           rollbackFailedWritesFunc.apply();
           return;
         }
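
For reference, the same behavior can also be toggled with the raw key introduced here,
hoodie.clean.allow.multiple, and read back through the accessor added to HoodieWriteConfig.
This is a minimal, hypothetical sketch (the base path is a placeholder and is not part of
this commit):

    import java.util.Properties;

    import org.apache.hudi.config.HoodieWriteConfig;

    public class AllowMultipleCleansExample {
      public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("hoodie.clean.allow.multiple", "false");

        HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
            .withPath("/tmp/hudi_trips_cow")   // placeholder table base path
            .withProps(props)                  // raw properties override the default (true)
            .build();

        // accessor added in this patch; prints false with the override above
        System.out.println(writeConfig.allowMultipleCleans());
      }
    }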