You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/02/28 09:37:42 UTC

[GitHub] [hudi] watermelon12138 commented on a change in pull request #4645: [HUDI-3103] Enable MultiTableDeltaStreamer to update a single target table from multiple source tables

watermelon12138 commented on a change in pull request #4645:
URL: https://github.com/apache/hudi/pull/4645#discussion_r815719379



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
##########
@@ -370,50 +441,124 @@ public static void main(String[] args) throws IOException {
   private static String resetTarget(Config configuration, String database, String tableName) {
     String basePathPrefix = configuration.basePathPrefix;
     basePathPrefix = basePathPrefix.charAt(basePathPrefix.length() - 1) == '/' ? basePathPrefix.substring(0, basePathPrefix.length() - 1) : basePathPrefix;
-    String targetBasePath = basePathPrefix + Constants.FILE_DELIMITER + database + Constants.FILE_DELIMITER + tableName;
-    configuration.targetTableName = database + Constants.DELIMITER + tableName;
+    String targetBasePath = basePathPrefix + Constants.PATH_SEPARATOR + database + Constants.PATH_SEPARATOR + tableName;
+    configuration.targetTableName = database + Constants.PATH_CUR_DIR + tableName;
     return targetBasePath;
   }
 
   /**
    * Creates actual HoodieDeltaStreamer objects for every table/topic and does incremental sync.
    */
   public void sync() {
+    List<HoodieDeltaStreamer> hdsObjectList = new ArrayList<>();
+
+    // The sync function is not executed when multiple sources update the same target.
     for (TableExecutionContext context : tableExecutionContexts) {
       try {
-        new HoodieDeltaStreamer(context.getConfig(), jssc, Option.ofNullable(context.getProperties())).sync();
+        HoodieDeltaStreamer hds = new HoodieDeltaStreamer(context.getConfig(), jssc, Option.ofNullable(context.getProperties()));
+
+        // Add object of HoodieDeltaStreamer temporarily to hdsObjectList when multiple sources update the same target.
+        if (!StringUtils.isNullOrEmpty(context.getProperties().getProperty(Constants.SOURCES_TO_BE_BOUND))) {
+          hdsObjectList.add(hds);
+          continue;
+        }
+
+        hds.sync();
         successTables.add(Helpers.getTableWithDatabase(context));
       } catch (Exception e) {
-        logger.error("error while running MultiTableDeltaStreamer for table: " + context.getTableName(), e);
+        logger.error("Error while running MultiTableDeltaStreamer for table: " + context.getTableName(), e);
         failedTables.add(Helpers.getTableWithDatabase(context));
       }
     }
 
-    logger.info("Ingestion was successful for topics: " + successTables);
-    if (!failedTables.isEmpty()) {
-      logger.info("Ingestion failed for topics: " + failedTables);
+    // If hdsObjectList is empty, it indicates that all source sync operations have been completed. In this case, directly return.
+    if (hdsObjectList.isEmpty()) {
+      logger.info("Ingestion was successful for topics: " + successTables);
+      if (!failedTables.isEmpty()) {
+        logger.info("Ingestion failed for topics: " + failedTables);
+      }
+      return;
     }
+
+    // The sync function is executing here when multiple sources update the same target.
+    boolean isContinuousMode = hdsObjectList.get(0).cfg.continuousMode;
+    do {
+      // Executing sync function by traversing hdsObjectList when multiple sources update the same target.
+      for (int i = 0; i < hdsObjectList.size(); i++) {
+        // Threads cannot be started when multiple sources update the same target.
+        if (isContinuousMode) {
+          hdsObjectList.get(i).cfg.continuousMode = false;
+        }
+
+        try {
+          hdsObjectList.get(i).sync();
+          successTables.add(Helpers.getTableWithDatabase(tableExecutionContexts.get(i)));
+        } catch (Exception e) {
+          logger.error("Error while running MultiTableDeltaStreamer for table: "
+              + tableExecutionContexts.get(i).getTableName(), e);
+          failedTables.add(Helpers.getTableWithDatabase(tableExecutionContexts.get(i)));
+          break;
+        }
+      }
+
+      logger.info("Ingestion was successful for topics: " + successTables);
+      if (!failedTables.isEmpty()) {
+        logger.info("Ingestion failed for topics: " + failedTables);
+        break;
+      }
+      successTables.clear();
+    } while (isContinuousMode);
   }
 
   public static class Constants {
+    // When there are multiple sources, you can use this configuration item to set an independent checkpoint for the source.
+    public static final String SOURCE_CHECKPOINT = "hoodie.deltastreamer.current.source.checkpoint";

Review comment:
      I can't move SOURCE_CHECKPOINT to DeltaStreamer, because it is MultiTableDeltaStreamer that parses the configuration of the source tables.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org