You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to commits@hudi.apache.org by yi...@apache.org on 2022/08/16 16:53:50 UTC
[hudi] branch master updated: [HUDI-4354] Add --force-empty-sync flag to deltastreamer (#6027)
This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 8c02e90a9b [HUDI-4354] Add --force-empty-sync flag to deltastreamer (#6027)
8c02e90a9b is described below
commit 8c02e90a9bc36bdd1f9aa115b792c0dc57ae7868
Author: Qi Ji <qj...@users.noreply.github.com>
AuthorDate: Wed Aug 17 00:53:46 2022 +0800
[HUDI-4354] Add --force-empty-sync flag to deltastreamer (#6027)
---
.../hudi/utilities/deltastreamer/DeltaSync.java | 2 +-
.../deltastreamer/HoodieDeltaStreamer.java | 7 ++++++-
.../functional/TestHoodieDeltaStreamer.java | 21 +++++++++++++++++++++
3 files changed, 28 insertions(+), 2 deletions(-)
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index f3d9af3150..3c1d1f5ef1 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -630,7 +630,7 @@ public class DeltaSync implements Serializable {
scheduledCompactionInstant = writeClient.scheduleCompaction(Option.empty());
}
- if (!isEmpty) {
+ if (!isEmpty || cfg.forceEmptyMetaSync) {
runMetaSync();
}
} else {
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
index a22a3581ae..fe7576cc80 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
@@ -305,6 +305,9 @@ public class HoodieDeltaStreamer implements Serializable {
@Parameter(names = {"--enable-sync"}, description = "Enable syncing meta")
public Boolean enableMetaSync = false;
+ @Parameter(names = {"--force-empty-sync"}, description = "Force syncing meta even on empty commit")
+ public Boolean forceEmptyMetaSync = false;
+
@Parameter(names = {"--sync-tool-classes"}, description = "Meta sync client tool, using comma to separate multi tools")
public String syncClientToolClassNames = HiveSyncTool.class.getName();
@@ -443,6 +446,7 @@ public class HoodieDeltaStreamer implements Serializable {
&& Objects.equals(filterDupes, config.filterDupes)
&& Objects.equals(enableHiveSync, config.enableHiveSync)
&& Objects.equals(enableMetaSync, config.enableMetaSync)
+ && Objects.equals(forceEmptyMetaSync, config.forceEmptyMetaSync)
&& Objects.equals(syncClientToolClassNames, config.syncClientToolClassNames)
&& Objects.equals(maxPendingCompactions, config.maxPendingCompactions)
&& Objects.equals(maxPendingClustering, config.maxPendingClustering)
@@ -468,7 +472,7 @@ public class HoodieDeltaStreamer implements Serializable {
baseFileFormat, propsFilePath, configs, sourceClassName,
sourceOrderingField, payloadClassName, schemaProviderClassName,
transformerClassNames, sourceLimit, operation, filterDupes,
- enableHiveSync, enableMetaSync, syncClientToolClassNames, maxPendingCompactions, maxPendingClustering,
+ enableHiveSync, enableMetaSync, forceEmptyMetaSync, syncClientToolClassNames, maxPendingCompactions, maxPendingClustering,
continuousMode, minSyncIntervalSeconds, sparkMaster, commitOnErrors,
deltaSyncSchedulingWeight, compactSchedulingWeight, clusterSchedulingWeight, deltaSyncSchedulingMinShare,
compactSchedulingMinShare, clusterSchedulingMinShare, forceDisableCompaction, checkpoint,
@@ -494,6 +498,7 @@ public class HoodieDeltaStreamer implements Serializable {
+ ", filterDupes=" + filterDupes
+ ", enableHiveSync=" + enableHiveSync
+ ", enableMetaSync=" + enableMetaSync
+ + ", forceEmptyMetaSync=" + forceEmptyMetaSync
+ ", syncClientToolClassNames=" + syncClientToolClassNames
+ ", maxPendingCompactions=" + maxPendingCompactions
+ ", maxPendingClustering=" + maxPendingClustering
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
index 850b0d1d60..88948b0385 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
@@ -2169,6 +2169,27 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
assertFalse(tableFields.contains("partition_path"));
}
+ @Test
+ public void testForceEmptyMetaSync() throws Exception {
+ String tableBasePath = dfsBasePath + "/test_force_empty_meta_sync";
+
+ HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT);
+ cfg.sourceLimit = 0;
+ cfg.allowCommitOnNoCheckpointChange = true;
+ cfg.enableMetaSync = true;
+ cfg.forceEmptyMetaSync = true;
+
+ new HoodieDeltaStreamer(cfg, jsc, dfs, hiveServer.getHiveConf()).sync();
+ TestHelpers.assertRecordCount(0, tableBasePath, sqlContext);
+
+ // make sure hive table is present
+ HiveSyncConfig hiveSyncConfig = getHiveSyncConfig(tableBasePath, "hive_trips");
+ hiveSyncConfig.setHadoopConf(hiveServer.getHiveConf());
+ HoodieHiveSyncClient hiveClient = new HoodieHiveSyncClient(hiveSyncConfig);
+ final String tableName = hiveSyncConfig.getString(META_SYNC_TABLE_NAME);
+ assertTrue(hiveClient.tableExists(tableName), "Table " + tableName + " should exist");
+ }
+
class TestDeltaSync extends DeltaSync {
public TestDeltaSync(HoodieDeltaStreamer.Config cfg, SparkSession sparkSession, SchemaProvider schemaProvider, TypedProperties props,