You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/10/18 19:56:59 UTC

[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #5269: [HUDI-3636] Create new write clients for async table services in DeltaStreamer and Spark streaming sink

alexeykudinkin commented on code in PR #5269:
URL: https://github.com/apache/hudi/pull/5269#discussion_r998645726


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/HoodieAsyncTableService.java:
##########
@@ -20,24 +20,37 @@
 package org.apache.hudi.async;
 
 import org.apache.hudi.client.RunsTableService;
+import org.apache.hudi.client.embedded.EmbeddedTimelineServerHelper;
+import org.apache.hudi.client.embedded.EmbeddedTimelineService;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Function;
 
 public abstract class HoodieAsyncTableService extends HoodieAsyncService implements RunsTableService {
 
+  protected final Object writeConfigUpdateLock = new Object();
   protected HoodieWriteConfig writeConfig;
+  protected Option<EmbeddedTimelineService> embeddedTimelineService;
+  protected AtomicBoolean isWriteConfigUpdated = new AtomicBoolean(false);
 
   protected HoodieAsyncTableService() {
   }
 
-  protected HoodieAsyncTableService(HoodieWriteConfig writeConfig) {
+  protected HoodieAsyncTableService(HoodieWriteConfig writeConfig, Option<EmbeddedTimelineService> embeddedTimelineService) {
     this.writeConfig = writeConfig;
+    this.embeddedTimelineService = embeddedTimelineService;

Review Comment:
   Let's chain the constructors (so that we don't need to duplicate the init logic).



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/HoodieAsyncTableService.java:
##########
@@ -47,4 +60,11 @@ public void start(Function<Boolean, Boolean> onShutdownCallback) {
     }
     super.start(onShutdownCallback);
   }
+
+  public void updateWriteConfig(HoodieWriteConfig writeConfig) {
+    synchronized (writeConfigUpdateLock) {
+      this.writeConfig = EmbeddedTimelineServerHelper.updateWriteConfigWithTimelineServer(embeddedTimelineService.get(), writeConfig);
+      isWriteConfigUpdated.set(true);

Review Comment:
   The flag setup is quite brittle: it makes resetting a two-step process (first the flag is set, then the update has to be handled asynchronously), which is easy to miss during implementation.
   
   Instead, I'd suggest we:
    - Create an abstract hook `handleWriteConfigUpdate` (so that every inheritor has to implement it)
    - Invoke it here under the lock (that way we keep the lock private and avoid this logic spilling into inheritors)
   



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/HoodieAsyncTableService.java:
##########
@@ -20,24 +20,37 @@
 package org.apache.hudi.async;
 
 import org.apache.hudi.client.RunsTableService;
+import org.apache.hudi.client.embedded.EmbeddedTimelineServerHelper;
+import org.apache.hudi.client.embedded.EmbeddedTimelineService;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Function;
 
 public abstract class HoodieAsyncTableService extends HoodieAsyncService implements RunsTableService {
 
+  protected final Object writeConfigUpdateLock = new Object();
   protected HoodieWriteConfig writeConfig;
+  protected Option<EmbeddedTimelineService> embeddedTimelineService;
+  protected AtomicBoolean isWriteConfigUpdated = new AtomicBoolean(false);
 
   protected HoodieAsyncTableService() {
   }
 
-  protected HoodieAsyncTableService(HoodieWriteConfig writeConfig) {
+  protected HoodieAsyncTableService(HoodieWriteConfig writeConfig, Option<EmbeddedTimelineService> embeddedTimelineService) {
     this.writeConfig = writeConfig;
+    this.embeddedTimelineService = embeddedTimelineService;
   }
 
-  protected HoodieAsyncTableService(HoodieWriteConfig writeConfig, boolean runInDaemonMode) {
+  protected HoodieAsyncTableService(HoodieWriteConfig writeConfig, Option<EmbeddedTimelineService> embeddedTimelineService, boolean runInDaemonMode) {
     super(runInDaemonMode);
-    this.writeConfig = writeConfig;
+    this.embeddedTimelineService = embeddedTimelineService;
+    if (embeddedTimelineService.isPresent()) {

Review Comment:
   Why isn't the same logic applied in the other constructor?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncCompactService.java:
##########
@@ -78,13 +81,26 @@ protected Pair<CompletableFuture, ExecutorService> startService() {
 
         while (!isShutdownRequested()) {
           final HoodieInstant instant = fetchNextAsyncServiceInstant();
-
           if (null != instant) {
             LOG.info("Starting Compaction for instant " + instant);
+            synchronized (writeConfigUpdateLock) {

Review Comment:
   Please check my comment below regarding the flag.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org