You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by bh...@apache.org on 2020/04/10 15:59:08 UTC

[incubator-hudi] branch master updated: [HUDI - 738] Add validation to DeltaStreamer to fail fast when filterDupes is enabled on UPSERT mode. (#1505)

This is an automated email from the ASF dual-hosted git repository.

bhavanisudha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 8c7cef3  [HUDI - 738] Add validation to DeltaStreamer to fail fast when filterDupes is enabled on UPSERT mode. (#1505)
8c7cef3 is described below

commit 8c7cef3e50471d3645b207d97d515107764688c9
Author: Bhavani Sudha Saktheeswaran <bh...@uber.com>
AuthorDate: Fri Apr 10 08:58:55 2020 -0700

    [HUDI - 738] Add validation to DeltaStreamer to fail fast when filterDupes is enabled on UPSERT mode. (#1505)
    
    Summary:
    This fix ensures that '--filter-dupes' is disabled when the operation is UPSERT, and fails fast if it is not. Otherwise, DeltaStreamer would silently drop all updates and ingest only new records.
---
 .../org/apache/hudi/utilities/deltastreamer/DeltaSync.java |  5 -----
 .../hudi/utilities/deltastreamer/HoodieDeltaStreamer.java  |  8 +++-----
 .../deltastreamer/HoodieMultiTableDeltaStreamer.java       |  3 +++
 .../org/apache/hudi/utilities/TestHoodieDeltaStreamer.java | 14 ++++++++++++--
 4 files changed, 18 insertions(+), 12 deletions(-)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index c964c91..7ec7303 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -173,9 +173,6 @@ public class DeltaSync implements Serializable {
         UtilHelpers.createSource(cfg.sourceClassName, props, jssc, sparkSession, schemaProvider));
 
     this.hiveConf = hiveConf;
-    if (cfg.filterDupes) {
-      cfg.operation = cfg.operation == Operation.UPSERT ? Operation.INSERT : cfg.operation;
-    }
 
     // If schemaRegistry already resolved, setup write-client
     setupWriteClient();
@@ -348,8 +345,6 @@ public class DeltaSync implements Serializable {
     Option<String> scheduledCompactionInstant = Option.empty();
     // filter dupes if needed
     if (cfg.filterDupes) {
-      // turn upserts to insert
-      cfg.operation = cfg.operation == Operation.UPSERT ? Operation.INSERT : cfg.operation;
       records = DataSourceUtils.dropDuplicates(jssc, records, writeClient.getConfig());
     }
 
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
index 8368478..0325eaf 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
@@ -388,17 +388,15 @@ public class HoodieDeltaStreamer implements Serializable {
         tableType = HoodieTableType.valueOf(cfg.tableType);
       }
 
+      ValidationUtils.checkArgument(!cfg.filterDupes || cfg.operation != Operation.UPSERT,
+          "'--filter-dupes' needs to be disabled when '--op' is 'UPSERT' to ensure updates are not missed.");
+
       this.props = properties != null ? properties : UtilHelpers.readConfig(fs, new Path(cfg.propsFilePath), cfg.configs).getConfig();
       LOG.info("Creating delta streamer with configs : " + props.toString());
       this.schemaProvider = UtilHelpers.createSchemaProvider(cfg.schemaProviderClassName, props, jssc);
 
-      if (cfg.filterDupes) {
-        cfg.operation = cfg.operation == Operation.UPSERT ? Operation.INSERT : cfg.operation;
-      }
-
       deltaSync = new DeltaSync(cfg, sparkSession, schemaProvider, props, jssc, fs, hiveConf,
         this::onInitializingWriteClient);
-
     }
 
     public DeltaSyncService(HoodieDeltaStreamer.Config cfg, JavaSparkContext jssc, FileSystem fs, HiveConf hiveConf)
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
index 74455f2..d9c8f83 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.utilities.UtilHelpers;
 import org.apache.hudi.utilities.schema.SchemaRegistryProvider;
@@ -66,6 +67,8 @@ public class HoodieMultiTableDeltaStreamer {
     this.jssc = jssc;
     String commonPropsFile = config.propsFilePath;
     String configFolder = config.configFolder;
+    ValidationUtils.checkArgument(!config.filterDupes || config.operation != HoodieDeltaStreamer.Operation.UPSERT,
+        "'--filter-dupes' needs to be disabled when '--op' is 'UPSERT' to ensure updates are not missed.");
     FileSystem fs = FSUtils.getFs(commonPropsFile, jssc.hadoopConfiguration());
     configFolder = configFolder.charAt(configFolder.length() - 1) == '/' ? configFolder.substring(0, configFolder.length() - 1) : configFolder;
     checkIfPropsFileAndConfigFolderExist(commonPropsFile, configFolder, fs);
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java
index 7edf534..8b661e7 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java
@@ -661,7 +661,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
     // Generate the same 1000 records + 1000 new ones for upsert
     cfg.filterDupes = true;
     cfg.sourceLimit = 2000;
-    cfg.operation = Operation.UPSERT;
+    cfg.operation = Operation.INSERT;
     new HoodieDeltaStreamer(cfg, jsc).sync();
     TestHelpers.assertRecordCount(2000, tableBasePath + "/*/*.parquet", sqlContext);
     TestHelpers.assertCommitMetadata("00001", tableBasePath, dfs, 2);
@@ -674,7 +674,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
     HoodieTableMetaClient mClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), tableBasePath, true);
     HoodieInstant lastFinished = mClient.getCommitsTimeline().filterCompletedInstants().lastInstant().get();
     HoodieDeltaStreamer.Config cfg2 = TestHelpers.makeDropAllConfig(tableBasePath, Operation.UPSERT);
-    cfg2.filterDupes = true;
+    cfg2.filterDupes = false;
     cfg2.sourceLimit = 2000;
     cfg2.operation = Operation.UPSERT;
     cfg2.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN_PROP));
@@ -690,6 +690,16 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
         .fromBytes(mClient.getActiveTimeline().getInstantDetails(newLastFinished).get(), HoodieCommitMetadata.class);
     System.out.println("New Commit Metadata=" + commitMetadata);
     assertTrue(commitMetadata.getPartitionToWriteStats().isEmpty());
+
+    // Try UPSERT with filterDupes true. Expect exception
+    cfg2.filterDupes = true;
+    cfg2.operation = Operation.UPSERT;
+    try {
+      new HoodieDeltaStreamer(cfg2, jsc).sync();
+    } catch (IllegalArgumentException e) {
+      assertTrue(e.getMessage().contains("'--filter-dupes' needs to be disabled when '--op' is 'UPSERT' to ensure updates are not missed."));
+    }
+
   }
 
   @Test