Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2021/06/28 05:19:17 UTC

[GitHub] [hudi] nsivabalan commented on a change in pull request #3142: [HUDI-1483] Support async clustering for deltastreamer and Spark streaming

nsivabalan commented on a change in pull request #3142:
URL: https://github.com/apache/hudi/pull/3142#discussion_r659456141



##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncClusteringService.java
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.async;
+
+import org.apache.hudi.client.AbstractClusteringClient;
+import org.apache.hudi.client.AbstractHoodieWriteClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieIOException;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.IntStream;
+
+public abstract class AsyncClusteringService extends HoodieAsyncService {

Review comment:
       java docs would be nice. 
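   For instance, a class-level Javadoc along these lines (wording is only a suggestion, not from the PR):
   ```
   /**
    * Async clustering service that picks up requested clustering (replacecommit) instants
    * from a queue and executes them in a background thread, analogous to the existing
    * async compaction service.
    */
   ```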

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncClusteringService.java
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.async;
+
+import org.apache.hudi.client.AbstractClusteringClient;
+import org.apache.hudi.client.AbstractHoodieWriteClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieIOException;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.IntStream;
+
+public abstract class AsyncClusteringService extends HoodieAsyncService {
+
+  private static final Logger LOG = LogManager.getLogger(AsyncClusteringService.class);
+
+  private final int maxConcurrentClustering;
+  private transient AbstractClusteringClient clusteringClient;
+  private transient BlockingQueue<HoodieInstant> pendingClustering = new LinkedBlockingQueue<>();
+  private transient ReentrantLock queueLock = new ReentrantLock();
+  private transient Condition consumed = queueLock.newCondition();
+
+  public AsyncClusteringService(AbstractHoodieWriteClient writeClient) {
+    this(writeClient, false);
+  }
+
+  public AsyncClusteringService(AbstractHoodieWriteClient writeClient, boolean runInDaemonMode) {
+    super(runInDaemonMode);
+    this.clusteringClient = createClusteringClient(writeClient);
+    this.maxConcurrentClustering = 1;
+  }
+
+  protected abstract AbstractClusteringClient createClusteringClient(AbstractHoodieWriteClient client);
+
+  public void enqueuePendingClustering(HoodieInstant instant) {
+    LOG.info("Enqueuing new pending clustering instant: " + instant.getTimestamp());
+    pendingClustering.add(instant);
+  }
+
+  public void waitTillPendingClusteringReducesTo(int numPendingClustering) throws InterruptedException {
+    try {
+      queueLock.lock();
+      while (!isShutdown() && (pendingClustering.size() > numPendingClustering)) {
+        consumed.await();
+      }
+    } finally {
+      queueLock.unlock();
+    }
+  }
+
+  private HoodieInstant fetchNextClusteringInstant() throws InterruptedException {
+    LOG.info("Waiting for next clustering instant for 10 seconds");
+    HoodieInstant instant = pendingClustering.poll(10, TimeUnit.SECONDS);
+    if (instant != null) {
+      try {
+        queueLock.lock();
+        // Signal waiting thread
+        consumed.signal();
+      } finally {
+        queueLock.unlock();
+      }
+    }
+    return instant;
+  }
+
+  /**
+   * Start clustering service.
+   */
+  @Override
+  protected Pair<CompletableFuture, ExecutorService> startService() {
+    ExecutorService executor = Executors.newFixedThreadPool(maxConcurrentClustering,
+        r -> {
+          Thread t = new Thread(r, "async_clustering_thread");
+          t.setDaemon(isRunInDaemonMode());
+          return t;
+        });
+
+    return Pair.of(CompletableFuture.allOf(IntStream.range(0, maxConcurrentClustering).mapToObj(i -> CompletableFuture.supplyAsync(() -> {
+      try {
+        while (!isShutdownRequested()) {
+          final HoodieInstant instant = fetchNextClusteringInstant();
+          if (null != instant) {
+            LOG.info("Starting clustering for instant " + instant);
+            clusteringClient.cluster(instant);
+            LOG.info("Finished clustering for instant " + instant);
+          }
+        }
+        LOG.info("Clustering executor shutting down properly");
+      } catch (InterruptedException ie) {
+        LOG.warn("Clustering executor got interrupted exception! Stopping", ie);
+      } catch (IOException e) {
+        LOG.error("Clustering executor failed", e);
+        throw new HoodieIOException(e.getMessage(), e);
+      }
+      return true;
+    }, executor)).toArray(CompletableFuture[]::new)), executor);
+  }
+
+  public synchronized void updateWriteClient(AbstractHoodieWriteClient writeClient) {

Review comment:
       I see a lot of commonality between this and AsyncCompactionService. Can we please try to reuse code as much as possible?
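   To make the suggestion concrete, here is a rough, hypothetical sketch (the class and method names below are made up; the queue/lock/poll logic is the same as in the hunk above) of a shared parent that both the async compaction and async clustering services could extend:
   ```
   package org.apache.hudi.async;
   
   import org.apache.hudi.common.table.timeline.HoodieInstant;
   
   import java.util.concurrent.BlockingQueue;
   import java.util.concurrent.LinkedBlockingQueue;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.locks.Condition;
   import java.util.concurrent.locks.ReentrantLock;
   
   /**
    * Hypothetical shared parent for async compaction and async clustering services,
    * holding the common pending-instant queue handling.
    */
   public abstract class BaseAsyncTableService extends HoodieAsyncService {
   
     private transient BlockingQueue<HoodieInstant> pendingInstants = new LinkedBlockingQueue<>();
     private transient ReentrantLock queueLock = new ReentrantLock();
     private transient Condition consumed = queueLock.newCondition();
   
     protected BaseAsyncTableService(boolean runInDaemonMode) {
       super(runInDaemonMode);
     }
   
     /** Adds a newly requested instant to the pending queue. */
     public void enqueuePendingInstant(HoodieInstant instant) {
       pendingInstants.add(instant);
     }
   
     /** Polls the next pending instant (waiting up to 10 seconds) and signals any waiter. */
     protected HoodieInstant fetchNextInstant() throws InterruptedException {
       HoodieInstant instant = pendingInstants.poll(10, TimeUnit.SECONDS);
       if (instant != null) {
         queueLock.lock();
         try {
           consumed.signal();
         } finally {
           queueLock.unlock();
         }
       }
       return instant;
     }
   }
   ```
   The concrete clustering and compaction services could then keep just their own startService() wiring and their respective client.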

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
##########
@@ -153,6 +155,11 @@ public Builder withInlineClusteringNumCommits(int numCommits) {
       return this;
     }
 
+    public Builder withAsyncClusteringNumCommits(int numCommits) {

Review comment:
       minor. withAsyncClustering**Max**Commits

##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
##########
@@ -620,6 +641,17 @@ object HoodieSparkSqlWriter {
     }
   }
 
+  private def isAsyncClusteringEnabled(client: SparkRDDWriteClient[HoodieRecordPayload[Nothing]],
+                                       parameters: Map[String, String]) : Boolean = {
+    log.info(s"Config.asyncClusteringEnabled ? ${client.getConfig.isAsyncClusteringEnabled}")
+    if (asyncClusteringTriggerFnDefined && client.getConfig.isAsyncClusteringEnabled

Review comment:
       can we return in 1 line. 
   asyncClusteringTriggerFnDefined && client.getConfig.isAsyncClusteringEnabled && parameters.get(ASYNC_CLUSTERING_ENABLE_OPT_KEY).exists(r => r.toBoolean)

##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/HoodieSparkClusteringClient.java
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaRDD;
+
+import java.io.IOException;
+
+public class HoodieSparkClusteringClient<T extends HoodieRecordPayload> extends

Review comment:
       java docs. 

##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/async/SparkAsyncClusteringService.java
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.async;
+
+import org.apache.hudi.client.AbstractClusteringClient;
+import org.apache.hudi.client.AbstractHoodieWriteClient;
+import org.apache.hudi.client.HoodieSparkClusteringClient;
+
+public class SparkAsyncClusteringService extends AsyncClusteringService {

Review comment:
       java docs.

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractClusteringClient.java
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+public abstract class AbstractClusteringClient<T extends HoodieRecordPayload, I, K, O> implements Serializable {

Review comment:
       java docs please. 

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractClusteringClient.java
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+public abstract class AbstractClusteringClient<T extends HoodieRecordPayload, I, K, O> implements Serializable {
+
+  protected transient AbstractHoodieWriteClient<T, I, K, O> clusteringClient;
+
+  public AbstractClusteringClient(AbstractHoodieWriteClient<T, I, K, O> clusteringClient) {
+    this.clusteringClient = clusteringClient;
+  }
+
+  public abstract void cluster(HoodieInstant instant) throws IOException;

Review comment:
       Here also we can think of something like an AsyncServiceClient.
   Maybe we can have a common method as below.
   ```
      public abstract void doAction(HoodieInstant instant) throws IOException;
   ```
   
   The same abstract class would serve both clustering and compaction.
   I'm not too strong on this suggestion though; let's see what others have to say.
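   A minimal sketch of what that could look like (the class name AbstractAsyncServiceClient is purely illustrative, not from the PR; the doAction signature is just the one suggested above):
   ```
   package org.apache.hudi.client;
   
   import org.apache.hudi.common.model.HoodieRecordPayload;
   import org.apache.hudi.common.table.timeline.HoodieInstant;
   
   import java.io.IOException;
   import java.io.Serializable;
   
   /**
    * Hypothetical common parent for the clustering and compaction clients used by the async services.
    */
   public abstract class AbstractAsyncServiceClient<T extends HoodieRecordPayload, I, K, O> implements Serializable {
   
     private static final long serialVersionUID = 1L;
   
     protected transient AbstractHoodieWriteClient<T, I, K, O> writeClient;
   
     protected AbstractAsyncServiceClient(AbstractHoodieWriteClient<T, I, K, O> writeClient) {
       this.writeClient = writeClient;
     }
   
     /** Single entry point; clustering and compaction subclasses run their respective action for the instant. */
     public abstract void doAction(HoodieInstant instant) throws IOException;
   }
   ```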

##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
##########
@@ -583,13 +594,23 @@ object HoodieSparkSqlWriter {
 
       log.info(s"Compaction Scheduled is $compactionInstant")
 
+      val asyncClusteringEnabled = isAsyncClusteringEnabled(client, parameters)
+      val clusteringInstant: common.util.Option[java.lang.String] =
+        if (asyncClusteringEnabled) {
+          client.scheduleClustering(common.util.Option.of(new util.HashMap[String, String](mapAsJavaMap(metaMap))))
+        } else {
+          common.util.Option.empty()
+        }
+
+      log.info(s"Clustering Scheduled is $clusteringInstant")
+

Review comment:
       Should we fix line 610 as well?
   ```
   if (!asyncCompactionEnabled && !asyncClusteringEnabled) {
   ```

##########
File path: hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/async/SparkStreamingAsyncClusteringService.java
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.async;
+
+import org.apache.hudi.client.AbstractClusteringClient;
+import org.apache.hudi.client.AbstractHoodieWriteClient;
+import org.apache.hudi.client.HoodieSparkClusteringClient;
+
+public class SparkStreamingAsyncClusteringService extends AsyncClusteringService {

Review comment:
       java docs please. 

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
##########
@@ -396,10 +424,10 @@ public int hashCode() {
               baseFileFormat, propsFilePath, configs, sourceClassName,
               sourceOrderingField, payloadClassName, schemaProviderClassName,
               transformerClassNames, sourceLimit, operation, filterDupes,
-              enableHiveSync, maxPendingCompactions, continuousMode,
+              enableHiveSync, maxPendingCompactions, maxPendingClustering, continuousMode,
               minSyncIntervalSeconds, sparkMaster, commitOnErrors,
               deltaSyncSchedulingWeight, compactSchedulingWeight, deltaSyncSchedulingMinShare,
-              compactSchedulingMinShare, forceDisableCompaction, checkpoint,
+              compactSchedulingMinShare, forceDisableCompaction, forceDisableClustering, checkpoint,

Review comment:
       I see a config called compactSchedulingMinShare. Do we need a similar one for clustering?

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractClusteringClient.java
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+public abstract class AbstractClusteringClient<T extends HoodieRecordPayload, I, K, O> implements Serializable {
+
+  protected transient AbstractHoodieWriteClient<T, I, K, O> clusteringClient;
+

Review comment:
       Again, a serialVersionUID would be good.
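   For reference, the standard field being asked for (the value is arbitrary):
   ```
   private static final long serialVersionUID = 1L; // value is illustrative
   ```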

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncClusteringService.java
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.async;
+
+import org.apache.hudi.client.AbstractClusteringClient;
+import org.apache.hudi.client.AbstractHoodieWriteClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieIOException;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.IntStream;
+
+public abstract class AsyncClusteringService extends HoodieAsyncService {
+
+  private static final Logger LOG = LogManager.getLogger(AsyncClusteringService.class);
+

Review comment:
       Can we add a serialVersionUID here as well?

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractClusteringClient.java
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+public abstract class AbstractClusteringClient<T extends HoodieRecordPayload, I, K, O> implements Serializable {
+
+  protected transient AbstractHoodieWriteClient<T, I, K, O> clusteringClient;
+
+  public AbstractClusteringClient(AbstractHoodieWriteClient<T, I, K, O> clusteringClient) {
+    this.clusteringClient = clusteringClient;
+  }
+
+  public abstract void cluster(HoodieInstant instant) throws IOException;

Review comment:
       Also, let's try to add Javadoc for all public methods. I understand that AbstractCompactor does not have Javadoc; that's fine. At least for the code we write, let's try to add it.
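   For example, something along these lines for the method above (wording is only a suggestion):
   ```
   /**
    * Executes clustering for the given requested clustering (replacecommit) instant.
    *
    * @param instant the requested clustering instant to execute
    * @throws IOException if clustering fails
    */
   public abstract void cluster(HoodieInstant instant) throws IOException;
   ```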

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
##########
@@ -733,4 +743,15 @@ public Config getCfg() {
   public Option<HoodieTimeline> getCommitTimelineOpt() {
     return commitTimelineOpt;
   }
+
+  /**
+   * Schedule clustering.
+   * Called from {@link HoodieDeltaStreamer} when async clustering is enabled.
+   *
+   * @return Requested clustering instant.
+   */
+  public Option<String> getClusteringInstant() {

Review comment:
       minor. getClusteringInstant**Opt**()

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncClusteringService.java
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.async;
+
+import org.apache.hudi.client.AbstractClusteringClient;
+import org.apache.hudi.client.AbstractHoodieWriteClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieIOException;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.IntStream;
+
+public abstract class AsyncClusteringService extends HoodieAsyncService {

Review comment:
       minor. Can you fix the Javadoc in HoodieAsyncService to also mention clustering? As of now, it looks like the below.
   ```
   Base Class for running clean/delta-sync/compaction in separate thread and controlling their life-cycle.
   ```
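   Perhaps just extend that sentence to something like (suggested wording only):
   ```
   Base Class for running clean/delta-sync/compaction/clustering in separate thread and controlling their life-cycle.
   ```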

##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala
##########
@@ -86,6 +93,11 @@ class HoodieStreamingSink(sqlContext: SQLContext,
             asyncCompactorService.enqueuePendingCompaction(
               new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, compactionInstantOps.get()))
           }
+          if (clusteringInstant.isPresent) {
+            asyncClusteringService.enqueuePendingClustering(new HoodieInstant(
+              State.REQUESTED, HoodieTimeline.REPLACE_COMMIT_ACTION, clusteringInstant.get()
+            ))
+          }

Review comment:
       In line 101, do we need to return clusteringInstant as well?

##########
File path: hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
##########
@@ -380,6 +380,10 @@ object DataSourceWriteOptions {
   val ASYNC_COMPACT_ENABLE_OPT_KEY = "hoodie.datasource.compaction.async.enable"
   val DEFAULT_ASYNC_COMPACT_ENABLE_OPT_VAL = "true"
 
+  // Async Clustering - Enabled by default
+  val ASYNC_CLUSTERING_ENABLE_OPT_KEY = "hoodie.datasource.clustering.async.enable"
+  val DEFAULT_ASYNC_CLUSTERING_ENABLE_OPT_VAL = "true"

Review comment:
       may I know why this is enabled by default? 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org