You are viewing a plain text version of this content. The canonical (HTML) version is available at the original archive location.
Posted to commits@hudi.apache.org by vb...@apache.org on 2019/05/01 20:21:26 UTC

[incubator-hudi] branch master updated: Making DataSource/DeltaStreamer use defaults for combining

This is an automated email from the ASF dual-hosted git repository.

vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 57a8b9c  Making DataSource/DeltaStreamer use defaults for combining
57a8b9c is described below

commit 57a8b9cc8c6a1526bd9e2174321a5b675d716943
Author: vinothchandar <vi...@apache.org>
AuthorDate: Wed May 1 05:06:34 2019 -0700

    Making DataSource/DeltaStreamer use defaults for combining
    
     - Addresses an issue where insert would combine and remove duplicates within a batch
     - Setting default insert combining to false (write client default)
     - Set to true if filtering duplicates on insert/bulk_insert
---
 .../src/main/java/com/uber/hoodie/DataSourceUtils.java       | 12 ++++++++----
 .../main/scala/com/uber/hoodie/HoodieSparkSqlWriter.scala    |  7 ++-----
 .../hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java  |  4 ++--
 3 files changed, 12 insertions(+), 11 deletions(-)

diff --git a/hoodie-spark/src/main/java/com/uber/hoodie/DataSourceUtils.java b/hoodie-spark/src/main/java/com/uber/hoodie/DataSourceUtils.java
index 28811bb..3c56a0d 100644
--- a/hoodie-spark/src/main/java/com/uber/hoodie/DataSourceUtils.java
+++ b/hoodie-spark/src/main/java/com/uber/hoodie/DataSourceUtils.java
@@ -128,12 +128,16 @@ public class DataSourceUtils {
       String basePath, String tblName, Map<String, String> parameters) throws Exception {
 
     // inline compaction is on by default for MOR
-    boolean inlineCompact = parameters.containsKey(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY())
-        && parameters.get(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY()).equals(DataSourceWriteOptions
-        .MOR_STORAGE_TYPE_OPT_VAL());
+    boolean inlineCompact = parameters.get(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY())
+        .equals(DataSourceWriteOptions.MOR_STORAGE_TYPE_OPT_VAL());
 
-    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().combineInput(true, true)
+    // insert/bulk-insert combining to be true, if filtering for duplicates
+    boolean combineInserts = Boolean.parseBoolean(parameters.get(
+        DataSourceWriteOptions.INSERT_DROP_DUPS_OPT_KEY()));
+
+    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
         .withPath(basePath).withAutoCommit(false)
+        .combineInput(combineInserts, true)
         .withSchema(schemaStr).forTable(tblName).withIndexConfig(
             HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
         .withCompactionConfig(HoodieCompactionConfig.newBuilder()
diff --git a/hoodie-spark/src/main/scala/com/uber/hoodie/HoodieSparkSqlWriter.scala b/hoodie-spark/src/main/scala/com/uber/hoodie/HoodieSparkSqlWriter.scala
index e7ae8e0..a3e50df 100644
--- a/hoodie-spark/src/main/scala/com/uber/hoodie/HoodieSparkSqlWriter.scala
+++ b/hoodie-spark/src/main/scala/com/uber/hoodie/HoodieSparkSqlWriter.scala
@@ -138,10 +138,7 @@ private[hoodie] object HoodieSparkSqlWriter {
     }
 
     // Create a HoodieWriteClient & issue the write.
-    val client = DataSourceUtils.createHoodieClient(jsc,
-      schema.toString,
-      path.get,
-      tblName.get,
+    val client = DataSourceUtils.createHoodieClient(jsc, schema.toString, path.get, tblName.get,
       mapAsJavaMap(parameters)
     )
     val commitTime = client.startCommit()
@@ -257,4 +254,4 @@ private[hoodie] object HoodieSparkSqlWriter {
     hiveSyncConfig.partitionValueExtractorClass = parameters(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY)
     hiveSyncConfig
   }
-}
\ No newline at end of file
+}
diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java
index 0ad10de..68621c4 100644
--- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java
+++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java
@@ -326,8 +326,8 @@ public class HoodieDeltaStreamer implements Serializable {
 
   private HoodieWriteConfig getHoodieClientConfig(SchemaProvider schemaProvider) {
     HoodieWriteConfig.Builder builder =
-        HoodieWriteConfig.newBuilder().combineInput(true, true).withPath(cfg.targetBasePath)
-            .withAutoCommit(false)
+        HoodieWriteConfig.newBuilder().withPath(cfg.targetBasePath)
+            .withAutoCommit(false).combineInput(cfg.filterDupes, true)
             .withCompactionConfig(HoodieCompactionConfig.newBuilder()
                 .withPayloadClass(cfg.payloadClassName)
                 // turn on inline compaction by default, for MOR tables