Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2020/06/28 07:30:22 UTC

[GitHub] [hudi] vinothchandar commented on a change in pull request #1577: [HUDI-855] Run Auto Cleaner in parallel with ingestion

vinothchandar commented on a change in pull request #1577:
URL: https://github.com/apache/hudi/pull/1577#discussion_r446606888



##########
File path: hudi-client/src/main/java/org/apache/hudi/async/AbstractAsyncService.java
##########
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.hudi.utilities.deltastreamer;
+package org.apache.hudi.async;

Review comment:
       Would `org.apache.hudi.client.service` be a better location? We should keep this, the timeline server, etc. in a single place.

##########
File path: hudi-client/src/main/java/org/apache/hudi/async/AbstractAsyncService.java
##########
@@ -32,11 +32,11 @@
 import java.util.function.Function;
 
 /**
- * Base Class for running delta-sync/compaction in separate thread and controlling their life-cycle.
+ * Base Class for running clean/delta-sync/compaction in separate thread and controlling their life-cycle.
  */
-public abstract class AbstractDeltaStreamerService implements Serializable {
+public abstract class AbstractAsyncService implements Serializable {

Review comment:
       Ideally, we should be reusing this same base class for #1752?

##########
File path: hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
##########
@@ -254,6 +260,7 @@ public static SparkConf registerClasses(SparkConf conf) {
     HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.BULK_INSERT);
     table.validateInsertSchema();
     setOperationType(WriteOperationType.BULK_INSERT);
+    AutoCleanerService.spawnAutoCleanerIfEnabled(this, instantTime);

Review comment:
       How come the return value is never assigned to the instance variable (`autoCleanerService`)?
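
The concern can be sketched as follows: if the handle returned by the spawn call is dropped, nothing is left to wait on or shut down later in `close()`. This is a minimal sketch under stated assumptions. `CleanerHandle` and `WriteClientSketch` are hypothetical stand-ins, not the Hudi classes from this PR, and the boolean flag stands in for whatever config check the real factory performs.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for the cleaner service in this PR; not Hudi code.
class CleanerHandle {
  private final ExecutorService executor = Executors.newSingleThreadExecutor();
  private final CompletableFuture<Void> future;

  private CleanerHandle(Runnable cleanAction) {
    this.future = CompletableFuture.runAsync(cleanAction, executor);
  }

  // Returns null when async cleaning is disabled, mirroring an "IfEnabled" style factory.
  static CleanerHandle spawnIfEnabled(boolean enabled, Runnable cleanAction) {
    return enabled ? new CleanerHandle(cleanAction) : null;
  }

  // Blocks until the clean action finishes, then releases the worker thread.
  void waitForCompletion() {
    try {
      future.join();
    } finally {
      executor.shutdown();
    }
  }
}

// Hypothetical stand-in for the write client.
class WriteClientSketch {
  private CleanerHandle cleaner; // the instance variable the comment refers to

  void bulkInsert(boolean asyncCleanEnabled, Runnable cleanAction) {
    // Keep the returned handle; ignoring it would leave close() with nothing to stop.
    this.cleaner = CleanerHandle.spawnIfEnabled(asyncCleanEnabled, cleanAction);
  }

  boolean hasCleaner() {
    return cleaner != null;
  }

  void close() {
    if (cleaner != null) {
      cleaner.waitForCompletion();
      cleaner = null;
    }
  }
}
```

With the assignment in place, `close()` can wait for the in-flight clean and release its thread. Without it, the field stays null and the shutdown call becomes a no-op.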

##########
File path: hudi-client/src/main/java/org/apache/hudi/client/AutoCleanerService.java
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.hudi.async.AbstractAsyncService;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * Auto Clean service running concurrently with write operation.
+ */
+class AutoCleanerService extends AbstractAsyncService {
+
+  private static final Logger LOG = LogManager.getLogger(AutoCleanerService.class);
+
+  private final HoodieWriteClient writeClient;
+  private final String cleanInstant;
+  private final transient ExecutorService executor = Executors.newFixedThreadPool(1);
+
+  protected AutoCleanerService(HoodieWriteClient writeClient, String cleanInstant) {
+    this.writeClient = writeClient;
+    this.cleanInstant = cleanInstant;
+  }
+
+  @Override
+  protected Pair<CompletableFuture, ExecutorService> startService() {

Review comment:
       This class feels more like an `AsyncTask` than a service, i.e. something that is long-running and accepts tasks. Can we file a follow-up to clean this code up?
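
To illustrate the distinction being drawn here, the sketch below contrasts a one-shot task with a long-running service that keeps accepting work. Both classes (`OneShotAsyncTask`, `LongRunningService`) are hypothetical and exist only for this illustration. Neither is part of Hudi or of this PR.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical illustration only; neither class exists in Hudi.

// One-shot task: runs a single action once and is finished afterwards.
class OneShotAsyncTask {
  private final CompletableFuture<Void> future;

  OneShotAsyncTask(Runnable action) {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    this.future = CompletableFuture.runAsync(action, executor);
    // The already submitted action still runs; no further work is accepted.
    executor.shutdown();
  }

  void waitForCompletion() {
    future.join();
  }
}

// Long-running service: stays alive and accepts tasks until explicitly stopped.
class LongRunningService {
  private final ExecutorService executor = Executors.newSingleThreadExecutor();

  CompletableFuture<Void> submit(Runnable task) {
    return CompletableFuture.runAsync(task, executor);
  }

  void stop() {
    executor.shutdown();
  }
}
```

A cleaner that is spawned once per write and then awaited looks like the first shape. The second shape would fit a component that outlives individual writes.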

##########
File path: hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
##########
@@ -81,6 +81,8 @@
   private final transient HoodieMetrics metrics;
   private transient Timer.Context compactionTimer;
 
+  private transient AutoCleanerService autoCleanerService;

Review comment:
       Auto or not is a configuration choice that should not leak into class names.

##########
File path: hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
##########
@@ -338,15 +346,27 @@ protected void postCommit(HoodieCommitMetadata metadata, String instantTime,
       // We cannot have unbounded commit files. Archive commits if we have to archive
       HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(config, createMetaClient(true));
       archiveLog.archiveIfRequired(hadoopConf);
-      if (config.isAutoClean()) {
-        // Call clean to cleanup if there is anything to cleanup after the commit,
+      autoCleanOnCommit(instantTime);
+    } catch (IOException ioe) {
+      throw new HoodieIOException(ioe.getMessage(), ioe);
+    }
+  }
+
+  /**
+   * Handle auto clean during commit.
+   * @param instantTime
+   */
+  private void autoCleanOnCommit(String instantTime) {
+    if (config.isAutoClean()) {
+      // Call clean to cleanup if there is anything to cleanup after the commit,
+      if (config.isRunParallelAutoClean()) {

Review comment:
       There are a lot of terms being overloaded here: parallel, async, auto. What you mean by parallel is async, correct?

##########
File path: hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
##########
@@ -477,6 +497,8 @@ public HoodieRestoreMetadata restoreToInstant(final String instantTime) throws H
    */
   @Override
   public void close() {
+    AutoCleanerService.shutdownAutoCleaner(autoCleanerService);
+    autoCleanerService = null;

Review comment:
       This kind of resetting is probably needed after each write operation as well? Maybe it's fine to just reinitialize the service after `waitForCompletion`. Food for thought.
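
One possible reading of this suggestion, sketched under assumptions: each write starts a fresh async clean, and the end of the write waits for it and clears the reference, so the next write never sees a stale handle. `PerWriteCleanerLifecycle` and its method names are hypothetical. The real client would wire this into its own write and post-commit paths.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch; names are illustrative and not taken from Hudi.
class PerWriteCleanerLifecycle {
  private CompletableFuture<Void> cleaner; // null when no clean is in flight
  private int spawned = 0;

  void startWrite(Runnable cleanAction) {
    // A fresh async clean for every write operation.
    cleaner = CompletableFuture.runAsync(cleanAction);
    spawned++;
  }

  void finishWrite() {
    // Wait for the clean started by this write, then reset for the next one.
    if (cleaner != null) {
      cleaner.join();
      cleaner = null;
    }
  }

  boolean hasInFlightCleaner() {
    return cleaner != null;
  }

  int spawnedCount() {
    return spawned;
  }

  void close() {
    // close() reuses the same reset path, so it is safe to call at any point.
    finishWrite();
  }
}
```

Resetting in one place keeps `close()` and the per-write path consistent, which is roughly what reinitializing after `waitForCompletion` would achieve.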

##########
File path: hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
##########
@@ -81,6 +81,8 @@
   private final transient HoodieMetrics metrics;
   private transient Timer.Context compactionTimer;
 
+  private transient AutoCleanerService autoCleanerService;

Review comment:
       Just rename to `AsyncCleanerService`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org