You are viewing a plain-text version of this content. (The canonical link to the original was not preserved in this plain-text copy.)
Posted to commits@hudi.apache.org by yi...@apache.org on 2022/08/16 16:53:50 UTC

[hudi] branch master updated: [HUDI-4354] Add --force-empty-sync flag to deltastreamer (#6027)

This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 8c02e90a9b [HUDI-4354] Add --force-empty-sync flag to deltastreamer (#6027)
8c02e90a9b is described below

commit 8c02e90a9bc36bdd1f9aa115b792c0dc57ae7868
Author: Qi Ji <qj...@users.noreply.github.com>
AuthorDate: Wed Aug 17 00:53:46 2022 +0800

    [HUDI-4354] Add --force-empty-sync flag to deltastreamer (#6027)
---
 .../hudi/utilities/deltastreamer/DeltaSync.java     |  2 +-
 .../deltastreamer/HoodieDeltaStreamer.java          |  7 ++++++-
 .../functional/TestHoodieDeltaStreamer.java         | 21 +++++++++++++++++++++
 3 files changed, 28 insertions(+), 2 deletions(-)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index f3d9af3150..3c1d1f5ef1 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -630,7 +630,7 @@ public class DeltaSync implements Serializable {
           scheduledCompactionInstant = writeClient.scheduleCompaction(Option.empty());
         }
 
-        if (!isEmpty) {
+        if (!isEmpty || cfg.forceEmptyMetaSync) {
           runMetaSync();
         }
       } else {
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
index a22a3581ae..fe7576cc80 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
@@ -305,6 +305,9 @@ public class HoodieDeltaStreamer implements Serializable {
     @Parameter(names = {"--enable-sync"}, description = "Enable syncing meta")
     public Boolean enableMetaSync = false;
 
+    @Parameter(names = {"--force-empty-sync"}, description = "Force syncing meta even on empty commit")
+    public Boolean forceEmptyMetaSync = false;
+
     @Parameter(names = {"--sync-tool-classes"}, description = "Meta sync client tool, using comma to separate multi tools")
     public String syncClientToolClassNames = HiveSyncTool.class.getName();
 
@@ -443,6 +446,7 @@ public class HoodieDeltaStreamer implements Serializable {
               && Objects.equals(filterDupes, config.filterDupes)
               && Objects.equals(enableHiveSync, config.enableHiveSync)
               && Objects.equals(enableMetaSync, config.enableMetaSync)
+              && Objects.equals(forceEmptyMetaSync, config.forceEmptyMetaSync)
               && Objects.equals(syncClientToolClassNames, config.syncClientToolClassNames)
               && Objects.equals(maxPendingCompactions, config.maxPendingCompactions)
               && Objects.equals(maxPendingClustering, config.maxPendingClustering)
@@ -468,7 +472,7 @@ public class HoodieDeltaStreamer implements Serializable {
               baseFileFormat, propsFilePath, configs, sourceClassName,
               sourceOrderingField, payloadClassName, schemaProviderClassName,
               transformerClassNames, sourceLimit, operation, filterDupes,
-              enableHiveSync, enableMetaSync, syncClientToolClassNames, maxPendingCompactions, maxPendingClustering,
+              enableHiveSync, enableMetaSync, forceEmptyMetaSync, syncClientToolClassNames, maxPendingCompactions, maxPendingClustering,
               continuousMode, minSyncIntervalSeconds, sparkMaster, commitOnErrors,
               deltaSyncSchedulingWeight, compactSchedulingWeight, clusterSchedulingWeight, deltaSyncSchedulingMinShare,
               compactSchedulingMinShare, clusterSchedulingMinShare, forceDisableCompaction, checkpoint,
@@ -494,6 +498,7 @@ public class HoodieDeltaStreamer implements Serializable {
               + ", filterDupes=" + filterDupes
               + ", enableHiveSync=" + enableHiveSync
               + ", enableMetaSync=" + enableMetaSync
+              + ", forceEmptyMetaSync=" + forceEmptyMetaSync
               + ", syncClientToolClassNames=" + syncClientToolClassNames
               + ", maxPendingCompactions=" + maxPendingCompactions
               + ", maxPendingClustering=" + maxPendingClustering
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
index 850b0d1d60..88948b0385 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
@@ -2169,6 +2169,27 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     assertFalse(tableFields.contains("partition_path"));
   }
 
+  @Test
+  public void testForceEmptyMetaSync() throws Exception {
+    String tableBasePath = dfsBasePath + "/test_force_empty_meta_sync";
+
+    HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT);
+    cfg.sourceLimit = 0;
+    cfg.allowCommitOnNoCheckpointChange = true;
+    cfg.enableMetaSync = true;
+    cfg.forceEmptyMetaSync = true;
+
+    new HoodieDeltaStreamer(cfg, jsc, dfs, hiveServer.getHiveConf()).sync();
+    TestHelpers.assertRecordCount(0, tableBasePath, sqlContext);
+
+    // make sure hive table is present
+    HiveSyncConfig hiveSyncConfig = getHiveSyncConfig(tableBasePath, "hive_trips");
+    hiveSyncConfig.setHadoopConf(hiveServer.getHiveConf());
+    HoodieHiveSyncClient hiveClient = new HoodieHiveSyncClient(hiveSyncConfig);
+    final String tableName = hiveSyncConfig.getString(META_SYNC_TABLE_NAME);
+    assertTrue(hiveClient.tableExists(tableName), "Table " + tableName + " should exist");
+  }
+
   class TestDeltaSync extends DeltaSync {
 
     public TestDeltaSync(HoodieDeltaStreamer.Config cfg, SparkSession sparkSession, SchemaProvider schemaProvider, TypedProperties props,