Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/06/18 00:37:59 UTC

[GitHub] [hudi] danny0405 commented on a diff in pull request #5890: [HUDI-4273] Support inline schedule clustering for Flink stream

danny0405 commented on code in PR #5890:
URL: https://github.com/apache/hudi/pull/5890#discussion_r899722284


##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java:
##########
@@ -594,6 +594,12 @@ private FlinkOptions() {
       .defaultValue(false) // default false for pipeline
       .withDescription("Schedule the cluster plan, default false");
 
+  public static final ConfigOption<Boolean> CLUSTERING_ASYNC_ENABLED = ConfigOptions
+      .key("clustering.async.enabled")
+      .booleanType()
+      .defaultValue(false) // default false for pipeline
+      .withDescription("Async Clustering, default fals");
+

Review Comment:
   fals -> false
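   For context only, a minimal sketch of how a job could turn the new option on once the description typo is fixed; `FlinkOptions.CLUSTERING_ASYNC_ENABLED` comes from the diff above, while the surrounding `Configuration` setup is illustrative.

   ```java
   import org.apache.flink.configuration.Configuration;
   import org.apache.hudi.configuration.FlinkOptions;

   // illustrative only: enable async clustering for a Flink writer job
   Configuration conf = new Configuration();
   conf.setBoolean(FlinkOptions.CLUSTERING_ASYNC_ENABLED, true); // key: clustering.async.enabled
   ```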



##########
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java:
##########
@@ -234,6 +234,7 @@ private void testWriteToHoodie(
       Pipelines.clean(conf, pipeline);
       Pipelines.compact(conf, pipeline);
     }
+    Pipelines.cluster(conf, rowType, pipeline);
     JobClient client = execEnv.executeAsync(jobName);

Review Comment:
   Revert this line and add a new test for INSERT mode and async clustering.
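   For illustration, a hedged sketch of what such a dedicated test could look like; the helper `testWriteToHoodie(...)`, `tempFile`, and `EXPECTED` are assumed from the surrounding test class, and every name here is illustrative rather than the final API.

   ```java
   // hedged sketch: a separate test for INSERT mode with async clustering,
   // instead of wiring Pipelines.cluster into the shared write path above
   @Test
   public void testStreamWriteWithClustering() throws Exception {
     Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
     // write in INSERT mode so clustering (rather than compaction) is exercised
     conf.setString(FlinkOptions.OPERATION, WriteOperationType.INSERT.value());
     // enable the async clustering path added in this PR
     conf.setBoolean(FlinkOptions.CLUSTERING_ASYNC_ENABLED, true);
     // helper signature and expected results are assumptions from the test class
     testWriteToHoodie(conf, "insert_clustering", 1, EXPECTED);
   }
   ```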



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java:
##########
@@ -253,6 +253,14 @@ public void notifyCheckpointComplete(long checkpointId) {
             CompactionUtil.scheduleCompaction(metaClient, writeClient, tableState.isDeltaTimeCompaction, committed);
           }
 
+          if (tableState.scheduleClustering && committed) {
+            // if async clustering is on, schedule the clustering
+            if (OptionsResolver.isBucketIndexType(conf)) {
+              throw new UnsupportedOperationException("Bucket index not supported for clustering.");
+            }

Review Comment:
   `Bucket index not supported` -> `Bucket index is not supported`.
   
   And can we move the code block:
   
   ```java
   if (xxx) {
   }
   writeClient.scheduleClustering
   ```
   
   into a utility class like `ClusteringUtils.scheduleClustering(writeClient, committed)`, just like what we do for compaction scheduling.
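   For illustration, a minimal sketch of what the proposed helper could look like, mirroring how `CompactionUtil.scheduleCompaction` is used in the coordinator; the class name, the extra `conf` parameter (needed for the bucket-index check), and the exact signature are assumptions, not the final API.

   ```java
   import org.apache.flink.configuration.Configuration;
   import org.apache.hudi.client.HoodieFlinkWriteClient;
   import org.apache.hudi.common.util.Option;
   import org.apache.hudi.configuration.OptionsResolver;

   /**
    * Hedged sketch of the suggested utility class; all names are illustrative.
    */
   public class ClusteringUtils {

     public static void scheduleClustering(Configuration conf, HoodieFlinkWriteClient<?> writeClient, boolean committed) {
       if (OptionsResolver.isBucketIndexType(conf)) {
         throw new UnsupportedOperationException("Bucket index is not supported for clustering.");
       }
       if (committed) {
         // delegate generating the clustering plan to the write client
         writeClient.scheduleClustering(Option.empty());
       }
     }
   }
   ```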



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java:
##########
@@ -650,6 +656,12 @@ private FlinkOptions() {
       .defaultValue(30)
       .withDescription("Maximum number of groups to create as part of ClusteringPlan. Increasing groups will increase parallelism, default is 30");
 
+  public static final ConfigOption<Integer> CLUSTERING_TIMEOUT_SECONDS = ConfigOptions
+      .key("clustering.timeout.seconds")
+      .intType()
+      .defaultValue(1200) // default 20 minutes

Review Comment:
   The option can be removed because it is never used.



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java:
##########
@@ -100,15 +104,32 @@ public class ClusteringOperator extends TableStreamOperator<ClusteringCommitEven
   private transient AvroToRowDataConverters.AvroToRowDataConverter avroToRowDataConverter;
   private transient HoodieFlinkWriteClient writeClient;
   private transient BulkInsertWriterHelper writerHelper;
-  private transient String instantTime;
 
   private transient BinaryExternalSorter sorter;
   private transient StreamRecordCollector<ClusteringCommitEvent> collector;
   private transient BinaryRowDataSerializer binarySerializer;
 
+  /**
+   * Whether to execute clustering asynchronously.
+   */
+  private final boolean asyncClustering;
+
+  /**
+   * Executor service to execute the clustering task.
+   */
+  private transient NonThrownExecutor executor;
+
+  /**
+   * Flag saying whether the schema has been initialized.
+   * The table may be empty on startup, so the schema is initialized lazily
+   * in order to fetch the latest table schema.
+   */
+  private boolean initialized = false;

Review Comment:
   Is lazy initialization really necessary? Since the `rowType` is already passed in at construction, only a static schema is supported.
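   For illustration, a hedged sketch of the eager alternative hinted at in the comment above: since `rowType` is available at construction, the schema-dependent members could be set up once in `open()` and the `initialized` flag dropped; the exact body shown here is an assumption.

   ```java
   @Override
   public void open() throws Exception {
     super.open();
     // `rowType` is fixed at construction, so the converter can be built eagerly
     // instead of behind an `initialized` flag (illustrative, not the final code)
     this.avroToRowDataConverter = AvroToRowDataConverters.createRowConverter(rowType);
     // ... remaining initialization (write client, sorter, collector) stays as-is
   }
   ```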



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org