You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text version).
Posted to commits@hudi.apache.org by pr...@apache.org on 2022/05/31 14:57:58 UTC
[hudi] branch master updated: [HUDI-4107] Added --sync-tool-classes config option in HoodieMultiTableDeltaStreamer (#5597)
This is an automated email from the ASF dual-hosted git repository.
pratyakshsharma pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 795a99ba73 [HUDI-4107] Added --sync-tool-classes config option in HoodieMultiTableDeltaStreamer (#5597)
795a99ba73 is described below
commit 795a99ba73183b5f7697bb3964843a43a64e74e9
Author: Kumud Kumar Srivatsava Tirupati <ku...@users.noreply.github.com>
AuthorDate: Tue May 31 20:27:50 2022 +0530
[HUDI-4107] Added --sync-tool-classes config option in HoodieMultiTableDeltaStreamer (#5597)
* Added the --sync-tool-classes config option to the multi-table delta streamer
* Added a test case to assert that syncClientToolClassNames is picked up by the deltastreamer execution context
---
.../org/apache/hudi/utilities/deltastreamer/DeltaSync.java | 2 +-
.../hudi/utilities/deltastreamer/HoodieDeltaStreamer.java | 10 +++++++---
.../utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java | 5 +++++
.../functional/TestHoodieMultiTableDeltaStreamer.java | 9 +++++++++
4 files changed, 22 insertions(+), 4 deletions(-)
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index 0ae72f94b8..736e416162 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -691,7 +691,7 @@ public class DeltaSync implements Serializable {
}
private void syncMeta(HoodieDeltaStreamerMetrics metrics) {
- Set<String> syncClientToolClasses = new HashSet<>(Arrays.asList(cfg.syncClientToolClass.split(",")));
+ Set<String> syncClientToolClasses = new HashSet<>(Arrays.asList(cfg.syncClientToolClassNames.split(",")));
// for backward compatibility
if (cfg.enableHiveSync) {
cfg.enableMetaSync = true;
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
index 7a688b50c7..a22a3581ae 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
@@ -306,7 +306,7 @@ public class HoodieDeltaStreamer implements Serializable {
public Boolean enableMetaSync = false;
@Parameter(names = {"--sync-tool-classes"}, description = "Meta sync client tool, using comma to separate multi tools")
- public String syncClientToolClass = HiveSyncTool.class.getName();
+ public String syncClientToolClassNames = HiveSyncTool.class.getName();
@Parameter(names = {"--max-pending-compactions"},
description = "Maximum number of outstanding inflight/requested compactions. Delta Sync will not happen unless"
@@ -442,6 +442,8 @@ public class HoodieDeltaStreamer implements Serializable {
&& operation == config.operation
&& Objects.equals(filterDupes, config.filterDupes)
&& Objects.equals(enableHiveSync, config.enableHiveSync)
+ && Objects.equals(enableMetaSync, config.enableMetaSync)
+ && Objects.equals(syncClientToolClassNames, config.syncClientToolClassNames)
&& Objects.equals(maxPendingCompactions, config.maxPendingCompactions)
&& Objects.equals(maxPendingClustering, config.maxPendingClustering)
&& Objects.equals(continuousMode, config.continuousMode)
@@ -466,8 +468,8 @@ public class HoodieDeltaStreamer implements Serializable {
baseFileFormat, propsFilePath, configs, sourceClassName,
sourceOrderingField, payloadClassName, schemaProviderClassName,
transformerClassNames, sourceLimit, operation, filterDupes,
- enableHiveSync, maxPendingCompactions, maxPendingClustering, continuousMode,
- minSyncIntervalSeconds, sparkMaster, commitOnErrors,
+ enableHiveSync, enableMetaSync, syncClientToolClassNames, maxPendingCompactions, maxPendingClustering,
+ continuousMode, minSyncIntervalSeconds, sparkMaster, commitOnErrors,
deltaSyncSchedulingWeight, compactSchedulingWeight, clusterSchedulingWeight, deltaSyncSchedulingMinShare,
compactSchedulingMinShare, clusterSchedulingMinShare, forceDisableCompaction, checkpoint,
initialCheckpointProvider, help);
@@ -491,6 +493,8 @@ public class HoodieDeltaStreamer implements Serializable {
+ ", operation=" + operation
+ ", filterDupes=" + filterDupes
+ ", enableHiveSync=" + enableHiveSync
+ + ", enableMetaSync=" + enableMetaSync
+ + ", syncClientToolClassNames=" + syncClientToolClassNames
+ ", maxPendingCompactions=" + maxPendingCompactions
+ ", maxPendingClustering=" + maxPendingClustering
+ ", continuousMode=" + continuousMode
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
index 376c9cfae3..84aee29dec 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.hive.HiveSyncTool;
import org.apache.hudi.sync.common.HoodieSyncConfig;
import org.apache.hudi.utilities.IdentitySplitter;
import org.apache.hudi.exception.HoodieException;
@@ -203,6 +204,7 @@ public class HoodieMultiTableDeltaStreamer {
static void deepCopyConfigs(Config globalConfig, HoodieDeltaStreamer.Config tableConfig) {
tableConfig.enableHiveSync = globalConfig.enableHiveSync;
tableConfig.enableMetaSync = globalConfig.enableMetaSync;
+ tableConfig.syncClientToolClassNames = globalConfig.syncClientToolClassNames;
tableConfig.schemaProviderClassName = globalConfig.schemaProviderClassName;
tableConfig.sourceOrderingField = globalConfig.sourceOrderingField;
tableConfig.sourceClassName = globalConfig.sourceClassName;
@@ -325,6 +327,9 @@ public class HoodieMultiTableDeltaStreamer {
@Parameter(names = {"--enable-sync"}, description = "Enable syncing meta")
public Boolean enableMetaSync = false;
+ @Parameter(names = {"--sync-tool-classes"}, description = "Meta sync client tool, using comma to separate multi tools")
+ public String syncClientToolClassNames = HiveSyncTool.class.getName();
+
@Parameter(names = {"--max-pending-compactions"},
description = "Maximum number of outstanding inflight/requested compactions. Delta Sync will not happen unless"
+ "outstanding compactions is less than this number")
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieMultiTableDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieMultiTableDeltaStreamer.java
index cc2c96f2c8..8f54b0d34d 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieMultiTableDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieMultiTableDeltaStreamer.java
@@ -72,10 +72,19 @@ public class TestHoodieMultiTableDeltaStreamer extends HoodieDeltaStreamerTestBa
}
config.enableHiveSync = enableHiveSync;
config.enableMetaSync = enableMetaSync;
+ config.syncClientToolClassNames = "com.example.DummySyncTool1,com.example.DummySyncTool2";
return config;
}
}
+ @Test
+ public void testMetaSyncConfig() throws IOException { // Checks that the multi-table syncClientToolClassNames value ends up in a per-table execution context's config.
+ HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_FILENAME_TEST_SOURCE1, dfsBasePath + "/config", TestDataSource.class.getName(), true, true, null); // NOTE(review): presumably this is the getConfig that sets the two DummySyncTool class names — confirm.
+ HoodieMultiTableDeltaStreamer streamer = new HoodieMultiTableDeltaStreamer(cfg, jsc);
+ TableExecutionContext executionContext = streamer.getTableExecutionContexts().get(1); // NOTE(review): assumes at least two table contexts are created; index 1 is arbitrary — confirm.
+ assertEquals("com.example.DummySyncTool1,com.example.DummySyncTool2", executionContext.getConfig().syncClientToolClassNames);
+ }
+
@Test
public void testInvalidHiveSyncProps() throws IOException {
HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_INVALID_HIVE_SYNC_TEST_SOURCE1, dfsBasePath + "/config", TestDataSource.class.getName(), true, true, null);