You are viewing a plain-text version of this content. The canonical link to the original was a hyperlink, which is not preserved in this plain-text rendering.
Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2019/11/05 17:01:31 UTC

[GitHub] [incubator-hudi] bvaradar commented on a change in pull request #942: [HUDI-137] Fix state transitions for Hudi cleaning action

bvaradar commented on a change in pull request #942: [HUDI-137] Fix state transitions for Hudi cleaning action
URL: https://github.com/apache/incubator-hudi/pull/942#discussion_r342680405
 
 

 ##########
 File path: hudi-client/src/main/java/org/apache/hudi/HoodieWriteClient.java
 ##########
 @@ -1002,19 +1004,98 @@ public void clean() throws HoodieIOException {
    * Clean up any stale/old files/data lying around (either on file storage or index storage) based
    * on the configurations and CleaningPolicy used. (typically files that no longer can be used by a
    * running query can be cleaned)
+   *
+   * @param startCleanTime  Cleaner Instant Timestamp
+   * @return
+   * @throws HoodieIOException in case of any IOException
    */
-  private void clean(String startCleanTime) throws HoodieIOException {
+  protected HoodieCleanMetadata clean(String startCleanTime) throws HoodieIOException {
+    // Create a Hoodie table which encapsulated the commits and files visible
+    final HoodieTable<T> table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc);
+
+    // If there are inflight(failed) or previously requested clean operation, first perform them
+    table.getCleanTimeline().filterInflightsAndRequested().getInstants().forEach(hoodieInstant -> {
+      logger.info("There were previously unfinished cleaner operations. Finishing Instant=" + hoodieInstant);
+      runClean(hoodieInstant.getTimestamp());
+    });
+
+    Option<HoodieCleanerPlan> cleanerPlanOpt = scheduleClean(startCleanTime);
+
+    if (cleanerPlanOpt.isPresent()) {
+      HoodieCleanerPlan cleanerPlan = cleanerPlanOpt.get();
+      if ((cleanerPlan.getFilesToBeDeletedPerPartition() != null)
+          && !cleanerPlan.getFilesToBeDeletedPerPartition().isEmpty()) {
+        return runClean(startCleanTime);
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Creates a Cleaner plan if there are files to be cleaned and stores them in instant file
+   * @param startCleanTime   Cleaner Instant Time
+   * @return Cleaner Plan if generated
+   */
+  @VisibleForTesting
+  protected Option<HoodieCleanerPlan> scheduleClean(String startCleanTime) {
+    // Create a Hoodie table which encapsulated the commits and files visible
+    HoodieTable<T> table = HoodieTable.getHoodieTable(
+        createMetaClient(true), config, jsc);
+
+    HoodieCleanerPlan cleanerPlan = table.scheduleClean(jsc);
+
+    if ((cleanerPlan.getFilesToBeDeletedPerPartition() != null)
+        && !cleanerPlan.getFilesToBeDeletedPerPartition().isEmpty()) {
+
+      HoodieInstant cleanInstant = new HoodieInstant(State.REQUESTED, HoodieTimeline.CLEAN_ACTION, startCleanTime);
+      // Save to both aux and timeline folder
+      try {
+        table.getActiveTimeline()
+            .saveToCleanRequested(cleanInstant, AvroUtils.serializeCleanerPlan(cleanerPlan));
+        logger.info("Requesting Cleaning with instant time " + cleanInstant);
+      } catch (IOException e) {
+        logger.error("Got exception when saving cleaner requested file", e);
+        throw new HoodieIOException(e.getMessage(), e);
+      }
+      return Option.of(cleanerPlan);
+    }
+    return Option.empty();
+  }
+
+  /**
+   * Executes the Cleaner plan stored in the instant metadata
+   * @param cleanInstantTs  Cleaner Instant Timestamp
+   * @return
+   */
+  @VisibleForTesting
+  protected HoodieCleanMetadata runClean(String cleanInstantTs) {
+    // Create a Hoodie table which encapsulated the commits and files visible
+    HoodieTable<T> table = HoodieTable.getHoodieTable(
+        createMetaClient(false), config, jsc);
+
+    HoodieInstant cleanInstant =
+        table.getCleanTimeline().getInstants().filter(x -> x.getTimestamp().equals(cleanInstantTs)).findFirst().get();
+
+    Preconditions.checkArgument(cleanInstant.getState().equals(State.REQUESTED)
+        || cleanInstant.getState().equals(State.INFLIGHT));
+
+
+    if (cleanInstant.isInflight()) {
+      cleanInstant = table.getActiveTimeline().revertToRequested(cleanInstant);
 
 Review comment:
   Done.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services