Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/10/12 11:50:24 UTC

[GitHub] [hudi] xushiyan commented on a diff in pull request #5269: [HUDI-3636] Create new write clients for async table services in DeltaStreamer

xushiyan commented on code in PR #5269:
URL: https://github.com/apache/hudi/pull/5269#discussion_r993354807


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseCompactor.java:
##########
@@ -39,8 +39,9 @@ public BaseCompactor(BaseHoodieWriteClient<T, I, K, O> compactionClient) {
 
   public abstract void compact(HoodieInstant instant) throws IOException;
 
-  public void updateWriteClient(BaseHoodieWriteClient<T, I, K, O> writeClient) {
-    this.compactionClient = writeClient;
+  public void close() {

Review Comment:
   ditto



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseClusterer.java:
##########
@@ -40,16 +40,15 @@ public BaseClusterer(BaseHoodieWriteClient<T, I, K, O> clusteringClient) {
 
   /**
    * Run clustering for the instant.
+   *
    * @param instant
    * @throws IOException
    */
   public abstract void cluster(HoodieInstant instant) throws IOException;
 
-  /**
-   * Update the write client used by async clustering.
-   * @param writeClient
-   */
-  public void updateWriteClient(BaseHoodieWriteClient<T, I, K, O> writeClient) {
-    this.clusteringClient = writeClient;
+  public void close() {
+    if (clusteringClient != null) {
+      clusteringClient.close();
+    }

Review Comment:
   Let's implement AutoCloseable now that we have this close() API.
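A minimal sketch of what that suggestion could look like. `FakeWriteClient` and `SketchClusterer` are hypothetical stand-ins rather than Hudi's `BaseHoodieWriteClient` or `BaseClusterer`, and the instant string is a placeholder; only the null-guarded `close()` body mirrors the diff above.

```java
// Sketch only: the classes below are hypothetical stand-ins, not Hudi classes.
class AutoCloseableSketch {

  /** Hypothetical stand-in for a write client that owns resources. */
  static class FakeWriteClient implements AutoCloseable {
    private boolean closed = false;

    @Override
    public void close() {
      closed = true;
    }

    boolean isClosed() {
      return closed;
    }
  }

  /** Hypothetical clusterer that exposes close() through AutoCloseable. */
  static class SketchClusterer implements AutoCloseable {
    private final FakeWriteClient clusteringClient;

    SketchClusterer(FakeWriteClient clusteringClient) {
      this.clusteringClient = clusteringClient;
    }

    void cluster(String instant) {
      // Placeholder for the real clustering work.
    }

    // Narrowing the signature (no "throws Exception") keeps callers free of catch blocks.
    @Override
    public void close() {
      if (clusteringClient != null) {
        clusteringClient.close();
      }
    }
  }

  public static void main(String[] args) {
    FakeWriteClient client = new FakeWriteClient();
    // try-with-resources calls close() even if cluster() throws.
    try (SketchClusterer clusterer = new SketchClusterer(client)) {
      clusterer.cluster("placeholder-instant");
    }
    System.out.println("client closed: " + client.isClosed());
  }
}
```

Implementing the interface lets callers use try-with-resources and lets static analysis flag instances that are never closed.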



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncCompactService.java:
##########
@@ -78,13 +81,26 @@ protected Pair<CompletableFuture, ExecutorService> startService() {
 
         while (!isShutdownRequested()) {
           final HoodieInstant instant = fetchNextAsyncServiceInstant();
-
           if (null != instant) {
             LOG.info("Starting Compaction for instant " + instant);
+            synchronized (writeConfigUpdateLock) {

Review Comment:
   From my understanding of the previous comments, this still allows the async service to receive write config updates? If we think of the async service as coupled with a table service client and immutable, we shouldn't need this sort of lock. We would just create a new async service at the DeltaSync level and freeze the write config for the service internally.
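One way to picture the lock-free alternative described in this comment, as a rough sketch under stated assumptions: `SketchAsyncService` and `SketchSync` are hypothetical names, a plain `Map` stands in for the write config, and `example.key` is a made-up key. None of this is Hudi's actual API.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Sketch only: hypothetical classes illustrating "replace the service, don't mutate it".
class ImmutableAsyncServiceSketch {

  /** Hypothetical async service that freezes its config at construction time. */
  static final class SketchAsyncService implements AutoCloseable {
    private final Map<String, String> frozenConfig;
    private boolean closed = false;

    SketchAsyncService(Map<String, String> writeConfig) {
      // Defensive copy: later changes to the caller's map are not visible here.
      this.frozenConfig = Collections.unmodifiableMap(new HashMap<>(writeConfig));
    }

    String get(String key) {
      return frozenConfig.get(key);
    }

    @Override
    public void close() {
      closed = true;
    }

    boolean isClosed() {
      return closed;
    }
  }

  /** Hypothetical owner playing the DeltaSync role: it swaps services on config change. */
  static final class SketchSync {
    private SketchAsyncService service;

    SketchAsyncService onConfigChange(Map<String, String> newConfig) {
      if (service != null) {
        service.close(); // retire the old service instead of updating it in place
      }
      service = new SketchAsyncService(newConfig);
      return service;
    }
  }

  public static void main(String[] args) {
    Map<String, String> config = new HashMap<>();
    config.put("example.key", "v1");
    SketchSync sync = new SketchSync();
    SketchAsyncService first = sync.onConfigChange(config);

    config.put("example.key", "v2");
    SketchAsyncService second = sync.onConfigChange(config);

    System.out.println(first.get("example.key") + " closed=" + first.isClosed());
    System.out.println(second.get("example.key") + " closed=" + second.isClosed());
  }
}
```

Because each service only ever reads the config it was built with, there is no shared mutable state between the owner and the service loop, so no `writeConfigUpdateLock` equivalent is needed in this sketch.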



##########
hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java:
##########
@@ -190,7 +192,49 @@ public static HoodieWriteConfig createHoodieConfig(String schemaStr, String base
 
   public static SparkRDDWriteClient createHoodieClient(JavaSparkContext jssc, String schemaStr, String basePath,
                                                        String tblName, Map<String, String> parameters) {
-    return new SparkRDDWriteClient<>(new HoodieSparkEngineContext(jssc), createHoodieConfig(schemaStr, basePath, tblName, parameters));
+    return createHoodieClient(jssc, schemaStr, basePath, tblName, parameters, Option.empty());
+  }
+
+  public static SparkRDDWriteClient createHoodieClient(JavaSparkContext jssc, String schemaStr, String basePath,
+                                                       String tblName, Map<String, String> parameters, Option<EmbeddedTimelineServiceHandler> embeddedTimelineServiceHandler) {
+    HoodieWriteConfig writeConfig = createHoodieConfig(schemaStr, basePath, tblName, parameters);
+    if (embeddedTimelineServiceHandler.isPresent())  {
+      embeddedTimelineServiceHandler.get().onInstantiation(writeConfig);
+    }
+    return new SparkRDDWriteClient<>(new HoodieSparkEngineContext(jssc), writeConfig, embeddedTimelineServiceHandler.isPresent()
+        ? embeddedTimelineServiceHandler.get().getEmbeddedTimelineService() : Option.empty());
+  }
+
+  /**
+   * For spark structured streaming ingestion, embedded timeline service is instantiated externally and re-used across various write client instantiations (regular writer, table services).
+   * This class helps in coordinating the instantiation of embedded timeline service and exposes the singleton instance.
+   */
+  public static class EmbeddedTimelineServiceHandler {
+
+    private Option<EmbeddedTimelineService> embeddedTimelineService = Option.empty();
+    private final JavaSparkContext jssc;
+
+    public EmbeddedTimelineServiceHandler(JavaSparkContext jssc) {
+      this.jssc = jssc;
+    }
+
+    public void onInstantiation(HoodieWriteConfig writeConfig) {
+      if (writeConfig.isEmbeddedTimelineServerEnabled()) {
+        if (!embeddedTimelineService.isPresent()) {
+          try {
+            embeddedTimelineService = EmbeddedTimelineServerHelper.createEmbeddedTimelineService(new HoodieSparkEngineContext(jssc), writeConfig);
+          } catch (IOException e) {
+            e.printStackTrace();

Review Comment:
   Avoid e.printStackTrace() here.
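A hedged sketch of one common alternative: log the failure with context and rethrow it as an unchecked exception so the caller cannot silently continue without the service. It uses `java.util.logging` and `UncheckedIOException` from the JDK purely to stay self-contained; the project's own logger and exception types would be the natural choice in the real code. `ServiceFactory` and the `String` service handle are hypothetical stand-ins for the embedded timeline service creation call.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Optional;
import java.util.logging.Level;
import java.util.logging.Logger;

// Sketch only: hypothetical handler, not Hudi's EmbeddedTimelineServiceHandler.
class TimelineServiceHandlerSketch {
  private static final Logger LOG =
      Logger.getLogger(TimelineServiceHandlerSketch.class.getName());

  /** Hypothetical factory standing in for the call that may throw IOException. */
  interface ServiceFactory {
    String create() throws IOException;
  }

  private Optional<String> service = Optional.empty();

  void onInstantiation(ServiceFactory factory) {
    if (!service.isPresent()) {
      try {
        service = Optional.of(factory.create());
      } catch (IOException e) {
        // Log through the logger (keeps the stack trace) and surface the failure.
        LOG.log(Level.SEVERE, "Failed to start embedded timeline service", e);
        throw new UncheckedIOException("Failed to start embedded timeline service", e);
      }
    }
  }

  Optional<String> getService() {
    return service;
  }

  public static void main(String[] args) {
    TimelineServiceHandlerSketch handler = new TimelineServiceHandlerSketch();
    try {
      handler.onInstantiation(() -> {
        throw new IOException("simulated failure");
      });
    } catch (UncheckedIOException e) {
      System.out.println("caller saw: " + e.getCause().getMessage());
    }
  }
}
```

Whether to log, rethrow, or both depends on who owns error reporting higher up; the point is only that the failure goes through the logger or propagates rather than being printed to stderr and swallowed.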


