You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by pr...@apache.org on 2022/05/31 14:57:58 UTC

[hudi] branch master updated: [HUDI-4107] Added --sync-tool-classes config option in HoodieMultiTableDeltaStreamer (#5597)

This is an automated email from the ASF dual-hosted git repository.

pratyakshsharma pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 795a99ba73 [HUDI-4107] Added --sync-tool-classes config option in HoodieMultiTableDeltaStreamer (#5597)
795a99ba73 is described below

commit 795a99ba73183b5f7697bb3964843a43a64e74e9
Author: Kumud Kumar Srivatsava Tirupati <ku...@users.noreply.github.com>
AuthorDate: Tue May 31 20:27:50 2022 +0530

    [HUDI-4107] Added --sync-tool-classes config option in HoodieMultiTableDeltaStreamer (#5597)
    
    * added --sync-tool-classes config option in multitable delta streamer
    
    * added a test case asserting that syncClientToolClassNames is propagated to the delta streamer execution context
---
 .../org/apache/hudi/utilities/deltastreamer/DeltaSync.java     |  2 +-
 .../hudi/utilities/deltastreamer/HoodieDeltaStreamer.java      | 10 +++++++---
 .../utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java |  5 +++++
 .../functional/TestHoodieMultiTableDeltaStreamer.java          |  9 +++++++++
 4 files changed, 22 insertions(+), 4 deletions(-)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index 0ae72f94b8..736e416162 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -691,7 +691,7 @@ public class DeltaSync implements Serializable {
   }
 
   private void syncMeta(HoodieDeltaStreamerMetrics metrics) {
-    Set<String> syncClientToolClasses = new HashSet<>(Arrays.asList(cfg.syncClientToolClass.split(",")));
+    Set<String> syncClientToolClasses = new HashSet<>(Arrays.asList(cfg.syncClientToolClassNames.split(",")));
     // for backward compatibility
     if (cfg.enableHiveSync) {
       cfg.enableMetaSync = true;
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
index 7a688b50c7..a22a3581ae 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
@@ -306,7 +306,7 @@ public class HoodieDeltaStreamer implements Serializable {
     public Boolean enableMetaSync = false;
 
     @Parameter(names = {"--sync-tool-classes"}, description = "Meta sync client tool, using comma to separate multi tools")
-    public String syncClientToolClass = HiveSyncTool.class.getName();
+    public String syncClientToolClassNames = HiveSyncTool.class.getName();
 
     @Parameter(names = {"--max-pending-compactions"},
         description = "Maximum number of outstanding inflight/requested compactions. Delta Sync will not happen unless"
@@ -442,6 +442,8 @@ public class HoodieDeltaStreamer implements Serializable {
               && operation == config.operation
               && Objects.equals(filterDupes, config.filterDupes)
               && Objects.equals(enableHiveSync, config.enableHiveSync)
+              && Objects.equals(enableMetaSync, config.enableMetaSync)
+              && Objects.equals(syncClientToolClassNames, config.syncClientToolClassNames)
               && Objects.equals(maxPendingCompactions, config.maxPendingCompactions)
               && Objects.equals(maxPendingClustering, config.maxPendingClustering)
               && Objects.equals(continuousMode, config.continuousMode)
@@ -466,8 +468,8 @@ public class HoodieDeltaStreamer implements Serializable {
               baseFileFormat, propsFilePath, configs, sourceClassName,
               sourceOrderingField, payloadClassName, schemaProviderClassName,
               transformerClassNames, sourceLimit, operation, filterDupes,
-              enableHiveSync, maxPendingCompactions, maxPendingClustering, continuousMode,
-              minSyncIntervalSeconds, sparkMaster, commitOnErrors,
+              enableHiveSync, enableMetaSync, syncClientToolClassNames, maxPendingCompactions, maxPendingClustering,
+              continuousMode, minSyncIntervalSeconds, sparkMaster, commitOnErrors,
               deltaSyncSchedulingWeight, compactSchedulingWeight, clusterSchedulingWeight, deltaSyncSchedulingMinShare,
               compactSchedulingMinShare, clusterSchedulingMinShare, forceDisableCompaction, checkpoint,
               initialCheckpointProvider, help);
@@ -491,6 +493,8 @@ public class HoodieDeltaStreamer implements Serializable {
               + ", operation=" + operation
               + ", filterDupes=" + filterDupes
               + ", enableHiveSync=" + enableHiveSync
+              + ", enableMetaSync=" + enableMetaSync
+              + ", syncClientToolClassNames=" + syncClientToolClassNames
               + ", maxPendingCompactions=" + maxPendingCompactions
               + ", maxPendingClustering=" + maxPendingClustering
               + ", continuousMode=" + continuousMode
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
index 376c9cfae3..84aee29dec 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.hive.HiveSyncTool;
 import org.apache.hudi.sync.common.HoodieSyncConfig;
 import org.apache.hudi.utilities.IdentitySplitter;
 import org.apache.hudi.exception.HoodieException;
@@ -203,6 +204,7 @@ public class HoodieMultiTableDeltaStreamer {
     static void deepCopyConfigs(Config globalConfig, HoodieDeltaStreamer.Config tableConfig) {
       tableConfig.enableHiveSync = globalConfig.enableHiveSync;
       tableConfig.enableMetaSync = globalConfig.enableMetaSync;
+      tableConfig.syncClientToolClassNames = globalConfig.syncClientToolClassNames;
       tableConfig.schemaProviderClassName = globalConfig.schemaProviderClassName;
       tableConfig.sourceOrderingField = globalConfig.sourceOrderingField;
       tableConfig.sourceClassName = globalConfig.sourceClassName;
@@ -325,6 +327,9 @@ public class HoodieMultiTableDeltaStreamer {
     @Parameter(names = {"--enable-sync"}, description = "Enable syncing meta")
     public Boolean enableMetaSync = false;
 
+    @Parameter(names = {"--sync-tool-classes"}, description = "Meta sync client tool, using comma to separate multi tools")
+    public String syncClientToolClassNames = HiveSyncTool.class.getName();
+
     @Parameter(names = {"--max-pending-compactions"},
         description = "Maximum number of outstanding inflight/requested compactions. Delta Sync will not happen unless"
         + "outstanding compactions is less than this number")
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieMultiTableDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieMultiTableDeltaStreamer.java
index cc2c96f2c8..8f54b0d34d 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieMultiTableDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieMultiTableDeltaStreamer.java
@@ -72,10 +72,19 @@ public class TestHoodieMultiTableDeltaStreamer extends HoodieDeltaStreamerTestBa
       }
       config.enableHiveSync = enableHiveSync;
       config.enableMetaSync = enableMetaSync;
+      config.syncClientToolClassNames = "com.example.DummySyncTool1,com.example.DummySyncTool2";
       return config;
     }
   }
 
+  @Test
+  public void testMetaSyncConfig() throws IOException {
+    HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_FILENAME_TEST_SOURCE1, dfsBasePath + "/config", TestDataSource.class.getName(), true, true, null);
+    HoodieMultiTableDeltaStreamer streamer = new HoodieMultiTableDeltaStreamer(cfg, jsc);
+    TableExecutionContext executionContext = streamer.getTableExecutionContexts().get(1);
+    assertEquals("com.example.DummySyncTool1,com.example.DummySyncTool2", executionContext.getConfig().syncClientToolClassNames);
+  }
+
   @Test
   public void testInvalidHiveSyncProps() throws IOException {
     HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_INVALID_HIVE_SYNC_TEST_SOURCE1, dfsBasePath + "/config", TestDataSource.class.getName(), true, true, null);