Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/01/20 03:58:57 UTC

[GitHub] [hudi] watermelon12138 opened a new pull request #4645: [HUDI-3103] Enable MultiTableDeltaStreamer to update a single target …

watermelon12138 opened a new pull request #4645:
URL: https://github.com/apache/hudi/pull/4645


   ## What is the purpose of the pull request
   Enable MultiTableDeltaStreamer to update a single target table from multiple source tables.
   ## Brief change log
     - *Modify HoodieMultiTableDeltaStreamer so that it generates a table execution context for each source table.*
     - *Modify DeltaSync.java so that a source table can be associated with other source tables and each source can be configured with an independent checkpoint (see the configuration sketch below).*
     - *Add unit tests.*
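
   A minimal sketch (hypothetical database and table names; the property keys come from this change) of how two source tables could be bound to one target table through the common properties, assuming TypedProperties from hudi-common:

   import org.apache.hudi.common.config.TypedProperties;

   public class MultiSourceBindingSketch {
     public static void main(String[] args) {
       TypedProperties props = new TypedProperties();
       // Bind two source tables (comma-separated "database.table" entries) to a single target table.
       props.setProperty("hoodie.deltastreamer.source.sourcesToBeBound", "db1.source_a,db1.source_b");
       props.setProperty("hoodie.deltastreamer.ingestion.tablesToBeIngested", "db1.target_table");
       // Each per-source config file can additionally set an alias and an independent checkpoint:
       //   hoodie.deltastreamer.current.source.name=source_a
       //   hoodie.deltastreamer.current.source.checkpoint=<checkpoint for source_a>
       System.out.println(props);
     }
   }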


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] nsivabalan commented on pull request #4645: [HUDI-3103] Enable MultiTableDeltaStreamer to update a single target …

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on pull request #4645:
URL: https://github.com/apache/hudi/pull/4645#issuecomment-1031937892


   @watermelon12138 : Once the feedback has been addressed, can you let me know?





[GitHub] [hudi] pratyakshsharma commented on a change in pull request #4645: [HUDI-3103] Enable MultiTableDeltaStreamer to update a single target table from multiple source tables

Posted by GitBox <gi...@apache.org>.
pratyakshsharma commented on a change in pull request #4645:
URL: https://github.com/apache/hudi/pull/4645#discussion_r810612494



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
##########
@@ -104,43 +108,111 @@ private void checkIfTableConfigFileExists(String configFolder, FileSystem fs, St
     }
   }
 
-  //commonProps are passed as parameter which contain table to config file mapping
+  // commonProps are passed as parameter which contain table to config file mapping
   private void populateTableExecutionContextList(TypedProperties properties, String configFolder, FileSystem fs, Config config) throws IOException {
-    List<String> tablesToBeIngested = getTablesToBeIngested(properties);
-    logger.info("tables to be ingested via MultiTableDeltaStreamer : " + tablesToBeIngested);
-    TableExecutionContext executionContext;
-    for (String table : tablesToBeIngested) {
-      String[] tableWithDatabase = table.split("\\.");
-      String database = tableWithDatabase.length > 1 ? tableWithDatabase[0] : "default";
-      String currentTable = tableWithDatabase.length > 1 ? tableWithDatabase[1] : table;
-      String configProp = Constants.INGESTION_PREFIX + database + Constants.DELIMITER + currentTable + Constants.INGESTION_CONFIG_SUFFIX;
-      String configFilePath = properties.getString(configProp, Helpers.getDefaultConfigFilePath(configFolder, database, currentTable));
-      checkIfTableConfigFileExists(configFolder, fs, configFilePath);
-      TypedProperties tableProperties = UtilHelpers.readConfig(fs.getConf(), new Path(configFilePath), new ArrayList<String>()).getProps();
-      properties.forEach((k, v) -> {
-        if (tableProperties.get(k) == null) {
-          tableProperties.setProperty(k.toString(), v.toString());
-        }
-      });
-      final HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
-      //copy all the values from config to cfg
-      String targetBasePath = resetTarget(config, database, currentTable);
-      Helpers.deepCopyConfigs(config, cfg);
-      String overriddenTargetBasePath = tableProperties.getString(Constants.TARGET_BASE_PATH_PROP, "");
-      cfg.targetBasePath = StringUtils.isNullOrEmpty(overriddenTargetBasePath) ? targetBasePath : overriddenTargetBasePath;
-      if (cfg.enableMetaSync && StringUtils.isNullOrEmpty(tableProperties.getString(DataSourceWriteOptions.HIVE_TABLE().key(), ""))) {
-        throw new HoodieException("Meta sync table field not provided!");
+    // Parameter SOURCES_TO_BE_BOUND indicates that multiple sources update the same target.
+    String sourcesToBeBound = properties.getProperty(Constants.SOURCES_TO_BE_BOUND);
+    if (!StringUtils.isNullOrEmpty(sourcesToBeBound)) {
+      // populate the table execution context by traversing source tables
+      List<String> sourcesToBeBonded = getSourcesToBeBound(properties);
+      logger.info("Source tables to be bound via MultiTableDeltaStreamer : " + sourcesToBeBonded);
+      String tableToBeIngested = getTableToBeIngested(properties);
+      String[] targetTableWithDataBase = tableToBeIngested.split("\\.");
+      String targetDataBase = targetTableWithDataBase.length > 1 ? targetTableWithDataBase[0] : "default";
+      String targetTable = targetTableWithDataBase.length > 1 ? targetTableWithDataBase[1] : targetTableWithDataBase[0];

Review comment:
       ditto, duplicate at L141







[GitHub] [hudi] hudi-bot removed a comment on pull request #4645: [HUDI-3103] Enable MultiTableDeltaStreamer to update a single target table from multiple source tables

Posted by GitBox <gi...@apache.org>.
hudi-bot removed a comment on pull request #4645:
URL: https://github.com/apache/hudi/pull/4645#issuecomment-1018115438


   ## CI report:
   
   * ee9f2eaa28c5836977ea980a1d50b1d65ce342ef Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=5380) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>





[GitHub] [hudi] nsivabalan commented on pull request #4645: [HUDI-3103] Enable MultiTableDeltaStreamer to update a single target …

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on pull request #4645:
URL: https://github.com/apache/hudi/pull/4645#issuecomment-1044572661


   Hey @watermelon12138: gentle ping on the patch. We might have a cut-off date for 0.11 in about 3 weeks. Just wanted to keep you informed.
   





[GitHub] [hudi] pratyakshsharma commented on a change in pull request #4645: [HUDI-3103] Enable MultiTableDeltaStreamer to update a single target table from multiple source tables

Posted by GitBox <gi...@apache.org>.
pratyakshsharma commented on a change in pull request #4645:
URL: https://github.com/apache/hudi/pull/4645#discussion_r810612445



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
##########
@@ -104,43 +108,111 @@ private void checkIfTableConfigFileExists(String configFolder, FileSystem fs, St
     }
   }
 
-  //commonProps are passed as parameter which contain table to config file mapping
+  // commonProps are passed as parameter which contain table to config file mapping
   private void populateTableExecutionContextList(TypedProperties properties, String configFolder, FileSystem fs, Config config) throws IOException {
-    List<String> tablesToBeIngested = getTablesToBeIngested(properties);
-    logger.info("tables to be ingested via MultiTableDeltaStreamer : " + tablesToBeIngested);
-    TableExecutionContext executionContext;
-    for (String table : tablesToBeIngested) {
-      String[] tableWithDatabase = table.split("\\.");
-      String database = tableWithDatabase.length > 1 ? tableWithDatabase[0] : "default";
-      String currentTable = tableWithDatabase.length > 1 ? tableWithDatabase[1] : table;
-      String configProp = Constants.INGESTION_PREFIX + database + Constants.DELIMITER + currentTable + Constants.INGESTION_CONFIG_SUFFIX;
-      String configFilePath = properties.getString(configProp, Helpers.getDefaultConfigFilePath(configFolder, database, currentTable));
-      checkIfTableConfigFileExists(configFolder, fs, configFilePath);
-      TypedProperties tableProperties = UtilHelpers.readConfig(fs.getConf(), new Path(configFilePath), new ArrayList<String>()).getProps();
-      properties.forEach((k, v) -> {
-        if (tableProperties.get(k) == null) {
-          tableProperties.setProperty(k.toString(), v.toString());
-        }
-      });
-      final HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
-      //copy all the values from config to cfg
-      String targetBasePath = resetTarget(config, database, currentTable);
-      Helpers.deepCopyConfigs(config, cfg);
-      String overriddenTargetBasePath = tableProperties.getString(Constants.TARGET_BASE_PATH_PROP, "");
-      cfg.targetBasePath = StringUtils.isNullOrEmpty(overriddenTargetBasePath) ? targetBasePath : overriddenTargetBasePath;
-      if (cfg.enableMetaSync && StringUtils.isNullOrEmpty(tableProperties.getString(DataSourceWriteOptions.HIVE_TABLE().key(), ""))) {
-        throw new HoodieException("Meta sync table field not provided!");
+    // Parameter SOURCES_TO_BE_BOUND indicates that multiple sources update the same target.
+    String sourcesToBeBound = properties.getProperty(Constants.SOURCES_TO_BE_BOUND);
+    if (!StringUtils.isNullOrEmpty(sourcesToBeBound)) {
+      // populate the table execution context by traversing source tables
+      List<String> sourcesToBeBonded = getSourcesToBeBound(properties);
+      logger.info("Source tables to be bound via MultiTableDeltaStreamer : " + sourcesToBeBonded);
+      String tableToBeIngested = getTableToBeIngested(properties);
+      String[] targetTableWithDataBase = tableToBeIngested.split("\\.");
+      String targetDataBase = targetTableWithDataBase.length > 1 ? targetTableWithDataBase[0] : "default";

Review comment:
       Maybe we can extract this line into a separate function; it is duplicated at L140 and L127 in your code.
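
       For illustration, the extracted helper could look roughly like this (the method name is hypothetical; the parsing logic mirrors the duplicated lines):

       private static String[] resolveDatabaseAndTableName(String qualifiedTableName) {
         // Split a "database.table" string; fall back to the "default" database when no prefix is present.
         String[] parts = qualifiedTableName.split("\\.");
         String database = parts.length > 1 ? parts[0] : "default";
         String table = parts.length > 1 ? parts[1] : qualifiedTableName;
         return new String[] {database, table};
       }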







[GitHub] [hudi] hudi-bot removed a comment on pull request #4645: [HUDI-3103] Enable MultiTableDeltaStreamer to update a single target …

Posted by GitBox <gi...@apache.org>.
hudi-bot removed a comment on pull request #4645:
URL: https://github.com/apache/hudi/pull/4645#issuecomment-1017487585


   ## CI report:
   
   * 5332458bfb61a6e13b9b59ae3813d236f86e01da Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=5358) 
   * ee9f2eaa28c5836977ea980a1d50b1d65ce342ef Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=5380) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>





[GitHub] [hudi] hudi-bot removed a comment on pull request #4645: [HUDI-3103] Enable MultiTableDeltaStreamer to update a single target table from multiple source tables

Posted by GitBox <gi...@apache.org>.
hudi-bot removed a comment on pull request #4645:
URL: https://github.com/apache/hudi/pull/4645#issuecomment-1054053920


   ## CI report:
   
   * ee9f2eaa28c5836977ea980a1d50b1d65ce342ef Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=5380) 
   * d98dd23ee2077ff5c70d7c1f57a89057319850fd Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=6381) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>





[GitHub] [hudi] pratyakshsharma commented on a change in pull request #4645: [HUDI-3103] Enable MultiTableDeltaStreamer to update a single target table from multiple source tables

Posted by GitBox <gi...@apache.org>.
pratyakshsharma commented on a change in pull request #4645:
URL: https://github.com/apache/hudi/pull/4645#discussion_r810615952



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
##########
@@ -370,50 +441,124 @@ public static void main(String[] args) throws IOException {
   private static String resetTarget(Config configuration, String database, String tableName) {
     String basePathPrefix = configuration.basePathPrefix;
     basePathPrefix = basePathPrefix.charAt(basePathPrefix.length() - 1) == '/' ? basePathPrefix.substring(0, basePathPrefix.length() - 1) : basePathPrefix;
-    String targetBasePath = basePathPrefix + Constants.FILE_DELIMITER + database + Constants.FILE_DELIMITER + tableName;
-    configuration.targetTableName = database + Constants.DELIMITER + tableName;
+    String targetBasePath = basePathPrefix + Constants.PATH_SEPARATOR + database + Constants.PATH_SEPARATOR + tableName;
+    configuration.targetTableName = database + Constants.PATH_CUR_DIR + tableName;
     return targetBasePath;
   }
 
   /**
    * Creates actual HoodieDeltaStreamer objects for every table/topic and does incremental sync.
    */
   public void sync() {
+    List<HoodieDeltaStreamer> hdsObjectList = new ArrayList<>();
+
+    // The sync function is not executed when multiple sources update the same target.
     for (TableExecutionContext context : tableExecutionContexts) {
       try {
-        new HoodieDeltaStreamer(context.getConfig(), jssc, Option.ofNullable(context.getProperties())).sync();
+        HoodieDeltaStreamer hds = new HoodieDeltaStreamer(context.getConfig(), jssc, Option.ofNullable(context.getProperties()));
+
+        // Add object of HoodieDeltaStreamer temporarily to hdsObjectList when multiple sources update the same target.
+        if (!StringUtils.isNullOrEmpty(context.getProperties().getProperty(Constants.SOURCES_TO_BE_BOUND))) {
+          hdsObjectList.add(hds);
+          continue;
+        }
+
+        hds.sync();
         successTables.add(Helpers.getTableWithDatabase(context));
       } catch (Exception e) {
-        logger.error("error while running MultiTableDeltaStreamer for table: " + context.getTableName(), e);
+        logger.error("Error while running MultiTableDeltaStreamer for table: " + context.getTableName(), e);
         failedTables.add(Helpers.getTableWithDatabase(context));
       }
     }
 
-    logger.info("Ingestion was successful for topics: " + successTables);
-    if (!failedTables.isEmpty()) {
-      logger.info("Ingestion failed for topics: " + failedTables);
+    // If hdsObjectList is empty, it indicates that all source sync operations have been completed. In this case, directly return.
+    if (hdsObjectList.isEmpty()) {
+      logger.info("Ingestion was successful for topics: " + successTables);
+      if (!failedTables.isEmpty()) {
+        logger.info("Ingestion failed for topics: " + failedTables);
+      }
+      return;
     }
+
+    // The sync function is executing here when multiple sources update the same target.
+    boolean isContinuousMode = hdsObjectList.get(0).cfg.continuousMode;
+    do {
+      // Executing sync function by traversing hdsObjectList when multiple sources update the same target.
+      for (int i = 0; i < hdsObjectList.size(); i++) {
+        // Threads cannot be started when multiple sources update the same target.
+        if (isContinuousMode) {
+          hdsObjectList.get(i).cfg.continuousMode = false;
+        }
+
+        try {
+          hdsObjectList.get(i).sync();
+          successTables.add(Helpers.getTableWithDatabase(tableExecutionContexts.get(i)));
+        } catch (Exception e) {
+          logger.error("Error while running MultiTableDeltaStreamer for table: "
+              + tableExecutionContexts.get(i).getTableName(), e);
+          failedTables.add(Helpers.getTableWithDatabase(tableExecutionContexts.get(i)));
+          break;
+        }
+      }
+
+      logger.info("Ingestion was successful for topics: " + successTables);
+      if (!failedTables.isEmpty()) {
+        logger.info("Ingestion failed for topics: " + failedTables);
+        break;
+      }
+      successTables.clear();
+    } while (isContinuousMode);
   }
 
   public static class Constants {
+    // When there are multiple sources, you can use this configuration item to set an independent checkpoint for the source.
+    public static final String SOURCE_CHECKPOINT = "hoodie.deltastreamer.current.source.checkpoint";
+
+    // If there are multiple sources, you can use this configuration item to set an alias for the source to distinguish the source.
+    // In addition, the alias is used as a suffix to distinguish the CHECKPOINT_KEY and CHECKPOINT_RESET_KEY of each source.
+    public static final String SOURCE_NAME = "hoodie.deltastreamer.current.source.name";
+
     public static final String KAFKA_TOPIC_PROP = "hoodie.deltastreamer.source.kafka.topic";
+
     private static final String SOURCE_SCHEMA_REGISTRY_URL_PROP = "hoodie.deltastreamer.schemaprovider.registry.url";
+
     private static final String TARGET_SCHEMA_REGISTRY_URL_PROP = "hoodie.deltastreamer.schemaprovider.registry.targetUrl";
+
     public static final String HIVE_SYNC_TABLE_PROP = "hoodie.datasource.hive_sync.table";
+
     private static final String SCHEMA_REGISTRY_BASE_URL_PROP = "hoodie.deltastreamer.schemaprovider.registry.baseUrl";
+
     private static final String SCHEMA_REGISTRY_URL_SUFFIX_PROP = "hoodie.deltastreamer.schemaprovider.registry.urlSuffix";
+
     private static final String SCHEMA_REGISTRY_SOURCE_URL_SUFFIX = "hoodie.deltastreamer.schemaprovider.registry.sourceUrlSuffix";
+
     private static final String SCHEMA_REGISTRY_TARGET_URL_SUFFIX = "hoodie.deltastreamer.schemaprovider.registry.targetUrlSuffix";
+
     private static final String TABLES_TO_BE_INGESTED_PROP = "hoodie.deltastreamer.ingestion.tablesToBeIngested";
+
+    // This configuration item specifies the database name and table name of the source. The format is "database.table".
+    // It is recommended that table name be the same as the alias of the source. If there are multiple sources, separate them with commas.
+    public static final String SOURCES_TO_BE_BOUND = "hoodie.deltastreamer.source.sourcesToBeBound";
+
+    private static final String SOURCE_PREFIX = "hoodie.deltastreamer.source.";
+
     private static final String INGESTION_PREFIX = "hoodie.deltastreamer.ingestion.";
+
     private static final String INGESTION_CONFIG_SUFFIX = ".configFile";
+
     private static final String DEFAULT_CONFIG_FILE_NAME_SUFFIX = "_config.properties";
+
     private static final String TARGET_BASE_PATH_PROP = "hoodie.deltastreamer.ingestion.targetBasePath";
+
     private static final String LOCAL_SPARK_MASTER = "local[2]";
-    private static final String FILE_DELIMITER = "/";
-    private static final String DELIMITER = ".";
+
+    public static final String PATH_SEPARATOR = "/";
+
+    public static final String PATH_CUR_DIR = ".";

Review comment:
       +1







[GitHub] [hudi] pratyakshsharma commented on a change in pull request #4645: [HUDI-3103] Enable MultiTableDeltaStreamer to update a single target table from multiple source tables

Posted by GitBox <gi...@apache.org>.
pratyakshsharma commented on a change in pull request #4645:
URL: https://github.com/apache/hudi/pull/4645#discussion_r810612705



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
##########
@@ -104,43 +108,111 @@ private void checkIfTableConfigFileExists(String configFolder, FileSystem fs, St
     }
   }
 
-  //commonProps are passed as parameter which contain table to config file mapping
+  // commonProps are passed as parameter which contain table to config file mapping
   private void populateTableExecutionContextList(TypedProperties properties, String configFolder, FileSystem fs, Config config) throws IOException {
-    List<String> tablesToBeIngested = getTablesToBeIngested(properties);
-    logger.info("tables to be ingested via MultiTableDeltaStreamer : " + tablesToBeIngested);
-    TableExecutionContext executionContext;
-    for (String table : tablesToBeIngested) {
-      String[] tableWithDatabase = table.split("\\.");
-      String database = tableWithDatabase.length > 1 ? tableWithDatabase[0] : "default";
-      String currentTable = tableWithDatabase.length > 1 ? tableWithDatabase[1] : table;
-      String configProp = Constants.INGESTION_PREFIX + database + Constants.DELIMITER + currentTable + Constants.INGESTION_CONFIG_SUFFIX;
-      String configFilePath = properties.getString(configProp, Helpers.getDefaultConfigFilePath(configFolder, database, currentTable));
-      checkIfTableConfigFileExists(configFolder, fs, configFilePath);
-      TypedProperties tableProperties = UtilHelpers.readConfig(fs.getConf(), new Path(configFilePath), new ArrayList<String>()).getProps();
-      properties.forEach((k, v) -> {
-        if (tableProperties.get(k) == null) {
-          tableProperties.setProperty(k.toString(), v.toString());
-        }
-      });
-      final HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
-      //copy all the values from config to cfg
-      String targetBasePath = resetTarget(config, database, currentTable);
-      Helpers.deepCopyConfigs(config, cfg);
-      String overriddenTargetBasePath = tableProperties.getString(Constants.TARGET_BASE_PATH_PROP, "");
-      cfg.targetBasePath = StringUtils.isNullOrEmpty(overriddenTargetBasePath) ? targetBasePath : overriddenTargetBasePath;
-      if (cfg.enableMetaSync && StringUtils.isNullOrEmpty(tableProperties.getString(DataSourceWriteOptions.HIVE_TABLE().key(), ""))) {
-        throw new HoodieException("Meta sync table field not provided!");
+    // Parameter SOURCES_TO_BE_BOUND indicates that multiple sources update the same target.
+    String sourcesToBeBound = properties.getProperty(Constants.SOURCES_TO_BE_BOUND);
+    if (!StringUtils.isNullOrEmpty(sourcesToBeBound)) {
+      // populate the table execution context by traversing source tables
+      List<String> sourcesToBeBonded = getSourcesToBeBound(properties);

Review comment:
       We can then change the function name to `getSourcesToFetchFrom` or similar







[GitHub] [hudi] hudi-bot commented on pull request #4645: [HUDI-3103] Enable MultiTableDeltaStreamer to update a single target …

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on pull request #4645:
URL: https://github.com/apache/hudi/pull/4645#issuecomment-1017556536


   ## CI report:
   
   * ee9f2eaa28c5836977ea980a1d50b1d65ce342ef Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=5380) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>





[GitHub] [hudi] hudi-bot commented on pull request #4645: [HUDI-3103] Enable MultiTableDeltaStreamer to update a single target …

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on pull request #4645:
URL: https://github.com/apache/hudi/pull/4645#issuecomment-1018115438


   ## CI report:
   
   * ee9f2eaa28c5836977ea980a1d50b1d65ce342ef Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=5380) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>





[GitHub] [hudi] watermelon12138 commented on pull request #4645: [HUDI-3103] Enable MultiTableDeltaStreamer to update a single target …

Posted by GitBox <gi...@apache.org>.
watermelon12138 commented on pull request #4645:
URL: https://github.com/apache/hudi/pull/4645#issuecomment-1018234112


   @nsivabalan
   Ok, thank you very much. These are very good suggestions and I will try to land them.





[GitHub] [hudi] hudi-bot commented on pull request #4645: [HUDI-3103] Enable MultiTableDeltaStreamer to update a single target …

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on pull request #4645:
URL: https://github.com/apache/hudi/pull/4645#issuecomment-1017099833


   ## CI report:
   
   * 5332458bfb61a6e13b9b59ae3813d236f86e01da Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=5358) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>





[GitHub] [hudi] hudi-bot removed a comment on pull request #4645: [HUDI-3103] Enable MultiTableDeltaStreamer to update a single target …

Posted by GitBox <gi...@apache.org>.
hudi-bot removed a comment on pull request #4645:
URL: https://github.com/apache/hudi/pull/4645#issuecomment-1017484982


   ## CI report:
   
   * 5332458bfb61a6e13b9b59ae3813d236f86e01da Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=5358) 
   * ee9f2eaa28c5836977ea980a1d50b1d65ce342ef UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>





[GitHub] [hudi] hudi-bot commented on pull request #4645: [HUDI-3103] Enable MultiTableDeltaStreamer to update a single target …

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on pull request #4645:
URL: https://github.com/apache/hudi/pull/4645#issuecomment-1017128365


   ## CI report:
   
   * 5332458bfb61a6e13b9b59ae3813d236f86e01da Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=5358) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>





[GitHub] [hudi] pratyakshsharma commented on a change in pull request #4645: [HUDI-3103] Enable MultiTableDeltaStreamer to update a single target table from multiple source tables

Posted by GitBox <gi...@apache.org>.
pratyakshsharma commented on a change in pull request #4645:
URL: https://github.com/apache/hudi/pull/4645#discussion_r810615917



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
##########
@@ -370,50 +441,124 @@ public static void main(String[] args) throws IOException {
   private static String resetTarget(Config configuration, String database, String tableName) {
     String basePathPrefix = configuration.basePathPrefix;
     basePathPrefix = basePathPrefix.charAt(basePathPrefix.length() - 1) == '/' ? basePathPrefix.substring(0, basePathPrefix.length() - 1) : basePathPrefix;
-    String targetBasePath = basePathPrefix + Constants.FILE_DELIMITER + database + Constants.FILE_DELIMITER + tableName;
-    configuration.targetTableName = database + Constants.DELIMITER + tableName;
+    String targetBasePath = basePathPrefix + Constants.PATH_SEPARATOR + database + Constants.PATH_SEPARATOR + tableName;
+    configuration.targetTableName = database + Constants.PATH_CUR_DIR + tableName;
     return targetBasePath;
   }
 
   /**
    * Creates actual HoodieDeltaStreamer objects for every table/topic and does incremental sync.
    */
   public void sync() {
+    List<HoodieDeltaStreamer> hdsObjectList = new ArrayList<>();
+
+    // The sync function is not executed when multiple sources update the same target.
     for (TableExecutionContext context : tableExecutionContexts) {
       try {
-        new HoodieDeltaStreamer(context.getConfig(), jssc, Option.ofNullable(context.getProperties())).sync();
+        HoodieDeltaStreamer hds = new HoodieDeltaStreamer(context.getConfig(), jssc, Option.ofNullable(context.getProperties()));
+
+        // Add object of HoodieDeltaStreamer temporarily to hdsObjectList when multiple sources update the same target.
+        if (!StringUtils.isNullOrEmpty(context.getProperties().getProperty(Constants.SOURCES_TO_BE_BOUND))) {
+          hdsObjectList.add(hds);
+          continue;
+        }
+
+        hds.sync();
         successTables.add(Helpers.getTableWithDatabase(context));
       } catch (Exception e) {
-        logger.error("error while running MultiTableDeltaStreamer for table: " + context.getTableName(), e);
+        logger.error("Error while running MultiTableDeltaStreamer for table: " + context.getTableName(), e);
         failedTables.add(Helpers.getTableWithDatabase(context));
       }
     }
 
-    logger.info("Ingestion was successful for topics: " + successTables);
-    if (!failedTables.isEmpty()) {
-      logger.info("Ingestion failed for topics: " + failedTables);
+    // If hdsObjectList is empty, it indicates that all source sync operations have been completed. In this case, directly return.
+    if (hdsObjectList.isEmpty()) {
+      logger.info("Ingestion was successful for topics: " + successTables);
+      if (!failedTables.isEmpty()) {
+        logger.info("Ingestion failed for topics: " + failedTables);
+      }
+      return;
     }
+
+    // The sync function is executing here when multiple sources update the same target.
+    boolean isContinuousMode = hdsObjectList.get(0).cfg.continuousMode;
+    do {
+      // Executing sync function by traversing hdsObjectList when multiple sources update the same target.
+      for (int i = 0; i < hdsObjectList.size(); i++) {
+        // Threads cannot be started when multiple sources update the same target.
+        if (isContinuousMode) {
+          hdsObjectList.get(i).cfg.continuousMode = false;
+        }
+
+        try {
+          hdsObjectList.get(i).sync();
+          successTables.add(Helpers.getTableWithDatabase(tableExecutionContexts.get(i)));
+        } catch (Exception e) {
+          logger.error("Error while running MultiTableDeltaStreamer for table: "
+              + tableExecutionContexts.get(i).getTableName(), e);
+          failedTables.add(Helpers.getTableWithDatabase(tableExecutionContexts.get(i)));
+          break;
+        }
+      }
+
+      logger.info("Ingestion was successful for topics: " + successTables);
+      if (!failedTables.isEmpty()) {
+        logger.info("Ingestion failed for topics: " + failedTables);
+        break;
+      }
+      successTables.clear();
+    } while (isContinuousMode);
   }
 
   public static class Constants {
+    // When there are multiple sources, you can use this configuration item to set an independent checkpoint for the source.
+    public static final String SOURCE_CHECKPOINT = "hoodie.deltastreamer.current.source.checkpoint";
+
+    // If there are multiple sources, you can use this configuration item to set an alias for the source to distinguish the source.
+    // In addition, the alias is used as a suffix to distinguish the CHECKPOINT_KEY and CHECKPOINT_RESET_KEY of each source.
+    public static final String SOURCE_NAME = "hoodie.deltastreamer.current.source.name";
+
     public static final String KAFKA_TOPIC_PROP = "hoodie.deltastreamer.source.kafka.topic";
+
     private static final String SOURCE_SCHEMA_REGISTRY_URL_PROP = "hoodie.deltastreamer.schemaprovider.registry.url";
+
     private static final String TARGET_SCHEMA_REGISTRY_URL_PROP = "hoodie.deltastreamer.schemaprovider.registry.targetUrl";
+
     public static final String HIVE_SYNC_TABLE_PROP = "hoodie.datasource.hive_sync.table";
+
     private static final String SCHEMA_REGISTRY_BASE_URL_PROP = "hoodie.deltastreamer.schemaprovider.registry.baseUrl";
+
     private static final String SCHEMA_REGISTRY_URL_SUFFIX_PROP = "hoodie.deltastreamer.schemaprovider.registry.urlSuffix";
+
     private static final String SCHEMA_REGISTRY_SOURCE_URL_SUFFIX = "hoodie.deltastreamer.schemaprovider.registry.sourceUrlSuffix";
+
     private static final String SCHEMA_REGISTRY_TARGET_URL_SUFFIX = "hoodie.deltastreamer.schemaprovider.registry.targetUrlSuffix";
+
     private static final String TABLES_TO_BE_INGESTED_PROP = "hoodie.deltastreamer.ingestion.tablesToBeIngested";
+
+    // This configuration item specifies the database name and table name of the source. The format is "database.table".
+    // It is recommended that table name be the same as the alias of the source. If there are multiple sources, separate them with commas.
+    public static final String SOURCES_TO_BE_BOUND = "hoodie.deltastreamer.source.sourcesToBeBound";

Review comment:
       This variable can also be renamed as I suggested in a previous comment.







[GitHub] [hudi] hudi-bot commented on pull request #4645: [HUDI-3103] Enable MultiTableDeltaStreamer to update a single target table from multiple source tables

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on pull request #4645:
URL: https://github.com/apache/hudi/pull/4645#issuecomment-1054051103


   ## CI report:
   
   * ee9f2eaa28c5836977ea980a1d50b1d65ce342ef Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=5380) 
   * d98dd23ee2077ff5c70d7c1f57a89057319850fd UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>





[GitHub] [hudi] hudi-bot removed a comment on pull request #4645: [HUDI-3103] Enable MultiTableDeltaStreamer to update a single target …

Posted by GitBox <gi...@apache.org>.
hudi-bot removed a comment on pull request #4645:
URL: https://github.com/apache/hudi/pull/4645#issuecomment-1017128365


   ## CI report:
   
   * 5332458bfb61a6e13b9b59ae3813d236f86e01da Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=5358) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>





[GitHub] [hudi] hudi-bot commented on pull request #4645: [HUDI-3103] Enable MultiTableDeltaStreamer to update a single target …

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on pull request #4645:
URL: https://github.com/apache/hudi/pull/4645#issuecomment-1017098255


   ## CI report:
   
   * 5332458bfb61a6e13b9b59ae3813d236f86e01da UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>





[GitHub] [hudi] pratyakshsharma commented on a change in pull request #4645: [HUDI-3103] Enable MultiTableDeltaStreamer to update a single target table from multiple source tables

Posted by GitBox <gi...@apache.org>.
pratyakshsharma commented on a change in pull request #4645:
URL: https://github.com/apache/hudi/pull/4645#discussion_r810612142



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
##########
@@ -104,43 +108,111 @@ private void checkIfTableConfigFileExists(String configFolder, FileSystem fs, St
     }
   }
 
-  //commonProps are passed as parameter which contain table to config file mapping
+  // commonProps are passed as parameter which contain table to config file mapping
   private void populateTableExecutionContextList(TypedProperties properties, String configFolder, FileSystem fs, Config config) throws IOException {
-    List<String> tablesToBeIngested = getTablesToBeIngested(properties);
-    logger.info("tables to be ingested via MultiTableDeltaStreamer : " + tablesToBeIngested);
-    TableExecutionContext executionContext;
-    for (String table : tablesToBeIngested) {
-      String[] tableWithDatabase = table.split("\\.");
-      String database = tableWithDatabase.length > 1 ? tableWithDatabase[0] : "default";
-      String currentTable = tableWithDatabase.length > 1 ? tableWithDatabase[1] : table;
-      String configProp = Constants.INGESTION_PREFIX + database + Constants.DELIMITER + currentTable + Constants.INGESTION_CONFIG_SUFFIX;
-      String configFilePath = properties.getString(configProp, Helpers.getDefaultConfigFilePath(configFolder, database, currentTable));
-      checkIfTableConfigFileExists(configFolder, fs, configFilePath);
-      TypedProperties tableProperties = UtilHelpers.readConfig(fs.getConf(), new Path(configFilePath), new ArrayList<String>()).getProps();
-      properties.forEach((k, v) -> {
-        if (tableProperties.get(k) == null) {
-          tableProperties.setProperty(k.toString(), v.toString());
-        }
-      });
-      final HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
-      //copy all the values from config to cfg
-      String targetBasePath = resetTarget(config, database, currentTable);
-      Helpers.deepCopyConfigs(config, cfg);
-      String overriddenTargetBasePath = tableProperties.getString(Constants.TARGET_BASE_PATH_PROP, "");
-      cfg.targetBasePath = StringUtils.isNullOrEmpty(overriddenTargetBasePath) ? targetBasePath : overriddenTargetBasePath;
-      if (cfg.enableMetaSync && StringUtils.isNullOrEmpty(tableProperties.getString(DataSourceWriteOptions.HIVE_TABLE().key(), ""))) {
-        throw new HoodieException("Meta sync table field not provided!");
+    // Parameter SOURCES_TO_BE_BOUND indicates that multiple sources update the same target.
+    String sourcesToBeBound = properties.getProperty(Constants.SOURCES_TO_BE_BOUND);
+    if (!StringUtils.isNullOrEmpty(sourcesToBeBound)) {
+      // populate the table execution context by traversing source tables
+      List<String> sourcesToBeBonded = getSourcesToBeBound(properties);
+      logger.info("Source tables to be bound via MultiTableDeltaStreamer : " + sourcesToBeBonded);
+      String tableToBeIngested = getTableToBeIngested(properties);
+      String[] targetTableWithDataBase = tableToBeIngested.split("\\.");
+      String targetDataBase = targetTableWithDataBase.length > 1 ? targetTableWithDataBase[0] : "default";
+      String targetTable = targetTableWithDataBase.length > 1 ? targetTableWithDataBase[1] : targetTableWithDataBase[0];
+      String targetBasePath = resetTarget(config, targetDataBase, targetTable);
+
+      for (String source : sourcesToBeBonded) {
+        String[] tableWithDatabase = source.split("\\.");
+        String currentSourceDataBase = tableWithDatabase.length > 1 ? tableWithDatabase[0] : "default";
+        String currentSourceTable = tableWithDatabase.length > 1 ? tableWithDatabase[1] : source;
+        String configProp = Constants.SOURCE_PREFIX + currentSourceDataBase + Constants.PATH_CUR_DIR + currentSourceTable + Constants.INGESTION_CONFIG_SUFFIX;
+        TableExecutionContext executionContext = populateTableExecutionContext(properties, configFolder, fs, config, configProp, currentSourceDataBase, currentSourceTable, targetBasePath);
+        this.tableExecutionContexts.add(executionContext);
+      }
+    } else {
+      // populate the table execution context by traversing target tables
+      List<String> tablesToBeIngested = getTablesToBeIngested(properties);
+      logger.info("Tables to be ingested via MultiTableDeltaStreamer : " + tablesToBeIngested);
+
+      for (String table : tablesToBeIngested) {
+        String[] tableWithDatabase = table.split("\\.");
+        String database = tableWithDatabase.length > 1 ? tableWithDatabase[0] : "default";
+        String currentTable = tableWithDatabase.length > 1 ? tableWithDatabase[1] : table;
+        String configProp = Constants.INGESTION_PREFIX + database + Constants.PATH_CUR_DIR + currentTable + Constants.INGESTION_CONFIG_SUFFIX;
+        TableExecutionContext executionContext = populateTableExecutionContext(properties, configFolder, fs, config, configProp, database, currentTable, null);
+        this.tableExecutionContexts.add(executionContext);
       }
-      populateSchemaProviderProps(cfg, tableProperties);
-      executionContext = new TableExecutionContext();
-      executionContext.setProperties(tableProperties);
-      executionContext.setConfig(cfg);
-      executionContext.setDatabase(database);
-      executionContext.setTableName(currentTable);
-      this.tableExecutionContexts.add(executionContext);
     }
   }
 
+  private TableExecutionContext populateTableExecutionContext(TypedProperties properties, String configFolder,
+      FileSystem fs, Config config, String configProp, String database, String currentTable, String targetBasePath) throws IOException {
+    // copy all common properties to current table properties
+    TypedProperties currentTableProperties = getCurrentTableProperties(properties, configFolder, fs, configProp, database,
+        currentTable);
+
+    // copy all the values from config to cfg
+    final HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
+    Helpers.deepCopyConfigs(config, cfg);
+
+    // calculate the value of targetBasePath which is a property of cfg
+    calculateTargetBasePath(config, database, currentTable, targetBasePath, currentTableProperties, cfg);
+
+    if (cfg.enableHiveSync && StringUtils.isNullOrEmpty(currentTableProperties.getString(DataSourceWriteOptions.HIVE_TABLE().key()))) {
+      throw new HoodieException("Hive sync table field not provided!");
+    }
+
+    populateSchemaProviderProps(cfg, currentTableProperties);
+    TableExecutionContext executionContext = new TableExecutionContext();
+    executionContext.setProperties(currentTableProperties);
+    executionContext.setConfig(cfg);
+    executionContext.setDatabase(database);
+    executionContext.setTableName(currentTable);
+    return executionContext;
+  }
+
+  private TypedProperties getCurrentTableProperties(TypedProperties properties, String configFolder, FileSystem fs,
+      String configProp, String database, String currentTable) throws IOException {
+
+    String configFilePath = properties.getString(configProp, Helpers.getDefaultConfigFilePath(configFolder, database, currentTable));
+    checkIfTableConfigFileExists(configFolder, fs, configFilePath);
+
+    TypedProperties tableProperties = UtilHelpers.readConfig(fs.getConf(), new Path(configFilePath), new ArrayList<>()).getProps();
+    properties.forEach((k, v) -> {
+      if (tableProperties.get(k) == null) {
+        tableProperties.setProperty(k.toString(), v.toString());
+      }
+    });
+
+    return tableProperties;
+  }
+
+  private void calculateTargetBasePath(Config config, String database, String currentTable, String targetBasePath,
+      TypedProperties currentTableProperties, HoodieDeltaStreamer.Config cfg) {
+
+    String overriddenTargetBasePath = currentTableProperties.getString(Constants.TARGET_BASE_PATH_PROP, "");
+
+    if (StringUtils.isNullOrEmpty(targetBasePath)) {
+      targetBasePath = resetTarget(config, database, currentTable);
+    }
+
+    cfg.targetBasePath = StringUtils.isNullOrEmpty(overriddenTargetBasePath)
+      ? targetBasePath
+      : overriddenTargetBasePath;
+  }
+
+  private List<String> getSourcesToBeBound(TypedProperties properties) {
+    String combinedSourcesString = properties.getString(Constants.SOURCES_TO_BE_BOUND, null);
+    return StringUtils.isNullOrEmpty(combinedSourcesString)
+      ? new ArrayList<>()
+      : Arrays.asList(combinedSourcesString.split(Constants.COMMA_SEPARATOR));
+  }
+
+  private String getTableToBeIngested(TypedProperties properties) {

Review comment:
       What is the need of introducing this new function? Can we reuse the existing `getTablesToBeIngested` in your new code as well?
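   As an aside, a minimal sketch of what reusing the existing helper could look like (illustrative only; `resolveSingleTargetTable` is a hypothetical name, and treating the configured list as containing exactly one target table is an assumption, not something this patch enforces):
   ```
   // Hypothetical sketch (not part of the patch): reuse getTablesToBeIngested(properties)
   // instead of adding a separate getTableToBeIngested(properties).
   private String resolveSingleTargetTable(TypedProperties properties) {
     List<String> tablesToBeIngested = getTablesToBeIngested(properties);
     if (tablesToBeIngested.size() != 1) {
       throw new HoodieException("Exactly one target table is expected when binding multiple sources, but got: "
           + tablesToBeIngested);
     }
     return tablesToBeIngested.get(0);
   }
   ```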






[GitHub] [hudi] pratyakshsharma commented on a change in pull request #4645: [HUDI-3103] Enable MultiTableDeltaStreamer to update a single target table from multiple source tables

Posted by GitBox <gi...@apache.org>.
pratyakshsharma commented on a change in pull request #4645:
URL: https://github.com/apache/hudi/pull/4645#discussion_r810614920



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
##########
@@ -370,50 +441,124 @@ public static void main(String[] args) throws IOException {
   private static String resetTarget(Config configuration, String database, String tableName) {
     String basePathPrefix = configuration.basePathPrefix;
     basePathPrefix = basePathPrefix.charAt(basePathPrefix.length() - 1) == '/' ? basePathPrefix.substring(0, basePathPrefix.length() - 1) : basePathPrefix;
-    String targetBasePath = basePathPrefix + Constants.FILE_DELIMITER + database + Constants.FILE_DELIMITER + tableName;
-    configuration.targetTableName = database + Constants.DELIMITER + tableName;
+    String targetBasePath = basePathPrefix + Constants.PATH_SEPARATOR + database + Constants.PATH_SEPARATOR + tableName;
+    configuration.targetTableName = database + Constants.PATH_CUR_DIR + tableName;
     return targetBasePath;
   }
 
   /**
    * Creates actual HoodieDeltaStreamer objects for every table/topic and does incremental sync.
    */
   public void sync() {
+    List<HoodieDeltaStreamer> hdsObjectList = new ArrayList<>();
+
+    // The sync function is not executed when multiple sources update the same target.
     for (TableExecutionContext context : tableExecutionContexts) {
       try {
-        new HoodieDeltaStreamer(context.getConfig(), jssc, Option.ofNullable(context.getProperties())).sync();
+        HoodieDeltaStreamer hds = new HoodieDeltaStreamer(context.getConfig(), jssc, Option.ofNullable(context.getProperties()));
+
+        // Add object of HoodieDeltaStreamer temporarily to hdsObjectList when multiple sources update the same target.
+        if (!StringUtils.isNullOrEmpty(context.getProperties().getProperty(Constants.SOURCES_TO_BE_BOUND))) {
+          hdsObjectList.add(hds);
+          continue;
+        }
+
+        hds.sync();
         successTables.add(Helpers.getTableWithDatabase(context));
       } catch (Exception e) {
-        logger.error("error while running MultiTableDeltaStreamer for table: " + context.getTableName(), e);
+        logger.error("Error while running MultiTableDeltaStreamer for table: " + context.getTableName(), e);
         failedTables.add(Helpers.getTableWithDatabase(context));
       }
     }
 
-    logger.info("Ingestion was successful for topics: " + successTables);
-    if (!failedTables.isEmpty()) {
-      logger.info("Ingestion failed for topics: " + failedTables);
+    // If hdsObjectList is empty, it indicates that all source sync operations have been completed. In this case, directly return.
+    if (hdsObjectList.isEmpty()) {
+      logger.info("Ingestion was successful for topics: " + successTables);
+      if (!failedTables.isEmpty()) {
+        logger.info("Ingestion failed for topics: " + failedTables);
+      }
+      return;
     }
+
+    // The sync function is executing here when multiple sources update the same target.
+    boolean isContinuousMode = hdsObjectList.get(0).cfg.continuousMode;
+    do {
+      // Executing sync function by traversing hdsObjectList when multiple sources update the same target.
+      for (int i = 0; i < hdsObjectList.size(); i++) {
+        // Threads cannot be started when multiple sources update the same target.
+        if (isContinuousMode) {
+          hdsObjectList.get(i).cfg.continuousMode = false;
+        }
+
+        try {
+          hdsObjectList.get(i).sync();
+          successTables.add(Helpers.getTableWithDatabase(tableExecutionContexts.get(i)));
+        } catch (Exception e) {
+          logger.error("Error while running MultiTableDeltaStreamer for table: "
+              + tableExecutionContexts.get(i).getTableName(), e);
+          failedTables.add(Helpers.getTableWithDatabase(tableExecutionContexts.get(i)));
+          break;
+        }
+      }
+
+      logger.info("Ingestion was successful for topics: " + successTables);
+      if (!failedTables.isEmpty()) {
+        logger.info("Ingestion failed for topics: " + failedTables);
+        break;
+      }
+      successTables.clear();
+    } while (isContinuousMode);

Review comment:
       I did not understand the reason behind having this do..while loop based on `isContinuousMode`. Can you please elaborate on what exactly you are trying to do here? @watermelon12138 






[GitHub] [hudi] watermelon12138 commented on pull request #4645: [HUDI-3103] Enable MultiTableDeltaStreamer to update a single target table from multiple source tables

Posted by GitBox <gi...@apache.org>.
watermelon12138 commented on pull request #4645:
URL: https://github.com/apache/hudi/pull/4645#issuecomment-1054073675


   @nsivabalan @pratyakshsharma
   This is the new PR address.




[GitHub] [hudi] watermelon12138 commented on pull request #4645: [HUDI-3103] Enable MultiTableDeltaStreamer to update a single target table from multiple source tables

Posted by GitBox <gi...@apache.org>.
watermelon12138 commented on pull request #4645:
URL: https://github.com/apache/hudi/pull/4645#issuecomment-1054073152


   @nsivabalan @pratyakshsharma
   Thank you for your advice. I've changed the corresponding content in the code. Because my PR lags behind the current version, many conflicts came up during the merge that I had difficulty resolving. Therefore, I have created a new PR. I hope you can give your suggestions on the new PR. Thank you very much.




[GitHub] [hudi] hudi-bot commented on pull request #4645: [HUDI-3103] Enable MultiTableDeltaStreamer to update a single target table from multiple source tables

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on pull request #4645:
URL: https://github.com/apache/hudi/pull/4645#issuecomment-1054053920


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "5332458bfb61a6e13b9b59ae3813d236f86e01da",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=5358",
       "triggerID" : "5332458bfb61a6e13b9b59ae3813d236f86e01da",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ee9f2eaa28c5836977ea980a1d50b1d65ce342ef",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=5380",
       "triggerID" : "ee9f2eaa28c5836977ea980a1d50b1d65ce342ef",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ee9f2eaa28c5836977ea980a1d50b1d65ce342ef",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=5380",
       "triggerID" : "1018113861",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "d98dd23ee2077ff5c70d7c1f57a89057319850fd",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=6381",
       "triggerID" : "d98dd23ee2077ff5c70d7c1f57a89057319850fd",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * ee9f2eaa28c5836977ea980a1d50b1d65ce342ef Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=5380) 
   * d98dd23ee2077ff5c70d7c1f57a89057319850fd Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=6381) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>




[GitHub] [hudi] pratyakshsharma commented on pull request #4645: [HUDI-3103] Enable MultiTableDeltaStreamer to update a single target table from multiple source tables

Posted by GitBox <gi...@apache.org>.
pratyakshsharma commented on pull request #4645:
URL: https://github.com/apache/hudi/pull/4645#issuecomment-1046228405


   @watermelon12138 thank you for contributing this useful feature. A few comments; please address them and rebase with master. :) 




[GitHub] [hudi] nsivabalan commented on a change in pull request #4645: [HUDI-3103] Enable MultiTableDeltaStreamer to update a single target …

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on a change in pull request #4645:
URL: https://github.com/apache/hudi/pull/4645#discussion_r789365060



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
##########
@@ -370,50 +441,124 @@ public static void main(String[] args) throws IOException {
   private static String resetTarget(Config configuration, String database, String tableName) {
     String basePathPrefix = configuration.basePathPrefix;
     basePathPrefix = basePathPrefix.charAt(basePathPrefix.length() - 1) == '/' ? basePathPrefix.substring(0, basePathPrefix.length() - 1) : basePathPrefix;
-    String targetBasePath = basePathPrefix + Constants.FILE_DELIMITER + database + Constants.FILE_DELIMITER + tableName;
-    configuration.targetTableName = database + Constants.DELIMITER + tableName;
+    String targetBasePath = basePathPrefix + Constants.PATH_SEPARATOR + database + Constants.PATH_SEPARATOR + tableName;
+    configuration.targetTableName = database + Constants.PATH_CUR_DIR + tableName;
     return targetBasePath;
   }
 
   /**
    * Creates actual HoodieDeltaStreamer objects for every table/topic and does incremental sync.
    */
   public void sync() {
+    List<HoodieDeltaStreamer> hdsObjectList = new ArrayList<>();
+
+    // The sync function is not executed when multiple sources update the same target.

Review comment:
       Probably we can have separate if/else blocks for the single-source case vs. multiple sources for one Hudi table; that would be easier to reason about and maintain.
   The existing code would go into the if block, and the new code for multiple sources would go into the else block.
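   For illustration only, a rough sketch of that structure (the two `buildContextFor...` helpers are hypothetical names, not code from this patch):
   ```
   // Rough sketch of the suggested if/else split (illustrative only).
   private void populateTableExecutionContextList(TypedProperties properties, String configFolder,
       FileSystem fs, Config config) throws IOException {
     if (StringUtils.isNullOrEmpty(properties.getProperty(Constants.SOURCES_TO_BE_BOUND))) {
       // Existing path: one execution context per target table.
       for (String table : getTablesToBeIngested(properties)) {
         this.tableExecutionContexts.add(buildContextForTargetTable(properties, configFolder, fs, config, table));
       }
     } else {
       // New path: one execution context per bound source, all writing to the same target table.
       for (String source : getSourcesToBeBound(properties)) {
         this.tableExecutionContexts.add(buildContextForBoundSource(properties, configFolder, fs, config, source));
       }
     }
   }
   ```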

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
##########
@@ -370,50 +441,124 @@ public static void main(String[] args) throws IOException {
   private static String resetTarget(Config configuration, String database, String tableName) {
     String basePathPrefix = configuration.basePathPrefix;
     basePathPrefix = basePathPrefix.charAt(basePathPrefix.length() - 1) == '/' ? basePathPrefix.substring(0, basePathPrefix.length() - 1) : basePathPrefix;
-    String targetBasePath = basePathPrefix + Constants.FILE_DELIMITER + database + Constants.FILE_DELIMITER + tableName;
-    configuration.targetTableName = database + Constants.DELIMITER + tableName;
+    String targetBasePath = basePathPrefix + Constants.PATH_SEPARATOR + database + Constants.PATH_SEPARATOR + tableName;
+    configuration.targetTableName = database + Constants.PATH_CUR_DIR + tableName;
     return targetBasePath;
   }
 
   /**
    * Creates actual HoodieDeltaStreamer objects for every table/topic and does incremental sync.
    */
   public void sync() {
+    List<HoodieDeltaStreamer> hdsObjectList = new ArrayList<>();
+
+    // The sync function is not executed when multiple sources update the same target.
     for (TableExecutionContext context : tableExecutionContexts) {
       try {
-        new HoodieDeltaStreamer(context.getConfig(), jssc, Option.ofNullable(context.getProperties())).sync();
+        HoodieDeltaStreamer hds = new HoodieDeltaStreamer(context.getConfig(), jssc, Option.ofNullable(context.getProperties()));
+
+        // Add object of HoodieDeltaStreamer temporarily to hdsObjectList when multiple sources update the same target.
+        if (!StringUtils.isNullOrEmpty(context.getProperties().getProperty(Constants.SOURCES_TO_BE_BOUND))) {
+          hdsObjectList.add(hds);
+          continue;
+        }
+
+        hds.sync();
         successTables.add(Helpers.getTableWithDatabase(context));
       } catch (Exception e) {
-        logger.error("error while running MultiTableDeltaStreamer for table: " + context.getTableName(), e);
+        logger.error("Error while running MultiTableDeltaStreamer for table: " + context.getTableName(), e);
         failedTables.add(Helpers.getTableWithDatabase(context));
       }
     }
 
-    logger.info("Ingestion was successful for topics: " + successTables);
-    if (!failedTables.isEmpty()) {
-      logger.info("Ingestion failed for topics: " + failedTables);
+    // If hdsObjectList is empty, it indicates that all source sync operations have been completed. In this case, directly return.
+    if (hdsObjectList.isEmpty()) {
+      logger.info("Ingestion was successful for topics: " + successTables);
+      if (!failedTables.isEmpty()) {
+        logger.info("Ingestion failed for topics: " + failedTables);
+      }
+      return;
     }
+
+    // The sync function is executing here when multiple sources update the same target.
+    boolean isContinuousMode = hdsObjectList.get(0).cfg.continuousMode;

Review comment:
       I guess we need to move this to L488 as 
   ```
   boolean isContinuousMode = hdsObjectList.get(i).cfg.continuousMode;
   ```
   Essentially, we can't have continuous mode enabled for any of the tables, right?
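   To make that concrete, a minimal sketch of the suggested change (illustrative only; the success/failure bookkeeping from the existing loop is omitted):
   ```
   // Illustrative sketch: read the flag per streamer instead of only from index 0,
   // and force single-shot execution so no per-table continuous loop is started.
   for (int i = 0; i < hdsObjectList.size(); i++) {
     boolean isContinuousMode = hdsObjectList.get(i).cfg.continuousMode;
     if (isContinuousMode) {
       hdsObjectList.get(i).cfg.continuousMode = false;
     }
     // ... existing try/catch around hdsObjectList.get(i).sync() stays unchanged ...
   }
   ```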
   

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
##########
@@ -370,50 +441,124 @@ public static void main(String[] args) throws IOException {
   private static String resetTarget(Config configuration, String database, String tableName) {
     String basePathPrefix = configuration.basePathPrefix;
     basePathPrefix = basePathPrefix.charAt(basePathPrefix.length() - 1) == '/' ? basePathPrefix.substring(0, basePathPrefix.length() - 1) : basePathPrefix;
-    String targetBasePath = basePathPrefix + Constants.FILE_DELIMITER + database + Constants.FILE_DELIMITER + tableName;
-    configuration.targetTableName = database + Constants.DELIMITER + tableName;
+    String targetBasePath = basePathPrefix + Constants.PATH_SEPARATOR + database + Constants.PATH_SEPARATOR + tableName;
+    configuration.targetTableName = database + Constants.PATH_CUR_DIR + tableName;
     return targetBasePath;
   }
 
   /**
    * Creates actual HoodieDeltaStreamer objects for every table/topic and does incremental sync.
    */
   public void sync() {
+    List<HoodieDeltaStreamer> hdsObjectList = new ArrayList<>();
+
+    // The sync function is not executed when multiple sources update the same target.
     for (TableExecutionContext context : tableExecutionContexts) {
       try {
-        new HoodieDeltaStreamer(context.getConfig(), jssc, Option.ofNullable(context.getProperties())).sync();
+        HoodieDeltaStreamer hds = new HoodieDeltaStreamer(context.getConfig(), jssc, Option.ofNullable(context.getProperties()));
+
+        // Add object of HoodieDeltaStreamer temporarily to hdsObjectList when multiple sources update the same target.
+        if (!StringUtils.isNullOrEmpty(context.getProperties().getProperty(Constants.SOURCES_TO_BE_BOUND))) {
+          hdsObjectList.add(hds);
+          continue;
+        }
+
+        hds.sync();
         successTables.add(Helpers.getTableWithDatabase(context));
       } catch (Exception e) {
-        logger.error("error while running MultiTableDeltaStreamer for table: " + context.getTableName(), e);
+        logger.error("Error while running MultiTableDeltaStreamer for table: " + context.getTableName(), e);
         failedTables.add(Helpers.getTableWithDatabase(context));
       }
     }
 
-    logger.info("Ingestion was successful for topics: " + successTables);
-    if (!failedTables.isEmpty()) {
-      logger.info("Ingestion failed for topics: " + failedTables);
+    // If hdsObjectList is empty, it indicates that all source sync operations have been completed. In this case, directly return.
+    if (hdsObjectList.isEmpty()) {
+      logger.info("Ingestion was successful for topics: " + successTables);
+      if (!failedTables.isEmpty()) {
+        logger.info("Ingestion failed for topics: " + failedTables);
+      }
+      return;
     }
+
+    // The sync function is executing here when multiple sources update the same target.
+    boolean isContinuousMode = hdsObjectList.get(0).cfg.continuousMode;
+    do {
+      // Executing sync function by traversing hdsObjectList when multiple sources update the same target.
+      for (int i = 0; i < hdsObjectList.size(); i++) {
+        // Threads cannot be started when multiple sources update the same target.
+        if (isContinuousMode) {
+          hdsObjectList.get(i).cfg.continuousMode = false;
+        }
+
+        try {
+          hdsObjectList.get(i).sync();
+          successTables.add(Helpers.getTableWithDatabase(tableExecutionContexts.get(i)));
+        } catch (Exception e) {
+          logger.error("Error while running MultiTableDeltaStreamer for table: "
+              + tableExecutionContexts.get(i).getTableName(), e);
+          failedTables.add(Helpers.getTableWithDatabase(tableExecutionContexts.get(i)));
+          break;
+        }
+      }
+
+      logger.info("Ingestion was successful for topics: " + successTables);
+      if (!failedTables.isEmpty()) {
+        logger.info("Ingestion failed for topics: " + failedTables);
+        break;
+      }
+      successTables.clear();
+    } while (isContinuousMode);
   }
 
   public static class Constants {
+    // When there are multiple sources, you can use this configuration item to set an independent checkpoint for the source.
+    public static final String SOURCE_CHECKPOINT = "hoodie.deltastreamer.current.source.checkpoint";

Review comment:
       Do you think we can move these configs to one of the DeltaStreamer classes? The way I look at this patch is that we are adding support to ingest data from multiple sources into a single Hudi table, so these configs probably fit better in DeltaStreamer.
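   For example, a sketch of what that could look like (the key strings are the ones added in this diff; where exactly they would live and the driver-style usage below are assumptions, not part of the patch):
   ```
   // Sketch only: the multi-source keys from this diff hoisted next to the other
   // DeltaStreamer-level properties instead of HoodieMultiTableDeltaStreamer.Constants.
   public static final String SOURCES_TO_BE_BOUND = "hoodie.deltastreamer.source.sourcesToBeBound";
   public static final String SOURCE_NAME = "hoodie.deltastreamer.current.source.name";
   public static final String SOURCE_CHECKPOINT = "hoodie.deltastreamer.current.source.checkpoint";

   // Hypothetical driver-side usage, binding two source tables to one target table:
   static TypedProperties multiSourceProps() {
     TypedProperties props = new TypedProperties();
     props.setProperty(SOURCES_TO_BE_BOUND, "db1.source_a,db1.source_b");
     props.setProperty("hoodie.deltastreamer.ingestion.tablesToBeIngested", "db1.target_table");
     // Per-source settings such as SOURCE_NAME and SOURCE_CHECKPOINT would live in each
     // source's own config file (hoodie.deltastreamer.source.<db>.<table>.configFile).
     return props;
   }
   ```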

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
##########
@@ -370,50 +441,124 @@ public static void main(String[] args) throws IOException {
   private static String resetTarget(Config configuration, String database, String tableName) {
     String basePathPrefix = configuration.basePathPrefix;
     basePathPrefix = basePathPrefix.charAt(basePathPrefix.length() - 1) == '/' ? basePathPrefix.substring(0, basePathPrefix.length() - 1) : basePathPrefix;
-    String targetBasePath = basePathPrefix + Constants.FILE_DELIMITER + database + Constants.FILE_DELIMITER + tableName;
-    configuration.targetTableName = database + Constants.DELIMITER + tableName;
+    String targetBasePath = basePathPrefix + Constants.PATH_SEPARATOR + database + Constants.PATH_SEPARATOR + tableName;
+    configuration.targetTableName = database + Constants.PATH_CUR_DIR + tableName;
     return targetBasePath;
   }
 
   /**
    * Creates actual HoodieDeltaStreamer objects for every table/topic and does incremental sync.
    */
   public void sync() {
+    List<HoodieDeltaStreamer> hdsObjectList = new ArrayList<>();
+
+    // The sync function is not executed when multiple sources update the same target.
     for (TableExecutionContext context : tableExecutionContexts) {
       try {
-        new HoodieDeltaStreamer(context.getConfig(), jssc, Option.ofNullable(context.getProperties())).sync();
+        HoodieDeltaStreamer hds = new HoodieDeltaStreamer(context.getConfig(), jssc, Option.ofNullable(context.getProperties()));
+
+        // Add object of HoodieDeltaStreamer temporarily to hdsObjectList when multiple sources update the same target.
+        if (!StringUtils.isNullOrEmpty(context.getProperties().getProperty(Constants.SOURCES_TO_BE_BOUND))) {
+          hdsObjectList.add(hds);
+          continue;
+        }
+
+        hds.sync();
         successTables.add(Helpers.getTableWithDatabase(context));
       } catch (Exception e) {
-        logger.error("error while running MultiTableDeltaStreamer for table: " + context.getTableName(), e);
+        logger.error("Error while running MultiTableDeltaStreamer for table: " + context.getTableName(), e);
         failedTables.add(Helpers.getTableWithDatabase(context));
       }
     }
 
-    logger.info("Ingestion was successful for topics: " + successTables);
-    if (!failedTables.isEmpty()) {
-      logger.info("Ingestion failed for topics: " + failedTables);
+    // If hdsObjectList is empty, it indicates that all source sync operations have been completed. In this case, directly return.
+    if (hdsObjectList.isEmpty()) {
+      logger.info("Ingestion was successful for topics: " + successTables);
+      if (!failedTables.isEmpty()) {
+        logger.info("Ingestion failed for topics: " + failedTables);
+      }
+      return;
     }
+
+    // The sync function is executing here when multiple sources update the same target.
+    boolean isContinuousMode = hdsObjectList.get(0).cfg.continuousMode;
+    do {
+      // Executing sync function by traversing hdsObjectList when multiple sources update the same target.
+      for (int i = 0; i < hdsObjectList.size(); i++) {
+        // Threads cannot be started when multiple sources update the same target.
+        if (isContinuousMode) {
+          hdsObjectList.get(i).cfg.continuousMode = false;
+        }
+
+        try {
+          hdsObjectList.get(i).sync();
+          successTables.add(Helpers.getTableWithDatabase(tableExecutionContexts.get(i)));
+        } catch (Exception e) {
+          logger.error("Error while running MultiTableDeltaStreamer for table: "
+              + tableExecutionContexts.get(i).getTableName(), e);
+          failedTables.add(Helpers.getTableWithDatabase(tableExecutionContexts.get(i)));
+          break;
+        }
+      }
+
+      logger.info("Ingestion was successful for topics: " + successTables);
+      if (!failedTables.isEmpty()) {
+        logger.info("Ingestion failed for topics: " + failedTables);
+        break;
+      }
+      successTables.clear();
+    } while (isContinuousMode);
   }
 
   public static class Constants {
+    // When there are multiple sources, you can use this configuration item to set an independent checkpoint for the source.
+    public static final String SOURCE_CHECKPOINT = "hoodie.deltastreamer.current.source.checkpoint";
+
+    // If there are multiple sources, you can use this configuration item to set an alias for the source to distinguish the source.
+    // In addition, the alias is used as a suffix to distinguish the CHECKPOINT_KEY and CHECKPOINT_RESET_KEY of each source.
+    public static final String SOURCE_NAME = "hoodie.deltastreamer.current.source.name";
+
     public static final String KAFKA_TOPIC_PROP = "hoodie.deltastreamer.source.kafka.topic";
+
     private static final String SOURCE_SCHEMA_REGISTRY_URL_PROP = "hoodie.deltastreamer.schemaprovider.registry.url";
+
     private static final String TARGET_SCHEMA_REGISTRY_URL_PROP = "hoodie.deltastreamer.schemaprovider.registry.targetUrl";
+
     public static final String HIVE_SYNC_TABLE_PROP = "hoodie.datasource.hive_sync.table";
+
     private static final String SCHEMA_REGISTRY_BASE_URL_PROP = "hoodie.deltastreamer.schemaprovider.registry.baseUrl";
+
     private static final String SCHEMA_REGISTRY_URL_SUFFIX_PROP = "hoodie.deltastreamer.schemaprovider.registry.urlSuffix";
+
     private static final String SCHEMA_REGISTRY_SOURCE_URL_SUFFIX = "hoodie.deltastreamer.schemaprovider.registry.sourceUrlSuffix";
+
     private static final String SCHEMA_REGISTRY_TARGET_URL_SUFFIX = "hoodie.deltastreamer.schemaprovider.registry.targetUrlSuffix";
+
     private static final String TABLES_TO_BE_INGESTED_PROP = "hoodie.deltastreamer.ingestion.tablesToBeIngested";
+
+    // This configuration item specifies the database name and table name of the source. The format is "database.table".
+    // It is recommended that table name be the same as the alias of the source. If there are multiple sources, separate them with commas.
+    public static final String SOURCES_TO_BE_BOUND = "hoodie.deltastreamer.source.sourcesToBeBound";
+
+    private static final String SOURCE_PREFIX = "hoodie.deltastreamer.source.";
+
     private static final String INGESTION_PREFIX = "hoodie.deltastreamer.ingestion.";
+
     private static final String INGESTION_CONFIG_SUFFIX = ".configFile";
+
     private static final String DEFAULT_CONFIG_FILE_NAME_SUFFIX = "_config.properties";
+
     private static final String TARGET_BASE_PATH_PROP = "hoodie.deltastreamer.ingestion.targetBasePath";
+
     private static final String LOCAL_SPARK_MASTER = "local[2]";
-    private static final String FILE_DELIMITER = "/";
-    private static final String DELIMITER = ".";
+
+    public static final String PATH_SEPARATOR = "/";
+
+    public static final String PATH_CUR_DIR = ".";

Review comment:
       I would prefer to keep this as "DELIMITER". 






[GitHub] [hudi] hudi-bot commented on pull request #4645: [HUDI-3103] Enable MultiTableDeltaStreamer to update a single target …

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on pull request #4645:
URL: https://github.com/apache/hudi/pull/4645#issuecomment-1017487585


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "5332458bfb61a6e13b9b59ae3813d236f86e01da",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=5358",
       "triggerID" : "5332458bfb61a6e13b9b59ae3813d236f86e01da",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ee9f2eaa28c5836977ea980a1d50b1d65ce342ef",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=5380",
       "triggerID" : "ee9f2eaa28c5836977ea980a1d50b1d65ce342ef",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 5332458bfb61a6e13b9b59ae3813d236f86e01da Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=5358) 
   * ee9f2eaa28c5836977ea980a1d50b1d65ce342ef Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=5380) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>




[GitHub] [hudi] pratyakshsharma removed a comment on pull request #4645: [HUDI-3103] Enable MultiTableDeltaStreamer to update a single target table from multiple source tables

Posted by GitBox <gi...@apache.org>.
pratyakshsharma removed a comment on pull request #4645:
URL: https://github.com/apache/hudi/pull/4645#issuecomment-1046228405


   @watermelon12138 thank you for contributing this useful feature. A few comments; please address them and rebase with master. :) 




[GitHub] [hudi] pratyakshsharma commented on a change in pull request #4645: [HUDI-3103] Enable MultiTableDeltaStreamer to update a single target table from multiple source tables

Posted by GitBox <gi...@apache.org>.
pratyakshsharma commented on a change in pull request #4645:
URL: https://github.com/apache/hudi/pull/4645#discussion_r810614586



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
##########
@@ -370,50 +441,124 @@ public static void main(String[] args) throws IOException {
   private static String resetTarget(Config configuration, String database, String tableName) {
     String basePathPrefix = configuration.basePathPrefix;
     basePathPrefix = basePathPrefix.charAt(basePathPrefix.length() - 1) == '/' ? basePathPrefix.substring(0, basePathPrefix.length() - 1) : basePathPrefix;
-    String targetBasePath = basePathPrefix + Constants.FILE_DELIMITER + database + Constants.FILE_DELIMITER + tableName;
-    configuration.targetTableName = database + Constants.DELIMITER + tableName;
+    String targetBasePath = basePathPrefix + Constants.PATH_SEPARATOR + database + Constants.PATH_SEPARATOR + tableName;
+    configuration.targetTableName = database + Constants.PATH_CUR_DIR + tableName;
     return targetBasePath;
   }
 
   /**
    * Creates actual HoodieDeltaStreamer objects for every table/topic and does incremental sync.
    */
   public void sync() {
+    List<HoodieDeltaStreamer> hdsObjectList = new ArrayList<>();
+
+    // The sync function is not executed when multiple sources update the same target.
     for (TableExecutionContext context : tableExecutionContexts) {
       try {
-        new HoodieDeltaStreamer(context.getConfig(), jssc, Option.ofNullable(context.getProperties())).sync();
+        HoodieDeltaStreamer hds = new HoodieDeltaStreamer(context.getConfig(), jssc, Option.ofNullable(context.getProperties()));
+
+        // Add object of HoodieDeltaStreamer temporarily to hdsObjectList when multiple sources update the same target.
+        if (!StringUtils.isNullOrEmpty(context.getProperties().getProperty(Constants.SOURCES_TO_BE_BOUND))) {
+          hdsObjectList.add(hds);
+          continue;
+        }
+
+        hds.sync();
         successTables.add(Helpers.getTableWithDatabase(context));
       } catch (Exception e) {
-        logger.error("error while running MultiTableDeltaStreamer for table: " + context.getTableName(), e);
+        logger.error("Error while running MultiTableDeltaStreamer for table: " + context.getTableName(), e);
         failedTables.add(Helpers.getTableWithDatabase(context));
       }
     }
 
-    logger.info("Ingestion was successful for topics: " + successTables);
-    if (!failedTables.isEmpty()) {
-      logger.info("Ingestion failed for topics: " + failedTables);
+    // If hdsObjectList is empty, it indicates that all source sync operations have been completed. In this case, directly return.
+    if (hdsObjectList.isEmpty()) {
+      logger.info("Ingestion was successful for topics: " + successTables);
+      if (!failedTables.isEmpty()) {
+        logger.info("Ingestion failed for topics: " + failedTables);
+      }
+      return;
     }
+
+    // The sync function is executing here when multiple sources update the same target.
+    boolean isContinuousMode = hdsObjectList.get(0).cfg.continuousMode;

Review comment:
       +1






[GitHub] [hudi] watermelon12138 commented on a change in pull request #4645: [HUDI-3103] Enable MultiTableDeltaStreamer to update a single target table from multiple source tables

Posted by GitBox <gi...@apache.org>.
watermelon12138 commented on a change in pull request #4645:
URL: https://github.com/apache/hudi/pull/4645#discussion_r815720738



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
##########
@@ -370,50 +441,124 @@ public static void main(String[] args) throws IOException {
   private static String resetTarget(Config configuration, String database, String tableName) {
     String basePathPrefix = configuration.basePathPrefix;
     basePathPrefix = basePathPrefix.charAt(basePathPrefix.length() - 1) == '/' ? basePathPrefix.substring(0, basePathPrefix.length() - 1) : basePathPrefix;
-    String targetBasePath = basePathPrefix + Constants.FILE_DELIMITER + database + Constants.FILE_DELIMITER + tableName;
-    configuration.targetTableName = database + Constants.DELIMITER + tableName;
+    String targetBasePath = basePathPrefix + Constants.PATH_SEPARATOR + database + Constants.PATH_SEPARATOR + tableName;
+    configuration.targetTableName = database + Constants.PATH_CUR_DIR + tableName;
     return targetBasePath;
   }
 
   /**
    * Creates actual HoodieDeltaStreamer objects for every table/topic and does incremental sync.
    */
   public void sync() {
+    List<HoodieDeltaStreamer> hdsObjectList = new ArrayList<>();
+
+    // The sync function is not executed when multiple sources update the same target.

Review comment:
       Thank you for your advice. I have changed the corresponding content in the code.

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
##########
@@ -370,50 +441,124 @@ public static void main(String[] args) throws IOException {
   private static String resetTarget(Config configuration, String database, String tableName) {
     String basePathPrefix = configuration.basePathPrefix;
     basePathPrefix = basePathPrefix.charAt(basePathPrefix.length() - 1) == '/' ? basePathPrefix.substring(0, basePathPrefix.length() - 1) : basePathPrefix;
-    String targetBasePath = basePathPrefix + Constants.FILE_DELIMITER + database + Constants.FILE_DELIMITER + tableName;
-    configuration.targetTableName = database + Constants.DELIMITER + tableName;
+    String targetBasePath = basePathPrefix + Constants.PATH_SEPARATOR + database + Constants.PATH_SEPARATOR + tableName;
+    configuration.targetTableName = database + Constants.PATH_CUR_DIR + tableName;
     return targetBasePath;
   }
 
   /**
    * Creates actual HoodieDeltaStreamer objects for every table/topic and does incremental sync.
    */
   public void sync() {
+    List<HoodieDeltaStreamer> hdsObjectList = new ArrayList<>();
+
+    // The sync function is not executed when multiple sources update the same target.
     for (TableExecutionContext context : tableExecutionContexts) {
       try {
-        new HoodieDeltaStreamer(context.getConfig(), jssc, Option.ofNullable(context.getProperties())).sync();
+        HoodieDeltaStreamer hds = new HoodieDeltaStreamer(context.getConfig(), jssc, Option.ofNullable(context.getProperties()));
+
+        // Add object of HoodieDeltaStreamer temporarily to hdsObjectList when multiple sources update the same target.
+        if (!StringUtils.isNullOrEmpty(context.getProperties().getProperty(Constants.SOURCES_TO_BE_BOUND))) {
+          hdsObjectList.add(hds);
+          continue;
+        }
+
+        hds.sync();
         successTables.add(Helpers.getTableWithDatabase(context));
       } catch (Exception e) {
-        logger.error("error while running MultiTableDeltaStreamer for table: " + context.getTableName(), e);
+        logger.error("Error while running MultiTableDeltaStreamer for table: " + context.getTableName(), e);
         failedTables.add(Helpers.getTableWithDatabase(context));
       }
     }
 
-    logger.info("Ingestion was successful for topics: " + successTables);
-    if (!failedTables.isEmpty()) {
-      logger.info("Ingestion failed for topics: " + failedTables);
+    // If hdsObjectList is empty, it indicates that all source sync operations have been completed. In this case, directly return.
+    if (hdsObjectList.isEmpty()) {
+      logger.info("Ingestion was successful for topics: " + successTables);
+      if (!failedTables.isEmpty()) {
+        logger.info("Ingestion failed for topics: " + failedTables);
+      }
+      return;
     }
+
+    // The sync function is executing here when multiple sources update the same target.
+    boolean isContinuousMode = hdsObjectList.get(0).cfg.continuousMode;

Review comment:
       Thank you for your advice. I have changed the corresponding content in the code.

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
##########
@@ -370,50 +441,124 @@ public static void main(String[] args) throws IOException {
   private static String resetTarget(Config configuration, String database, String tableName) {
     String basePathPrefix = configuration.basePathPrefix;
     basePathPrefix = basePathPrefix.charAt(basePathPrefix.length() - 1) == '/' ? basePathPrefix.substring(0, basePathPrefix.length() - 1) : basePathPrefix;
-    String targetBasePath = basePathPrefix + Constants.FILE_DELIMITER + database + Constants.FILE_DELIMITER + tableName;
-    configuration.targetTableName = database + Constants.DELIMITER + tableName;
+    String targetBasePath = basePathPrefix + Constants.PATH_SEPARATOR + database + Constants.PATH_SEPARATOR + tableName;
+    configuration.targetTableName = database + Constants.PATH_CUR_DIR + tableName;
     return targetBasePath;
   }
 
   /**
    * Creates actual HoodieDeltaStreamer objects for every table/topic and does incremental sync.
    */
   public void sync() {
+    List<HoodieDeltaStreamer> hdsObjectList = new ArrayList<>();
+
+    // The sync function is not executed when multiple sources update the same target.
     for (TableExecutionContext context : tableExecutionContexts) {
       try {
-        new HoodieDeltaStreamer(context.getConfig(), jssc, Option.ofNullable(context.getProperties())).sync();
+        HoodieDeltaStreamer hds = new HoodieDeltaStreamer(context.getConfig(), jssc, Option.ofNullable(context.getProperties()));
+
+        // Add object of HoodieDeltaStreamer temporarily to hdsObjectList when multiple sources update the same target.
+        if (!StringUtils.isNullOrEmpty(context.getProperties().getProperty(Constants.SOURCES_TO_BE_BOUND))) {
+          hdsObjectList.add(hds);
+          continue;
+        }
+
+        hds.sync();
         successTables.add(Helpers.getTableWithDatabase(context));
       } catch (Exception e) {
-        logger.error("error while running MultiTableDeltaStreamer for table: " + context.getTableName(), e);
+        logger.error("Error while running MultiTableDeltaStreamer for table: " + context.getTableName(), e);
         failedTables.add(Helpers.getTableWithDatabase(context));
       }
     }
 
-    logger.info("Ingestion was successful for topics: " + successTables);
-    if (!failedTables.isEmpty()) {
-      logger.info("Ingestion failed for topics: " + failedTables);
+    // If hdsObjectList is empty, it indicates that all source sync operations have been completed. In this case, directly return.
+    if (hdsObjectList.isEmpty()) {
+      logger.info("Ingestion was successful for topics: " + successTables);
+      if (!failedTables.isEmpty()) {
+        logger.info("Ingestion failed for topics: " + failedTables);
+      }
+      return;
     }
+
+    // The sync function is executing here when multiple sources update the same target.
+    boolean isContinuousMode = hdsObjectList.get(0).cfg.continuousMode;
+    do {
+      // Executing sync function by traversing hdsObjectList when multiple sources update the same target.
+      for (int i = 0; i < hdsObjectList.size(); i++) {
+        // Threads cannot be started when multiple sources update the same target.
+        if (isContinuousMode) {
+          hdsObjectList.get(i).cfg.continuousMode = false;
+        }
+
+        try {
+          hdsObjectList.get(i).sync();
+          successTables.add(Helpers.getTableWithDatabase(tableExecutionContexts.get(i)));
+        } catch (Exception e) {
+          logger.error("Error while running MultiTableDeltaStreamer for table: "
+              + tableExecutionContexts.get(i).getTableName(), e);
+          failedTables.add(Helpers.getTableWithDatabase(tableExecutionContexts.get(i)));
+          break;
+        }
+      }
+
+      logger.info("Ingestion was successful for topics: " + successTables);
+      if (!failedTables.isEmpty()) {
+        logger.info("Ingestion failed for topics: " + failedTables);
+        break;
+      }
+      successTables.clear();
+    } while (isContinuousMode);
   }
 
   public static class Constants {
+    // When there are multiple sources, you can use this configuration item to set an independent checkpoint for the source.
+    public static final String SOURCE_CHECKPOINT = "hoodie.deltastreamer.current.source.checkpoint";
+
+    // If there are multiple sources, you can use this configuration item to set an alias for the source to distinguish the source.
+    // In addition, the alias is used as a suffix to distinguish the CHECKPOINT_KEY and CHECKPOINT_RESET_KEY of each source.
+    public static final String SOURCE_NAME = "hoodie.deltastreamer.current.source.name";
+
     public static final String KAFKA_TOPIC_PROP = "hoodie.deltastreamer.source.kafka.topic";
+
     private static final String SOURCE_SCHEMA_REGISTRY_URL_PROP = "hoodie.deltastreamer.schemaprovider.registry.url";
+
     private static final String TARGET_SCHEMA_REGISTRY_URL_PROP = "hoodie.deltastreamer.schemaprovider.registry.targetUrl";
+
     public static final String HIVE_SYNC_TABLE_PROP = "hoodie.datasource.hive_sync.table";
+
     private static final String SCHEMA_REGISTRY_BASE_URL_PROP = "hoodie.deltastreamer.schemaprovider.registry.baseUrl";
+
     private static final String SCHEMA_REGISTRY_URL_SUFFIX_PROP = "hoodie.deltastreamer.schemaprovider.registry.urlSuffix";
+
     private static final String SCHEMA_REGISTRY_SOURCE_URL_SUFFIX = "hoodie.deltastreamer.schemaprovider.registry.sourceUrlSuffix";
+
     private static final String SCHEMA_REGISTRY_TARGET_URL_SUFFIX = "hoodie.deltastreamer.schemaprovider.registry.targetUrlSuffix";
+
     private static final String TABLES_TO_BE_INGESTED_PROP = "hoodie.deltastreamer.ingestion.tablesToBeIngested";
+
+    // This configuration item specifies the database name and table name of the source. The format is "database.table".
+    // It is recommended that table name be the same as the alias of the source. If there are multiple sources, separate them with commas.
+    public static final String SOURCES_TO_BE_BOUND = "hoodie.deltastreamer.source.sourcesToBeBound";
+
+    private static final String SOURCE_PREFIX = "hoodie.deltastreamer.source.";
+
     private static final String INGESTION_PREFIX = "hoodie.deltastreamer.ingestion.";
+
     private static final String INGESTION_CONFIG_SUFFIX = ".configFile";
+
     private static final String DEFAULT_CONFIG_FILE_NAME_SUFFIX = "_config.properties";
+
     private static final String TARGET_BASE_PATH_PROP = "hoodie.deltastreamer.ingestion.targetBasePath";
+
     private static final String LOCAL_SPARK_MASTER = "local[2]";
-    private static final String FILE_DELIMITER = "/";
-    private static final String DELIMITER = ".";
+
+    public static final String PATH_SEPARATOR = "/";
+
+    public static final String PATH_CUR_DIR = ".";

Review comment:
       Thank you for your advice. I have changed the corresponding content in the code.

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
##########
@@ -104,43 +108,111 @@ private void checkIfTableConfigFileExists(String configFolder, FileSystem fs, St
     }
   }
 
-  //commonProps are passed as parameter which contain table to config file mapping
+  // commonProps are passed as parameter which contain table to config file mapping
   private void populateTableExecutionContextList(TypedProperties properties, String configFolder, FileSystem fs, Config config) throws IOException {
-    List<String> tablesToBeIngested = getTablesToBeIngested(properties);
-    logger.info("tables to be ingested via MultiTableDeltaStreamer : " + tablesToBeIngested);
-    TableExecutionContext executionContext;
-    for (String table : tablesToBeIngested) {
-      String[] tableWithDatabase = table.split("\\.");
-      String database = tableWithDatabase.length > 1 ? tableWithDatabase[0] : "default";
-      String currentTable = tableWithDatabase.length > 1 ? tableWithDatabase[1] : table;
-      String configProp = Constants.INGESTION_PREFIX + database + Constants.DELIMITER + currentTable + Constants.INGESTION_CONFIG_SUFFIX;
-      String configFilePath = properties.getString(configProp, Helpers.getDefaultConfigFilePath(configFolder, database, currentTable));
-      checkIfTableConfigFileExists(configFolder, fs, configFilePath);
-      TypedProperties tableProperties = UtilHelpers.readConfig(fs.getConf(), new Path(configFilePath), new ArrayList<String>()).getProps();
-      properties.forEach((k, v) -> {
-        if (tableProperties.get(k) == null) {
-          tableProperties.setProperty(k.toString(), v.toString());
-        }
-      });
-      final HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
-      //copy all the values from config to cfg
-      String targetBasePath = resetTarget(config, database, currentTable);
-      Helpers.deepCopyConfigs(config, cfg);
-      String overriddenTargetBasePath = tableProperties.getString(Constants.TARGET_BASE_PATH_PROP, "");
-      cfg.targetBasePath = StringUtils.isNullOrEmpty(overriddenTargetBasePath) ? targetBasePath : overriddenTargetBasePath;
-      if (cfg.enableMetaSync && StringUtils.isNullOrEmpty(tableProperties.getString(DataSourceWriteOptions.HIVE_TABLE().key(), ""))) {
-        throw new HoodieException("Meta sync table field not provided!");
+    // Parameter SOURCES_TO_BE_BOUND indicates that multiple sources update the same target.
+    String sourcesToBeBound = properties.getProperty(Constants.SOURCES_TO_BE_BOUND);
+    if (!StringUtils.isNullOrEmpty(sourcesToBeBound)) {
+      // populate the table execution context by traversing source tables
+      List<String> sourcesToBeBonded = getSourcesToBeBound(properties);
+      logger.info("Source tables to be bound via MultiTableDeltaStreamer : " + sourcesToBeBonded);
+      String tableToBeIngested = getTableToBeIngested(properties);
+      String[] targetTableWithDataBase = tableToBeIngested.split("\\.");
+      String targetDataBase = targetTableWithDataBase.length > 1 ? targetTableWithDataBase[0] : "default";
+      String targetTable = targetTableWithDataBase.length > 1 ? targetTableWithDataBase[1] : targetTableWithDataBase[0];
+      String targetBasePath = resetTarget(config, targetDataBase, targetTable);
+
+      for (String source : sourcesToBeBonded) {
+        String[] tableWithDatabase = source.split("\\.");
+        String currentSourceDataBase = tableWithDatabase.length > 1 ? tableWithDatabase[0] : "default";
+        String currentSourceTable = tableWithDatabase.length > 1 ? tableWithDatabase[1] : source;
+        String configProp = Constants.SOURCE_PREFIX + currentSourceDataBase + Constants.PATH_CUR_DIR + currentSourceTable + Constants.INGESTION_CONFIG_SUFFIX;
+        TableExecutionContext executionContext = populateTableExecutionContext(properties, configFolder, fs, config, configProp, currentSourceDataBase, currentSourceTable, targetBasePath);
+        this.tableExecutionContexts.add(executionContext);
+      }
+    } else {
+      // populate the table execution context by traversing target tables
+      List<String> tablesToBeIngested = getTablesToBeIngested(properties);
+      logger.info("Tables to be ingested via MultiTableDeltaStreamer : " + tablesToBeIngested);
+
+      for (String table : tablesToBeIngested) {
+        String[] tableWithDatabase = table.split("\\.");
+        String database = tableWithDatabase.length > 1 ? tableWithDatabase[0] : "default";
+        String currentTable = tableWithDatabase.length > 1 ? tableWithDatabase[1] : table;
+        String configProp = Constants.INGESTION_PREFIX + database + Constants.PATH_CUR_DIR + currentTable + Constants.INGESTION_CONFIG_SUFFIX;
+        TableExecutionContext executionContext = populateTableExecutionContext(properties, configFolder, fs, config, configProp, database, currentTable, null);
+        this.tableExecutionContexts.add(executionContext);
       }
-      populateSchemaProviderProps(cfg, tableProperties);
-      executionContext = new TableExecutionContext();
-      executionContext.setProperties(tableProperties);
-      executionContext.setConfig(cfg);
-      executionContext.setDatabase(database);
-      executionContext.setTableName(currentTable);
-      this.tableExecutionContexts.add(executionContext);
     }
   }
 
+  private TableExecutionContext populateTableExecutionContext(TypedProperties properties, String configFolder,
+      FileSystem fs, Config config, String configProp, String database, String currentTable, String targetBasePath) throws IOException {
+    // copy all common properties to current table properties
+    TypedProperties currentTableProperties = getCurrentTableProperties(properties, configFolder, fs, configProp, database,
+        currentTable);
+
+    // copy all the values from config to cfg
+    final HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
+    Helpers.deepCopyConfigs(config, cfg);
+
+    // calculate the value of targetBasePath which is a property of cfg
+    calculateTargetBasePath(config, database, currentTable, targetBasePath, currentTableProperties, cfg);
+
+    if (cfg.enableHiveSync && StringUtils.isNullOrEmpty(currentTableProperties.getString(DataSourceWriteOptions.HIVE_TABLE().key()))) {
+      throw new HoodieException("Hive sync table field not provided!");
+    }
+
+    populateSchemaProviderProps(cfg, currentTableProperties);
+    TableExecutionContext executionContext = new TableExecutionContext();
+    executionContext.setProperties(currentTableProperties);
+    executionContext.setConfig(cfg);
+    executionContext.setDatabase(database);
+    executionContext.setTableName(currentTable);
+    return executionContext;
+  }
+
+  private TypedProperties getCurrentTableProperties(TypedProperties properties, String configFolder, FileSystem fs,
+      String configProp, String database, String currentTable) throws IOException {
+
+    String configFilePath = properties.getString(configProp, Helpers.getDefaultConfigFilePath(configFolder, database, currentTable));
+    checkIfTableConfigFileExists(configFolder, fs, configFilePath);
+
+    TypedProperties tableProperties = UtilHelpers.readConfig(fs.getConf(), new Path(configFilePath), new ArrayList<>()).getProps();
+    properties.forEach((k, v) -> {
+      if (tableProperties.get(k) == null) {
+        tableProperties.setProperty(k.toString(), v.toString());
+      }
+    });
+
+    return tableProperties;
+  }
+
+  private void calculateTargetBasePath(Config config, String database, String currentTable, String targetBasePath,
+      TypedProperties currentTableProperties, HoodieDeltaStreamer.Config cfg) {
+
+    String overriddenTargetBasePath = currentTableProperties.getString(Constants.TARGET_BASE_PATH_PROP, "");
+
+    if (StringUtils.isNullOrEmpty(targetBasePath)) {
+      targetBasePath = resetTarget(config, database, currentTable);
+    }
+
+    cfg.targetBasePath = StringUtils.isNullOrEmpty(overriddenTargetBasePath)
+      ? targetBasePath
+      : overriddenTargetBasePath;
+  }
+
+  private List<String> getSourcesToBeBound(TypedProperties properties) {
+    String combinedSourcesString = properties.getString(Constants.SOURCES_TO_BE_BOUND, null);
+    return StringUtils.isNullOrEmpty(combinedSourcesString)
+      ? new ArrayList<>()
+      : Arrays.asList(combinedSourcesString.split(Constants.COMMA_SEPARATOR));
+  }
+
+  private String getTableToBeIngested(TypedProperties properties) {

Review comment:
       Thank you for your advice. I have changed the corresponding content in the code.

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
##########
@@ -104,43 +108,111 @@ private void checkIfTableConfigFileExists(String configFolder, FileSystem fs, St
     }
   }
 
-  //commonProps are passed as parameter which contain table to config file mapping
+  // commonProps are passed as parameter which contain table to config file mapping
   private void populateTableExecutionContextList(TypedProperties properties, String configFolder, FileSystem fs, Config config) throws IOException {
-    List<String> tablesToBeIngested = getTablesToBeIngested(properties);
-    logger.info("tables to be ingested via MultiTableDeltaStreamer : " + tablesToBeIngested);
-    TableExecutionContext executionContext;
-    for (String table : tablesToBeIngested) {
-      String[] tableWithDatabase = table.split("\\.");
-      String database = tableWithDatabase.length > 1 ? tableWithDatabase[0] : "default";
-      String currentTable = tableWithDatabase.length > 1 ? tableWithDatabase[1] : table;
-      String configProp = Constants.INGESTION_PREFIX + database + Constants.DELIMITER + currentTable + Constants.INGESTION_CONFIG_SUFFIX;
-      String configFilePath = properties.getString(configProp, Helpers.getDefaultConfigFilePath(configFolder, database, currentTable));
-      checkIfTableConfigFileExists(configFolder, fs, configFilePath);
-      TypedProperties tableProperties = UtilHelpers.readConfig(fs.getConf(), new Path(configFilePath), new ArrayList<String>()).getProps();
-      properties.forEach((k, v) -> {
-        if (tableProperties.get(k) == null) {
-          tableProperties.setProperty(k.toString(), v.toString());
-        }
-      });
-      final HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
-      //copy all the values from config to cfg
-      String targetBasePath = resetTarget(config, database, currentTable);
-      Helpers.deepCopyConfigs(config, cfg);
-      String overriddenTargetBasePath = tableProperties.getString(Constants.TARGET_BASE_PATH_PROP, "");
-      cfg.targetBasePath = StringUtils.isNullOrEmpty(overriddenTargetBasePath) ? targetBasePath : overriddenTargetBasePath;
-      if (cfg.enableMetaSync && StringUtils.isNullOrEmpty(tableProperties.getString(DataSourceWriteOptions.HIVE_TABLE().key(), ""))) {
-        throw new HoodieException("Meta sync table field not provided!");
+    // Parameter SOURCES_TO_BE_BOUND indicates that multiple sources update the same target.
+    String sourcesToBeBound = properties.getProperty(Constants.SOURCES_TO_BE_BOUND);
+    if (!StringUtils.isNullOrEmpty(sourcesToBeBound)) {
+      // populate the table execution context by traversing source tables
+      List<String> sourcesToBeBonded = getSourcesToBeBound(properties);
+      logger.info("Source tables to be bound via MultiTableDeltaStreamer : " + sourcesToBeBonded);
+      String tableToBeIngested = getTableToBeIngested(properties);
+      String[] targetTableWithDataBase = tableToBeIngested.split("\\.");
+      String targetDataBase = targetTableWithDataBase.length > 1 ? targetTableWithDataBase[0] : "default";

Review comment:
       Thank you for your advice. I have changed the corresponding content in the code.

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
##########
@@ -104,43 +108,111 @@ private void checkIfTableConfigFileExists(String configFolder, FileSystem fs, St
     }
   }
 
-  //commonProps are passed as parameter which contain table to config file mapping
+  // commonProps are passed as parameter which contain table to config file mapping
   private void populateTableExecutionContextList(TypedProperties properties, String configFolder, FileSystem fs, Config config) throws IOException {
-    List<String> tablesToBeIngested = getTablesToBeIngested(properties);
-    logger.info("tables to be ingested via MultiTableDeltaStreamer : " + tablesToBeIngested);
-    TableExecutionContext executionContext;
-    for (String table : tablesToBeIngested) {
-      String[] tableWithDatabase = table.split("\\.");
-      String database = tableWithDatabase.length > 1 ? tableWithDatabase[0] : "default";
-      String currentTable = tableWithDatabase.length > 1 ? tableWithDatabase[1] : table;
-      String configProp = Constants.INGESTION_PREFIX + database + Constants.DELIMITER + currentTable + Constants.INGESTION_CONFIG_SUFFIX;
-      String configFilePath = properties.getString(configProp, Helpers.getDefaultConfigFilePath(configFolder, database, currentTable));
-      checkIfTableConfigFileExists(configFolder, fs, configFilePath);
-      TypedProperties tableProperties = UtilHelpers.readConfig(fs.getConf(), new Path(configFilePath), new ArrayList<String>()).getProps();
-      properties.forEach((k, v) -> {
-        if (tableProperties.get(k) == null) {
-          tableProperties.setProperty(k.toString(), v.toString());
-        }
-      });
-      final HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
-      //copy all the values from config to cfg
-      String targetBasePath = resetTarget(config, database, currentTable);
-      Helpers.deepCopyConfigs(config, cfg);
-      String overriddenTargetBasePath = tableProperties.getString(Constants.TARGET_BASE_PATH_PROP, "");
-      cfg.targetBasePath = StringUtils.isNullOrEmpty(overriddenTargetBasePath) ? targetBasePath : overriddenTargetBasePath;
-      if (cfg.enableMetaSync && StringUtils.isNullOrEmpty(tableProperties.getString(DataSourceWriteOptions.HIVE_TABLE().key(), ""))) {
-        throw new HoodieException("Meta sync table field not provided!");
+    // Parameter SOURCES_TO_BE_BOUND indicates that multiple sources update the same target.
+    String sourcesToBeBound = properties.getProperty(Constants.SOURCES_TO_BE_BOUND);
+    if (!StringUtils.isNullOrEmpty(sourcesToBeBound)) {
+      // populate the table execution context by traversing source tables
+      List<String> sourcesToBeBonded = getSourcesToBeBound(properties);

Review comment:
       Thank you for your advice. I have changed the corresponding content in the code.

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
##########
@@ -370,50 +441,124 @@ public static void main(String[] args) throws IOException {
   private static String resetTarget(Config configuration, String database, String tableName) {
     String basePathPrefix = configuration.basePathPrefix;
     basePathPrefix = basePathPrefix.charAt(basePathPrefix.length() - 1) == '/' ? basePathPrefix.substring(0, basePathPrefix.length() - 1) : basePathPrefix;
-    String targetBasePath = basePathPrefix + Constants.FILE_DELIMITER + database + Constants.FILE_DELIMITER + tableName;
-    configuration.targetTableName = database + Constants.DELIMITER + tableName;
+    String targetBasePath = basePathPrefix + Constants.PATH_SEPARATOR + database + Constants.PATH_SEPARATOR + tableName;
+    configuration.targetTableName = database + Constants.PATH_CUR_DIR + tableName;
     return targetBasePath;
   }
 
   /**
    * Creates actual HoodieDeltaStreamer objects for every table/topic and does incremental sync.
    */
   public void sync() {
+    List<HoodieDeltaStreamer> hdsObjectList = new ArrayList<>();
+
+    // The sync function is not executed when multiple sources update the same target.
     for (TableExecutionContext context : tableExecutionContexts) {
       try {
-        new HoodieDeltaStreamer(context.getConfig(), jssc, Option.ofNullable(context.getProperties())).sync();
+        HoodieDeltaStreamer hds = new HoodieDeltaStreamer(context.getConfig(), jssc, Option.ofNullable(context.getProperties()));
+
+        // Add object of HoodieDeltaStreamer temporarily to hdsObjectList when multiple sources update the same target.
+        if (!StringUtils.isNullOrEmpty(context.getProperties().getProperty(Constants.SOURCES_TO_BE_BOUND))) {
+          hdsObjectList.add(hds);
+          continue;
+        }
+
+        hds.sync();
         successTables.add(Helpers.getTableWithDatabase(context));
       } catch (Exception e) {
-        logger.error("error while running MultiTableDeltaStreamer for table: " + context.getTableName(), e);
+        logger.error("Error while running MultiTableDeltaStreamer for table: " + context.getTableName(), e);
         failedTables.add(Helpers.getTableWithDatabase(context));
       }
     }
 
-    logger.info("Ingestion was successful for topics: " + successTables);
-    if (!failedTables.isEmpty()) {
-      logger.info("Ingestion failed for topics: " + failedTables);
+    // If hdsObjectList is empty, it indicates that all source sync operations have been completed. In this case, directly return.
+    if (hdsObjectList.isEmpty()) {
+      logger.info("Ingestion was successful for topics: " + successTables);
+      if (!failedTables.isEmpty()) {
+        logger.info("Ingestion failed for topics: " + failedTables);
+      }
+      return;
     }
+
+    // The sync function is executing here when multiple sources update the same target.
+    boolean isContinuousMode = hdsObjectList.get(0).cfg.continuousMode;
+    do {
+      // Executing sync function by traversing hdsObjectList when multiple sources update the same target.
+      for (int i = 0; i < hdsObjectList.size(); i++) {
+        // Threads cannot be started when multiple sources update the same target.
+        if (isContinuousMode) {
+          hdsObjectList.get(i).cfg.continuousMode = false;
+        }
+
+        try {
+          hdsObjectList.get(i).sync();
+          successTables.add(Helpers.getTableWithDatabase(tableExecutionContexts.get(i)));
+        } catch (Exception e) {
+          logger.error("Error while running MultiTableDeltaStreamer for table: "
+              + tableExecutionContexts.get(i).getTableName(), e);
+          failedTables.add(Helpers.getTableWithDatabase(tableExecutionContexts.get(i)));
+          break;
+        }
+      }
+
+      logger.info("Ingestion was successful for topics: " + successTables);
+      if (!failedTables.isEmpty()) {
+        logger.info("Ingestion failed for topics: " + failedTables);
+        break;
+      }
+      successTables.clear();
+    } while (isContinuousMode);

Review comment:
       Thank you for your advice. I have changed the corresponding content in the code.

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
##########
@@ -370,50 +441,124 @@ public static void main(String[] args) throws IOException {
   private static String resetTarget(Config configuration, String database, String tableName) {
     String basePathPrefix = configuration.basePathPrefix;
     basePathPrefix = basePathPrefix.charAt(basePathPrefix.length() - 1) == '/' ? basePathPrefix.substring(0, basePathPrefix.length() - 1) : basePathPrefix;
-    String targetBasePath = basePathPrefix + Constants.FILE_DELIMITER + database + Constants.FILE_DELIMITER + tableName;
-    configuration.targetTableName = database + Constants.DELIMITER + tableName;
+    String targetBasePath = basePathPrefix + Constants.PATH_SEPARATOR + database + Constants.PATH_SEPARATOR + tableName;
+    configuration.targetTableName = database + Constants.PATH_CUR_DIR + tableName;
     return targetBasePath;
   }
 
   /**
    * Creates actual HoodieDeltaStreamer objects for every table/topic and does incremental sync.
    */
   public void sync() {
+    List<HoodieDeltaStreamer> hdsObjectList = new ArrayList<>();
+
+    // The sync function is not executed when multiple sources update the same target.
     for (TableExecutionContext context : tableExecutionContexts) {
       try {
-        new HoodieDeltaStreamer(context.getConfig(), jssc, Option.ofNullable(context.getProperties())).sync();
+        HoodieDeltaStreamer hds = new HoodieDeltaStreamer(context.getConfig(), jssc, Option.ofNullable(context.getProperties()));
+
+        // Add object of HoodieDeltaStreamer temporarily to hdsObjectList when multiple sources update the same target.
+        if (!StringUtils.isNullOrEmpty(context.getProperties().getProperty(Constants.SOURCES_TO_BE_BOUND))) {
+          hdsObjectList.add(hds);
+          continue;
+        }
+
+        hds.sync();
         successTables.add(Helpers.getTableWithDatabase(context));
       } catch (Exception e) {
-        logger.error("error while running MultiTableDeltaStreamer for table: " + context.getTableName(), e);
+        logger.error("Error while running MultiTableDeltaStreamer for table: " + context.getTableName(), e);
         failedTables.add(Helpers.getTableWithDatabase(context));
       }
     }
 
-    logger.info("Ingestion was successful for topics: " + successTables);
-    if (!failedTables.isEmpty()) {
-      logger.info("Ingestion failed for topics: " + failedTables);
+    // If hdsObjectList is empty, it indicates that all source sync operations have been completed. In this case, directly return.
+    if (hdsObjectList.isEmpty()) {
+      logger.info("Ingestion was successful for topics: " + successTables);
+      if (!failedTables.isEmpty()) {
+        logger.info("Ingestion failed for topics: " + failedTables);
+      }
+      return;
     }
+
+    // The sync function is executing here when multiple sources update the same target.
+    boolean isContinuousMode = hdsObjectList.get(0).cfg.continuousMode;
+    do {
+      // Executing sync function by traversing hdsObjectList when multiple sources update the same target.
+      for (int i = 0; i < hdsObjectList.size(); i++) {
+        // Threads cannot be started when multiple sources update the same target.
+        if (isContinuousMode) {
+          hdsObjectList.get(i).cfg.continuousMode = false;
+        }
+
+        try {
+          hdsObjectList.get(i).sync();
+          successTables.add(Helpers.getTableWithDatabase(tableExecutionContexts.get(i)));
+        } catch (Exception e) {
+          logger.error("Error while running MultiTableDeltaStreamer for table: "
+              + tableExecutionContexts.get(i).getTableName(), e);
+          failedTables.add(Helpers.getTableWithDatabase(tableExecutionContexts.get(i)));
+          break;
+        }
+      }
+
+      logger.info("Ingestion was successful for topics: " + successTables);
+      if (!failedTables.isEmpty()) {
+        logger.info("Ingestion failed for topics: " + failedTables);
+        break;
+      }
+      successTables.clear();
+    } while (isContinuousMode);
   }
 
   public static class Constants {
+    // When there are multiple sources, you can use this configuration item to set an independent checkpoint for the source.
+    public static final String SOURCE_CHECKPOINT = "hoodie.deltastreamer.current.source.checkpoint";
+
+    // If there are multiple sources, you can use this configuration item to set an alias for the source to distinguish the source.
+    // In addition, the alias is used as a suffix to distinguish the CHECKPOINT_KEY and CHECKPOINT_RESET_KEY of each source.
+    public static final String SOURCE_NAME = "hoodie.deltastreamer.current.source.name";
+
     public static final String KAFKA_TOPIC_PROP = "hoodie.deltastreamer.source.kafka.topic";
+
     private static final String SOURCE_SCHEMA_REGISTRY_URL_PROP = "hoodie.deltastreamer.schemaprovider.registry.url";
+
     private static final String TARGET_SCHEMA_REGISTRY_URL_PROP = "hoodie.deltastreamer.schemaprovider.registry.targetUrl";
+
     public static final String HIVE_SYNC_TABLE_PROP = "hoodie.datasource.hive_sync.table";
+
     private static final String SCHEMA_REGISTRY_BASE_URL_PROP = "hoodie.deltastreamer.schemaprovider.registry.baseUrl";
+

Review comment:
       Thank you for your advice. I have changed the corresponding content in the code.

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
##########
@@ -370,50 +441,124 @@ public static void main(String[] args) throws IOException {
   private static String resetTarget(Config configuration, String database, String tableName) {
     String basePathPrefix = configuration.basePathPrefix;
     basePathPrefix = basePathPrefix.charAt(basePathPrefix.length() - 1) == '/' ? basePathPrefix.substring(0, basePathPrefix.length() - 1) : basePathPrefix;
-    String targetBasePath = basePathPrefix + Constants.FILE_DELIMITER + database + Constants.FILE_DELIMITER + tableName;
-    configuration.targetTableName = database + Constants.DELIMITER + tableName;
+    String targetBasePath = basePathPrefix + Constants.PATH_SEPARATOR + database + Constants.PATH_SEPARATOR + tableName;
+    configuration.targetTableName = database + Constants.PATH_CUR_DIR + tableName;
     return targetBasePath;
   }
 
   /**
    * Creates actual HoodieDeltaStreamer objects for every table/topic and does incremental sync.
    */
   public void sync() {
+    List<HoodieDeltaStreamer> hdsObjectList = new ArrayList<>();
+
+    // The sync function is not executed when multiple sources update the same target.
     for (TableExecutionContext context : tableExecutionContexts) {
       try {
-        new HoodieDeltaStreamer(context.getConfig(), jssc, Option.ofNullable(context.getProperties())).sync();
+        HoodieDeltaStreamer hds = new HoodieDeltaStreamer(context.getConfig(), jssc, Option.ofNullable(context.getProperties()));
+
+        // Add object of HoodieDeltaStreamer temporarily to hdsObjectList when multiple sources update the same target.
+        if (!StringUtils.isNullOrEmpty(context.getProperties().getProperty(Constants.SOURCES_TO_BE_BOUND))) {
+          hdsObjectList.add(hds);
+          continue;
+        }
+
+        hds.sync();
         successTables.add(Helpers.getTableWithDatabase(context));
       } catch (Exception e) {
-        logger.error("error while running MultiTableDeltaStreamer for table: " + context.getTableName(), e);
+        logger.error("Error while running MultiTableDeltaStreamer for table: " + context.getTableName(), e);
         failedTables.add(Helpers.getTableWithDatabase(context));
       }
     }
 
-    logger.info("Ingestion was successful for topics: " + successTables);
-    if (!failedTables.isEmpty()) {
-      logger.info("Ingestion failed for topics: " + failedTables);
+    // If hdsObjectList is empty, it indicates that all source sync operations have been completed. In this case, directly return.
+    if (hdsObjectList.isEmpty()) {
+      logger.info("Ingestion was successful for topics: " + successTables);
+      if (!failedTables.isEmpty()) {
+        logger.info("Ingestion failed for topics: " + failedTables);
+      }
+      return;
     }
+
+    // The sync function is executing here when multiple sources update the same target.
+    boolean isContinuousMode = hdsObjectList.get(0).cfg.continuousMode;
+    do {
+      // Executing sync function by traversing hdsObjectList when multiple sources update the same target.
+      for (int i = 0; i < hdsObjectList.size(); i++) {
+        // Threads cannot be started when multiple sources update the same target.
+        if (isContinuousMode) {
+          hdsObjectList.get(i).cfg.continuousMode = false;
+        }
+
+        try {
+          hdsObjectList.get(i).sync();
+          successTables.add(Helpers.getTableWithDatabase(tableExecutionContexts.get(i)));
+        } catch (Exception e) {
+          logger.error("Error while running MultiTableDeltaStreamer for table: "
+              + tableExecutionContexts.get(i).getTableName(), e);
+          failedTables.add(Helpers.getTableWithDatabase(tableExecutionContexts.get(i)));
+          break;
+        }
+      }
+
+      logger.info("Ingestion was successful for topics: " + successTables);
+      if (!failedTables.isEmpty()) {
+        logger.info("Ingestion failed for topics: " + failedTables);
+        break;
+      }
+      successTables.clear();
+    } while (isContinuousMode);
   }
 
   public static class Constants {
+    // When there are multiple sources, you can use this configuration item to set an independent checkpoint for the source.
+    public static final String SOURCE_CHECKPOINT = "hoodie.deltastreamer.current.source.checkpoint";
+
+    // If there are multiple sources, you can use this configuration item to set an alias for the source to distinguish the source.
+    // In addition, the alias is used as a suffix to distinguish the CHECKPOINT_KEY and CHECKPOINT_RESET_KEY of each source.
+    public static final String SOURCE_NAME = "hoodie.deltastreamer.current.source.name";
+
     public static final String KAFKA_TOPIC_PROP = "hoodie.deltastreamer.source.kafka.topic";
+
     private static final String SOURCE_SCHEMA_REGISTRY_URL_PROP = "hoodie.deltastreamer.schemaprovider.registry.url";
+
     private static final String TARGET_SCHEMA_REGISTRY_URL_PROP = "hoodie.deltastreamer.schemaprovider.registry.targetUrl";
+
     public static final String HIVE_SYNC_TABLE_PROP = "hoodie.datasource.hive_sync.table";
+
     private static final String SCHEMA_REGISTRY_BASE_URL_PROP = "hoodie.deltastreamer.schemaprovider.registry.baseUrl";
+
     private static final String SCHEMA_REGISTRY_URL_SUFFIX_PROP = "hoodie.deltastreamer.schemaprovider.registry.urlSuffix";
+
     private static final String SCHEMA_REGISTRY_SOURCE_URL_SUFFIX = "hoodie.deltastreamer.schemaprovider.registry.sourceUrlSuffix";
+
     private static final String SCHEMA_REGISTRY_TARGET_URL_SUFFIX = "hoodie.deltastreamer.schemaprovider.registry.targetUrlSuffix";
+
     private static final String TABLES_TO_BE_INGESTED_PROP = "hoodie.deltastreamer.ingestion.tablesToBeIngested";
+
+    // This configuration item specifies the database name and table name of the source. The format is "database.table".
+    // It is recommended that table name be the same as the alias of the source. If there are multiple sources, separate them with commas.
+    public static final String SOURCES_TO_BE_BOUND = "hoodie.deltastreamer.source.sourcesToBeBound";

Review comment:
       Thank you for your advice. I have changed the corresponding content in the code.
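   
   For reference, a minimal sketch of how these keys could be wired together (the key names are the constants introduced in this PR; the database and table values are made-up placeholders, not taken from any real setup):
   
   ```java
   // Hypothetical setup: two source tables bound to a single target table.
   TypedProperties props = new TypedProperties();
   props.setProperty("hoodie.deltastreamer.source.sourcesToBeBound", "db1.source_a,db1.source_b");
   props.setProperty("hoodie.deltastreamer.ingestion.tablesToBeIngested", "db1.target_table");
   ```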




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] hudi-bot removed a comment on pull request #4645: [HUDI-3103] Enable MultiTableDeltaStreamer to update a single target …

Posted by GitBox <gi...@apache.org>.
hudi-bot removed a comment on pull request #4645:
URL: https://github.com/apache/hudi/pull/4645#issuecomment-1018114106


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "5332458bfb61a6e13b9b59ae3813d236f86e01da",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=5358",
       "triggerID" : "5332458bfb61a6e13b9b59ae3813d236f86e01da",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ee9f2eaa28c5836977ea980a1d50b1d65ce342ef",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=5380",
       "triggerID" : "ee9f2eaa28c5836977ea980a1d50b1d65ce342ef",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ee9f2eaa28c5836977ea980a1d50b1d65ce342ef",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "1018113861",
       "triggerType" : "MANUAL"
     } ]
   }-->
   ## CI report:
   
   * ee9f2eaa28c5836977ea980a1d50b1d65ce342ef Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=5380) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] hudi-bot commented on pull request #4645: [HUDI-3103] Enable MultiTableDeltaStreamer to update a single target …

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on pull request #4645:
URL: https://github.com/apache/hudi/pull/4645#issuecomment-1018114106


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "5332458bfb61a6e13b9b59ae3813d236f86e01da",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=5358",
       "triggerID" : "5332458bfb61a6e13b9b59ae3813d236f86e01da",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ee9f2eaa28c5836977ea980a1d50b1d65ce342ef",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=5380",
       "triggerID" : "ee9f2eaa28c5836977ea980a1d50b1d65ce342ef",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ee9f2eaa28c5836977ea980a1d50b1d65ce342ef",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "1018113861",
       "triggerType" : "MANUAL"
     } ]
   }-->
   ## CI report:
   
   * ee9f2eaa28c5836977ea980a1d50b1d65ce342ef Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=5380) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] pratyakshsharma commented on a change in pull request #4645: [HUDI-3103] Enable MultiTableDeltaStreamer to update a single target table from multiple source tables

Posted by GitBox <gi...@apache.org>.
pratyakshsharma commented on a change in pull request #4645:
URL: https://github.com/apache/hudi/pull/4645#discussion_r810615799



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
##########
@@ -370,50 +441,124 @@ public static void main(String[] args) throws IOException {
   private static String resetTarget(Config configuration, String database, String tableName) {
     String basePathPrefix = configuration.basePathPrefix;
     basePathPrefix = basePathPrefix.charAt(basePathPrefix.length() - 1) == '/' ? basePathPrefix.substring(0, basePathPrefix.length() - 1) : basePathPrefix;
-    String targetBasePath = basePathPrefix + Constants.FILE_DELIMITER + database + Constants.FILE_DELIMITER + tableName;
-    configuration.targetTableName = database + Constants.DELIMITER + tableName;
+    String targetBasePath = basePathPrefix + Constants.PATH_SEPARATOR + database + Constants.PATH_SEPARATOR + tableName;
+    configuration.targetTableName = database + Constants.PATH_CUR_DIR + tableName;
     return targetBasePath;
   }
 
   /**
    * Creates actual HoodieDeltaStreamer objects for every table/topic and does incremental sync.
    */
   public void sync() {
+    List<HoodieDeltaStreamer> hdsObjectList = new ArrayList<>();
+
+    // The sync function is not executed when multiple sources update the same target.
     for (TableExecutionContext context : tableExecutionContexts) {
       try {
-        new HoodieDeltaStreamer(context.getConfig(), jssc, Option.ofNullable(context.getProperties())).sync();
+        HoodieDeltaStreamer hds = new HoodieDeltaStreamer(context.getConfig(), jssc, Option.ofNullable(context.getProperties()));
+
+        // Add object of HoodieDeltaStreamer temporarily to hdsObjectList when multiple sources update the same target.
+        if (!StringUtils.isNullOrEmpty(context.getProperties().getProperty(Constants.SOURCES_TO_BE_BOUND))) {
+          hdsObjectList.add(hds);
+          continue;
+        }
+
+        hds.sync();
         successTables.add(Helpers.getTableWithDatabase(context));
       } catch (Exception e) {
-        logger.error("error while running MultiTableDeltaStreamer for table: " + context.getTableName(), e);
+        logger.error("Error while running MultiTableDeltaStreamer for table: " + context.getTableName(), e);
         failedTables.add(Helpers.getTableWithDatabase(context));
       }
     }
 
-    logger.info("Ingestion was successful for topics: " + successTables);
-    if (!failedTables.isEmpty()) {
-      logger.info("Ingestion failed for topics: " + failedTables);
+    // If hdsObjectList is empty, it indicates that all source sync operations have been completed. In this case, directly return.
+    if (hdsObjectList.isEmpty()) {
+      logger.info("Ingestion was successful for topics: " + successTables);
+      if (!failedTables.isEmpty()) {
+        logger.info("Ingestion failed for topics: " + failedTables);
+      }
+      return;
     }
+
+    // The sync function is executing here when multiple sources update the same target.
+    boolean isContinuousMode = hdsObjectList.get(0).cfg.continuousMode;
+    do {
+      // Executing sync function by traversing hdsObjectList when multiple sources update the same target.
+      for (int i = 0; i < hdsObjectList.size(); i++) {
+        // Threads cannot be started when multiple sources update the same target.
+        if (isContinuousMode) {
+          hdsObjectList.get(i).cfg.continuousMode = false;
+        }
+
+        try {
+          hdsObjectList.get(i).sync();
+          successTables.add(Helpers.getTableWithDatabase(tableExecutionContexts.get(i)));
+        } catch (Exception e) {
+          logger.error("Error while running MultiTableDeltaStreamer for table: "
+              + tableExecutionContexts.get(i).getTableName(), e);
+          failedTables.add(Helpers.getTableWithDatabase(tableExecutionContexts.get(i)));
+          break;
+        }
+      }
+
+      logger.info("Ingestion was successful for topics: " + successTables);
+      if (!failedTables.isEmpty()) {
+        logger.info("Ingestion failed for topics: " + failedTables);
+        break;
+      }
+      successTables.clear();
+    } while (isContinuousMode);
   }
 
   public static class Constants {
+    // When there are multiple sources, you can use this configuration item to set an independent checkpoint for the source.
+    public static final String SOURCE_CHECKPOINT = "hoodie.deltastreamer.current.source.checkpoint";
+
+    // If there are multiple sources, you can use this configuration item to set an alias for the source to distinguish the source.
+    // In addition, the alias is used as a suffix to distinguish the CHECKPOINT_KEY and CHECKPOINT_RESET_KEY of each source.
+    public static final String SOURCE_NAME = "hoodie.deltastreamer.current.source.name";
+
     public static final String KAFKA_TOPIC_PROP = "hoodie.deltastreamer.source.kafka.topic";
+
     private static final String SOURCE_SCHEMA_REGISTRY_URL_PROP = "hoodie.deltastreamer.schemaprovider.registry.url";
+
     private static final String TARGET_SCHEMA_REGISTRY_URL_PROP = "hoodie.deltastreamer.schemaprovider.registry.targetUrl";
+
     public static final String HIVE_SYNC_TABLE_PROP = "hoodie.datasource.hive_sync.table";
+
     private static final String SCHEMA_REGISTRY_BASE_URL_PROP = "hoodie.deltastreamer.schemaprovider.registry.baseUrl";
+

Review comment:
       Can we remove these extra empty lines? As far as I know, our checkstyle does not recommend them.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] pratyakshsharma commented on a change in pull request #4645: [HUDI-3103] Enable MultiTableDeltaStreamer to update a single target table from multiple source tables

Posted by GitBox <gi...@apache.org>.
pratyakshsharma commented on a change in pull request #4645:
URL: https://github.com/apache/hudi/pull/4645#discussion_r810612445



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
##########
@@ -104,43 +108,111 @@ private void checkIfTableConfigFileExists(String configFolder, FileSystem fs, St
     }
   }
 
-  //commonProps are passed as parameter which contain table to config file mapping
+  // commonProps are passed as parameter which contain table to config file mapping
   private void populateTableExecutionContextList(TypedProperties properties, String configFolder, FileSystem fs, Config config) throws IOException {
-    List<String> tablesToBeIngested = getTablesToBeIngested(properties);
-    logger.info("tables to be ingested via MultiTableDeltaStreamer : " + tablesToBeIngested);
-    TableExecutionContext executionContext;
-    for (String table : tablesToBeIngested) {
-      String[] tableWithDatabase = table.split("\\.");
-      String database = tableWithDatabase.length > 1 ? tableWithDatabase[0] : "default";
-      String currentTable = tableWithDatabase.length > 1 ? tableWithDatabase[1] : table;
-      String configProp = Constants.INGESTION_PREFIX + database + Constants.DELIMITER + currentTable + Constants.INGESTION_CONFIG_SUFFIX;
-      String configFilePath = properties.getString(configProp, Helpers.getDefaultConfigFilePath(configFolder, database, currentTable));
-      checkIfTableConfigFileExists(configFolder, fs, configFilePath);
-      TypedProperties tableProperties = UtilHelpers.readConfig(fs.getConf(), new Path(configFilePath), new ArrayList<String>()).getProps();
-      properties.forEach((k, v) -> {
-        if (tableProperties.get(k) == null) {
-          tableProperties.setProperty(k.toString(), v.toString());
-        }
-      });
-      final HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
-      //copy all the values from config to cfg
-      String targetBasePath = resetTarget(config, database, currentTable);
-      Helpers.deepCopyConfigs(config, cfg);
-      String overriddenTargetBasePath = tableProperties.getString(Constants.TARGET_BASE_PATH_PROP, "");
-      cfg.targetBasePath = StringUtils.isNullOrEmpty(overriddenTargetBasePath) ? targetBasePath : overriddenTargetBasePath;
-      if (cfg.enableMetaSync && StringUtils.isNullOrEmpty(tableProperties.getString(DataSourceWriteOptions.HIVE_TABLE().key(), ""))) {
-        throw new HoodieException("Meta sync table field not provided!");
+    // Parameter SOURCES_TO_BE_BOUND indicates that multiple sources update the same target.
+    String sourcesToBeBound = properties.getProperty(Constants.SOURCES_TO_BE_BOUND);
+    if (!StringUtils.isNullOrEmpty(sourcesToBeBound)) {
+      // populate the table execution context by traversing source tables
+      List<String> sourcesToBeBonded = getSourcesToBeBound(properties);
+      logger.info("Source tables to be bound via MultiTableDeltaStreamer : " + sourcesToBeBonded);
+      String tableToBeIngested = getTableToBeIngested(properties);
+      String[] targetTableWithDataBase = tableToBeIngested.split("\\.");
+      String targetDataBase = targetTableWithDataBase.length > 1 ? targetTableWithDataBase[0] : "default";

Review comment:
       Maybe we can extract this line into a separate helper function; it is duplicated at L140 in your code.
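   
   A minimal sketch of what such a helper might look like (the method name and the two-element return array are just placeholders for illustration, not the actual implementation):
   
   ```java
   // Hypothetical helper: splits "database.table" into database and table,
   // falling back to the "default" database when no prefix is present.
   private static String[] parseDatabaseAndTable(String tableWithDatabase) {
     String[] parts = tableWithDatabase.split("\\.");
     String database = parts.length > 1 ? parts[0] : "default";
     String table = parts.length > 1 ? parts[1] : parts[0];
     return new String[] {database, table};
   }
   ```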




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] pratyakshsharma commented on a change in pull request #4645: [HUDI-3103] Enable MultiTableDeltaStreamer to update a single target table from multiple source tables

Posted by GitBox <gi...@apache.org>.
pratyakshsharma commented on a change in pull request #4645:
URL: https://github.com/apache/hudi/pull/4645#discussion_r810617803



##########
File path: hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieMultiTableDeltaStreamer.java
##########
@@ -177,6 +182,73 @@ public void testMultiTableExecutionWithKafkaSource() throws IOException {
     testNum++;
   }
 
+  @Test //0 corresponds to fg

Review comment:
       what is fg?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] hudi-bot commented on pull request #4645: [HUDI-3103] Enable MultiTableDeltaStreamer to update a single target table from multiple source tables

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on pull request #4645:
URL: https://github.com/apache/hudi/pull/4645#issuecomment-1054123412


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "5332458bfb61a6e13b9b59ae3813d236f86e01da",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=5358",
       "triggerID" : "5332458bfb61a6e13b9b59ae3813d236f86e01da",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ee9f2eaa28c5836977ea980a1d50b1d65ce342ef",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=5380",
       "triggerID" : "ee9f2eaa28c5836977ea980a1d50b1d65ce342ef",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ee9f2eaa28c5836977ea980a1d50b1d65ce342ef",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=5380",
       "triggerID" : "1018113861",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "d98dd23ee2077ff5c70d7c1f57a89057319850fd",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=6381",
       "triggerID" : "d98dd23ee2077ff5c70d7c1f57a89057319850fd",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * d98dd23ee2077ff5c70d7c1f57a89057319850fd Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=6381) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] pratyakshsharma closed pull request #4645: [HUDI-3103] Enable MultiTableDeltaStreamer to update a single target table from multiple source tables

Posted by GitBox <gi...@apache.org>.
pratyakshsharma closed pull request #4645:
URL: https://github.com/apache/hudi/pull/4645


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] pratyakshsharma commented on a change in pull request #4645: [HUDI-3103] Enable MultiTableDeltaStreamer to update a single target table from multiple source tables

Posted by GitBox <gi...@apache.org>.
pratyakshsharma commented on a change in pull request #4645:
URL: https://github.com/apache/hudi/pull/4645#discussion_r810612494



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
##########
@@ -104,43 +108,111 @@ private void checkIfTableConfigFileExists(String configFolder, FileSystem fs, St
     }
   }
 
-  //commonProps are passed as parameter which contain table to config file mapping
+  // commonProps are passed as parameter which contain table to config file mapping
   private void populateTableExecutionContextList(TypedProperties properties, String configFolder, FileSystem fs, Config config) throws IOException {
-    List<String> tablesToBeIngested = getTablesToBeIngested(properties);
-    logger.info("tables to be ingested via MultiTableDeltaStreamer : " + tablesToBeIngested);
-    TableExecutionContext executionContext;
-    for (String table : tablesToBeIngested) {
-      String[] tableWithDatabase = table.split("\\.");
-      String database = tableWithDatabase.length > 1 ? tableWithDatabase[0] : "default";
-      String currentTable = tableWithDatabase.length > 1 ? tableWithDatabase[1] : table;
-      String configProp = Constants.INGESTION_PREFIX + database + Constants.DELIMITER + currentTable + Constants.INGESTION_CONFIG_SUFFIX;
-      String configFilePath = properties.getString(configProp, Helpers.getDefaultConfigFilePath(configFolder, database, currentTable));
-      checkIfTableConfigFileExists(configFolder, fs, configFilePath);
-      TypedProperties tableProperties = UtilHelpers.readConfig(fs.getConf(), new Path(configFilePath), new ArrayList<String>()).getProps();
-      properties.forEach((k, v) -> {
-        if (tableProperties.get(k) == null) {
-          tableProperties.setProperty(k.toString(), v.toString());
-        }
-      });
-      final HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
-      //copy all the values from config to cfg
-      String targetBasePath = resetTarget(config, database, currentTable);
-      Helpers.deepCopyConfigs(config, cfg);
-      String overriddenTargetBasePath = tableProperties.getString(Constants.TARGET_BASE_PATH_PROP, "");
-      cfg.targetBasePath = StringUtils.isNullOrEmpty(overriddenTargetBasePath) ? targetBasePath : overriddenTargetBasePath;
-      if (cfg.enableMetaSync && StringUtils.isNullOrEmpty(tableProperties.getString(DataSourceWriteOptions.HIVE_TABLE().key(), ""))) {
-        throw new HoodieException("Meta sync table field not provided!");
+    // Parameter SOURCES_TO_BE_BOUND indicates that multiple sources update the same target.
+    String sourcesToBeBound = properties.getProperty(Constants.SOURCES_TO_BE_BOUND);
+    if (!StringUtils.isNullOrEmpty(sourcesToBeBound)) {
+      // populate the table execution context by traversing source tables
+      List<String> sourcesToBeBonded = getSourcesToBeBound(properties);
+      logger.info("Source tables to be bound via MultiTableDeltaStreamer : " + sourcesToBeBonded);
+      String tableToBeIngested = getTableToBeIngested(properties);
+      String[] targetTableWithDataBase = tableToBeIngested.split("\\.");
+      String targetDataBase = targetTableWithDataBase.length > 1 ? targetTableWithDataBase[0] : "default";
+      String targetTable = targetTableWithDataBase.length > 1 ? targetTableWithDataBase[1] : targetTableWithDataBase[0];

Review comment:
       ditto, duplicate at L141 and L128




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] pratyakshsharma commented on a change in pull request #4645: [HUDI-3103] Enable MultiTableDeltaStreamer to update a single target table from multiple source tables

Posted by GitBox <gi...@apache.org>.
pratyakshsharma commented on a change in pull request #4645:
URL: https://github.com/apache/hudi/pull/4645#discussion_r810615551



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
##########
@@ -370,50 +441,124 @@ public static void main(String[] args) throws IOException {
   private static String resetTarget(Config configuration, String database, String tableName) {
     String basePathPrefix = configuration.basePathPrefix;
     basePathPrefix = basePathPrefix.charAt(basePathPrefix.length() - 1) == '/' ? basePathPrefix.substring(0, basePathPrefix.length() - 1) : basePathPrefix;
-    String targetBasePath = basePathPrefix + Constants.FILE_DELIMITER + database + Constants.FILE_DELIMITER + tableName;
-    configuration.targetTableName = database + Constants.DELIMITER + tableName;
+    String targetBasePath = basePathPrefix + Constants.PATH_SEPARATOR + database + Constants.PATH_SEPARATOR + tableName;
+    configuration.targetTableName = database + Constants.PATH_CUR_DIR + tableName;
     return targetBasePath;
   }
 
   /**
    * Creates actual HoodieDeltaStreamer objects for every table/topic and does incremental sync.
    */
   public void sync() {
+    List<HoodieDeltaStreamer> hdsObjectList = new ArrayList<>();
+
+    // The sync function is not executed when multiple sources update the same target.
     for (TableExecutionContext context : tableExecutionContexts) {
       try {
-        new HoodieDeltaStreamer(context.getConfig(), jssc, Option.ofNullable(context.getProperties())).sync();
+        HoodieDeltaStreamer hds = new HoodieDeltaStreamer(context.getConfig(), jssc, Option.ofNullable(context.getProperties()));
+
+        // Add object of HoodieDeltaStreamer temporarily to hdsObjectList when multiple sources update the same target.
+        if (!StringUtils.isNullOrEmpty(context.getProperties().getProperty(Constants.SOURCES_TO_BE_BOUND))) {
+          hdsObjectList.add(hds);
+          continue;
+        }
+
+        hds.sync();
         successTables.add(Helpers.getTableWithDatabase(context));
       } catch (Exception e) {
-        logger.error("error while running MultiTableDeltaStreamer for table: " + context.getTableName(), e);
+        logger.error("Error while running MultiTableDeltaStreamer for table: " + context.getTableName(), e);
         failedTables.add(Helpers.getTableWithDatabase(context));
       }
     }
 
-    logger.info("Ingestion was successful for topics: " + successTables);
-    if (!failedTables.isEmpty()) {
-      logger.info("Ingestion failed for topics: " + failedTables);
+    // If hdsObjectList is empty, it indicates that all source sync operations have been completed. In this case, directly return.
+    if (hdsObjectList.isEmpty()) {
+      logger.info("Ingestion was successful for topics: " + successTables);
+      if (!failedTables.isEmpty()) {
+        logger.info("Ingestion failed for topics: " + failedTables);
+      }
+      return;
     }
+
+    // The sync function is executing here when multiple sources update the same target.
+    boolean isContinuousMode = hdsObjectList.get(0).cfg.continuousMode;
+    do {
+      // Executing sync function by traversing hdsObjectList when multiple sources update the same target.
+      for (int i = 0; i < hdsObjectList.size(); i++) {
+        // Threads cannot be started when multiple sources update the same target.
+        if (isContinuousMode) {
+          hdsObjectList.get(i).cfg.continuousMode = false;
+        }
+
+        try {
+          hdsObjectList.get(i).sync();
+          successTables.add(Helpers.getTableWithDatabase(tableExecutionContexts.get(i)));
+        } catch (Exception e) {
+          logger.error("Error while running MultiTableDeltaStreamer for table: "
+              + tableExecutionContexts.get(i).getTableName(), e);
+          failedTables.add(Helpers.getTableWithDatabase(tableExecutionContexts.get(i)));
+          break;
+        }
+      }
+
+      logger.info("Ingestion was successful for topics: " + successTables);
+      if (!failedTables.isEmpty()) {
+        logger.info("Ingestion failed for topics: " + failedTables);
+        break;
+      }
+      successTables.clear();
+    } while (isContinuousMode);

Review comment:
       From what I understand, you want to run the ingestion here for tables that are bound to multiple source tables. I guess that can be done with a simple for loop, which would make the code simpler and eliminate the need for the `isContinuousMode` variable.
   
   Since MultiTableDeltaStreamer does serial execution, we can simply set continuousMode to false for every table. Let me know what you think.
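   
   For illustration, a rough sketch of what I mean, reusing the fields already present in this class (not meant as the final implementation):
   
   ```java
   // Sketch: sync each bound source once per pass, serially, with continuous mode disabled.
   for (int i = 0; i < hdsObjectList.size(); i++) {
     HoodieDeltaStreamer hds = hdsObjectList.get(i);
     hds.cfg.continuousMode = false; // MultiTableDeltaStreamer already executes serially
     try {
       hds.sync();
       successTables.add(Helpers.getTableWithDatabase(tableExecutionContexts.get(i)));
     } catch (Exception e) {
       logger.error("Error while running MultiTableDeltaStreamer for table: "
           + tableExecutionContexts.get(i).getTableName(), e);
       failedTables.add(Helpers.getTableWithDatabase(tableExecutionContexts.get(i)));
     }
   }
   ```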




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] hudi-bot removed a comment on pull request #4645: [HUDI-3103] Enable MultiTableDeltaStreamer to update a single target …

Posted by GitBox <gi...@apache.org>.
hudi-bot removed a comment on pull request #4645:
URL: https://github.com/apache/hudi/pull/4645#issuecomment-1017098255


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "5332458bfb61a6e13b9b59ae3813d236f86e01da",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "5332458bfb61a6e13b9b59ae3813d236f86e01da",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 5332458bfb61a6e13b9b59ae3813d236f86e01da UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] hudi-bot removed a comment on pull request #4645: [HUDI-3103] Enable MultiTableDeltaStreamer to update a single target …

Posted by GitBox <gi...@apache.org>.
hudi-bot removed a comment on pull request #4645:
URL: https://github.com/apache/hudi/pull/4645#issuecomment-1017099833


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "5332458bfb61a6e13b9b59ae3813d236f86e01da",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=5358",
       "triggerID" : "5332458bfb61a6e13b9b59ae3813d236f86e01da",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 5332458bfb61a6e13b9b59ae3813d236f86e01da Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=5358) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] watermelon12138 commented on a change in pull request #4645: [HUDI-3103] Enable MultiTableDeltaStreamer to update a single target table from multiple source tables

Posted by GitBox <gi...@apache.org>.
watermelon12138 commented on a change in pull request #4645:
URL: https://github.com/apache/hudi/pull/4645#discussion_r815721725



##########
File path: hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieMultiTableDeltaStreamer.java
##########
@@ -177,6 +182,73 @@ public void testMultiTableExecutionWithKafkaSource() throws IOException {
     testNum++;
   }
 
+  @Test //0 corresponds to fg

Review comment:
       I have deleted it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] pratyakshsharma commented on pull request #4645: [HUDI-3103] Enable MultiTableDeltaStreamer to update a single target table from multiple source tables

Posted by GitBox <gi...@apache.org>.
pratyakshsharma commented on pull request #4645:
URL: https://github.com/apache/hudi/pull/4645#issuecomment-1054571187


   Closing in favour of https://github.com/apache/hudi/pull/4925


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] hudi-bot removed a comment on pull request #4645: [HUDI-3103] Enable MultiTableDeltaStreamer to update a single target …

Posted by GitBox <gi...@apache.org>.
hudi-bot removed a comment on pull request #4645:
URL: https://github.com/apache/hudi/pull/4645#issuecomment-1017556536


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "5332458bfb61a6e13b9b59ae3813d236f86e01da",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=5358",
       "triggerID" : "5332458bfb61a6e13b9b59ae3813d236f86e01da",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ee9f2eaa28c5836977ea980a1d50b1d65ce342ef",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=5380",
       "triggerID" : "ee9f2eaa28c5836977ea980a1d50b1d65ce342ef",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * ee9f2eaa28c5836977ea980a1d50b1d65ce342ef Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=5380) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] watermelon12138 commented on pull request #4645: [HUDI-3103] Enable MultiTableDeltaStreamer to update a single target …

Posted by GitBox <gi...@apache.org>.
watermelon12138 commented on pull request #4645:
URL: https://github.com/apache/hudi/pull/4645#issuecomment-1018113861


   @hudi-bot run azure re-run the last Azure build


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] hudi-bot commented on pull request #4645: [HUDI-3103] Enable MultiTableDeltaStreamer to update a single target …

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on pull request #4645:
URL: https://github.com/apache/hudi/pull/4645#issuecomment-1017484982


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "5332458bfb61a6e13b9b59ae3813d236f86e01da",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=5358",
       "triggerID" : "5332458bfb61a6e13b9b59ae3813d236f86e01da",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ee9f2eaa28c5836977ea980a1d50b1d65ce342ef",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "ee9f2eaa28c5836977ea980a1d50b1d65ce342ef",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 5332458bfb61a6e13b9b59ae3813d236f86e01da Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=5358) 
   * ee9f2eaa28c5836977ea980a1d50b1d65ce342ef UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] pratyakshsharma commented on a change in pull request #4645: [HUDI-3103] Enable MultiTableDeltaStreamer to update a single target table from multiple source tables

Posted by GitBox <gi...@apache.org>.
pratyakshsharma commented on a change in pull request #4645:
URL: https://github.com/apache/hudi/pull/4645#discussion_r810612662



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
##########
@@ -104,43 +108,111 @@ private void checkIfTableConfigFileExists(String configFolder, FileSystem fs, St
     }
   }
 
-  //commonProps are passed as parameter which contain table to config file mapping
+  // commonProps are passed as parameter which contain table to config file mapping
   private void populateTableExecutionContextList(TypedProperties properties, String configFolder, FileSystem fs, Config config) throws IOException {
-    List<String> tablesToBeIngested = getTablesToBeIngested(properties);
-    logger.info("tables to be ingested via MultiTableDeltaStreamer : " + tablesToBeIngested);
-    TableExecutionContext executionContext;
-    for (String table : tablesToBeIngested) {
-      String[] tableWithDatabase = table.split("\\.");
-      String database = tableWithDatabase.length > 1 ? tableWithDatabase[0] : "default";
-      String currentTable = tableWithDatabase.length > 1 ? tableWithDatabase[1] : table;
-      String configProp = Constants.INGESTION_PREFIX + database + Constants.DELIMITER + currentTable + Constants.INGESTION_CONFIG_SUFFIX;
-      String configFilePath = properties.getString(configProp, Helpers.getDefaultConfigFilePath(configFolder, database, currentTable));
-      checkIfTableConfigFileExists(configFolder, fs, configFilePath);
-      TypedProperties tableProperties = UtilHelpers.readConfig(fs.getConf(), new Path(configFilePath), new ArrayList<String>()).getProps();
-      properties.forEach((k, v) -> {
-        if (tableProperties.get(k) == null) {
-          tableProperties.setProperty(k.toString(), v.toString());
-        }
-      });
-      final HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
-      //copy all the values from config to cfg
-      String targetBasePath = resetTarget(config, database, currentTable);
-      Helpers.deepCopyConfigs(config, cfg);
-      String overriddenTargetBasePath = tableProperties.getString(Constants.TARGET_BASE_PATH_PROP, "");
-      cfg.targetBasePath = StringUtils.isNullOrEmpty(overriddenTargetBasePath) ? targetBasePath : overriddenTargetBasePath;
-      if (cfg.enableMetaSync && StringUtils.isNullOrEmpty(tableProperties.getString(DataSourceWriteOptions.HIVE_TABLE().key(), ""))) {
-        throw new HoodieException("Meta sync table field not provided!");
+    // Parameter SOURCES_TO_BE_BOUND indicates that multiple sources update the same target.
+    String sourcesToBeBound = properties.getProperty(Constants.SOURCES_TO_BE_BOUND);
+    if (!StringUtils.isNullOrEmpty(sourcesToBeBound)) {
+      // populate the table execution context by traversing source tables
+      List<String> sourcesToBeBonded = getSourcesToBeBound(properties);

Review comment:
       I was wondering if we could change this variable name to something more descriptive, like `sourcesToFetchFrom`. However, I will leave the final decision to you.
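
       To make the suggestion concrete, here is a rough, self-contained Java sketch (not the PR's code; the property key, class name and sample values are made up for illustration) of parsing the comma-separated source list into database/table pairs, using the renamed variable:

       import java.util.Arrays;
       import java.util.List;
       import java.util.Properties;

       public class SourcesToFetchFromSketch {
         // Hypothetical key; the PR resolves the real one through Constants.SOURCES_TO_BE_BOUND.
         static final String SOURCES_KEY = "hoodie.deltastreamer.sources.to.be.bound";

         public static void main(String[] args) {
           Properties props = new Properties();
           props.setProperty(SOURCES_KEY, "db1.source_a,db1.source_b,db2.source_c");

           // Suggested rename: sourcesToFetchFrom instead of sourcesToBeBonded.
           List<String> sourcesToFetchFrom = Arrays.asList(props.getProperty(SOURCES_KEY, "").split(","));

           for (String source : sourcesToFetchFrom) {
             String[] parts = source.split("\\.");
             String database = parts.length > 1 ? parts[0] : "default";
             String table = parts.length > 1 ? parts[1] : source;
             System.out.println("Would build one execution context for " + database + "." + table);
           }
         }
       }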







[GitHub] [hudi] hudi-bot removed a comment on pull request #4645: [HUDI-3103] Enable MultiTableDeltaStreamer to update a single target table from multiple source tables

Posted by GitBox <gi...@apache.org>.
hudi-bot removed a comment on pull request #4645:
URL: https://github.com/apache/hudi/pull/4645#issuecomment-1054051103


   ## CI report:
   
   * ee9f2eaa28c5836977ea980a1d50b1d65ce342ef Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=5380) 
   * d98dd23ee2077ff5c70d7c1f57a89057319850fd UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>





[GitHub] [hudi] watermelon12138 commented on a change in pull request #4645: [HUDI-3103] Enable MultiTableDeltaStreamer to update a single target table from multiple source tables

Posted by GitBox <gi...@apache.org>.
watermelon12138 commented on a change in pull request #4645:
URL: https://github.com/apache/hudi/pull/4645#discussion_r815719379



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
##########
@@ -370,50 +441,124 @@ public static void main(String[] args) throws IOException {
   private static String resetTarget(Config configuration, String database, String tableName) {
     String basePathPrefix = configuration.basePathPrefix;
     basePathPrefix = basePathPrefix.charAt(basePathPrefix.length() - 1) == '/' ? basePathPrefix.substring(0, basePathPrefix.length() - 1) : basePathPrefix;
-    String targetBasePath = basePathPrefix + Constants.FILE_DELIMITER + database + Constants.FILE_DELIMITER + tableName;
-    configuration.targetTableName = database + Constants.DELIMITER + tableName;
+    String targetBasePath = basePathPrefix + Constants.PATH_SEPARATOR + database + Constants.PATH_SEPARATOR + tableName;
+    configuration.targetTableName = database + Constants.PATH_CUR_DIR + tableName;
     return targetBasePath;
   }
 
   /**
    * Creates actual HoodieDeltaStreamer objects for every table/topic and does incremental sync.
    */
   public void sync() {
+    List<HoodieDeltaStreamer> hdsObjectList = new ArrayList<>();
+
+    // The sync function is not executed when multiple sources update the same target.
     for (TableExecutionContext context : tableExecutionContexts) {
       try {
-        new HoodieDeltaStreamer(context.getConfig(), jssc, Option.ofNullable(context.getProperties())).sync();
+        HoodieDeltaStreamer hds = new HoodieDeltaStreamer(context.getConfig(), jssc, Option.ofNullable(context.getProperties()));
+
+        // Add object of HoodieDeltaStreamer temporarily to hdsObjectList when multiple sources update the same target.
+        if (!StringUtils.isNullOrEmpty(context.getProperties().getProperty(Constants.SOURCES_TO_BE_BOUND))) {
+          hdsObjectList.add(hds);
+          continue;
+        }
+
+        hds.sync();
         successTables.add(Helpers.getTableWithDatabase(context));
       } catch (Exception e) {
-        logger.error("error while running MultiTableDeltaStreamer for table: " + context.getTableName(), e);
+        logger.error("Error while running MultiTableDeltaStreamer for table: " + context.getTableName(), e);
         failedTables.add(Helpers.getTableWithDatabase(context));
       }
     }
 
-    logger.info("Ingestion was successful for topics: " + successTables);
-    if (!failedTables.isEmpty()) {
-      logger.info("Ingestion failed for topics: " + failedTables);
+    // If hdsObjectList is empty, it indicates that all source sync operations have been completed. In this case, directly return.
+    if (hdsObjectList.isEmpty()) {
+      logger.info("Ingestion was successful for topics: " + successTables);
+      if (!failedTables.isEmpty()) {
+        logger.info("Ingestion failed for topics: " + failedTables);
+      }
+      return;
     }
+
+    // The sync function is executing here when multiple sources update the same target.
+    boolean isContinuousMode = hdsObjectList.get(0).cfg.continuousMode;
+    do {
+      // Executing sync function by traversing hdsObjectList when multiple sources update the same target.
+      for (int i = 0; i < hdsObjectList.size(); i++) {
+        // Threads cannot be started when multiple sources update the same target.
+        if (isContinuousMode) {
+          hdsObjectList.get(i).cfg.continuousMode = false;
+        }
+
+        try {
+          hdsObjectList.get(i).sync();
+          successTables.add(Helpers.getTableWithDatabase(tableExecutionContexts.get(i)));
+        } catch (Exception e) {
+          logger.error("Error while running MultiTableDeltaStreamer for table: "
+              + tableExecutionContexts.get(i).getTableName(), e);
+          failedTables.add(Helpers.getTableWithDatabase(tableExecutionContexts.get(i)));
+          break;
+        }
+      }
+
+      logger.info("Ingestion was successful for topics: " + successTables);
+      if (!failedTables.isEmpty()) {
+        logger.info("Ingestion failed for topics: " + failedTables);
+        break;
+      }
+      successTables.clear();
+    } while (isContinuousMode);
   }
 
   public static class Constants {
+    // When there are multiple sources, you can use this configuration item to set an independent checkpoint for the source.
+    public static final String SOURCE_CHECKPOINT = "hoodie.deltastreamer.current.source.checkpoint";

Review comment:
       I can't move SOURCE_CHECKPOINT into DeltaStreamer, because it is MultiTableDeltaStreamer that parses the configuration of the source tables.
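
       To illustrate the intent, here is a minimal, self-contained Java sketch (assumed class and variable names, not the actual DeltaSync code; only the property key comes from the diff above) showing how a source-level checkpoint would take precedence over the shared one:

       import java.util.Properties;

       public class PerSourceCheckpointSketch {
         // Key taken from Constants.SOURCE_CHECKPOINT in the diff above.
         static final String SOURCE_CHECKPOINT = "hoodie.deltastreamer.current.source.checkpoint";

         public static void main(String[] args) {
           // Stand-in for one source table's config file, which HoodieMultiTableDeltaStreamer parses.
           Properties source1 = new Properties();
           source1.setProperty(SOURCE_CHECKPOINT, "20220120035857");

           // A second source that does not set its own checkpoint.
           Properties source2 = new Properties();

           for (Properties sourceProps : new Properties[]{source1, source2}) {
             String checkpoint = sourceProps.getProperty(SOURCE_CHECKPOINT, "");
             System.out.println(checkpoint.isEmpty()
                 ? "Source falls back to the shared checkpoint handling for the target table"
                 : "Source resumes from its own checkpoint: " + checkpoint);
           }
         }
       }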







[GitHub] [hudi] watermelon12138 commented on pull request #4645: [HUDI-3103] Enable MultiTableDeltaStreamer to update a single target table from multiple source tables

Posted by GitBox <gi...@apache.org>.
watermelon12138 commented on pull request #4645:
URL: https://github.com/apache/hudi/pull/4645#issuecomment-1054074075


   @nsivabalan @pratyakshsharma
   https://github.com/apache/hudi/pull/4925/files

