You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by vb...@apache.org on 2019/05/01 20:21:26 UTC
[incubator-hudi] branch master updated: Making DataSource/DeltaStreamer use defaults for combining
This is an automated email from the ASF dual-hosted git repository.
vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 57a8b9c Making DataSource/DeltaStreamer use defaults for combining
57a8b9c is described below
commit 57a8b9cc8c6a1526bd9e2174321a5b675d716943
Author: vinothchandar <vi...@apache.org>
AuthorDate: Wed May 1 05:06:34 2019 -0700
Making DataSource/DeltaStreamer use defaults for combining
- Addresses an issue where insert would combine and remove duplicates within a batch
- Sets the default for insert combining to false (the write-client default)
- Sets it to true when filtering duplicates on insert/bulk_insert
---
.../src/main/java/com/uber/hoodie/DataSourceUtils.java | 12 ++++++++----
.../main/scala/com/uber/hoodie/HoodieSparkSqlWriter.scala | 7 ++-----
.../hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java | 4 ++--
3 files changed, 12 insertions(+), 11 deletions(-)
diff --git a/hoodie-spark/src/main/java/com/uber/hoodie/DataSourceUtils.java b/hoodie-spark/src/main/java/com/uber/hoodie/DataSourceUtils.java
index 28811bb..3c56a0d 100644
--- a/hoodie-spark/src/main/java/com/uber/hoodie/DataSourceUtils.java
+++ b/hoodie-spark/src/main/java/com/uber/hoodie/DataSourceUtils.java
@@ -128,12 +128,16 @@ public class DataSourceUtils {
String basePath, String tblName, Map<String, String> parameters) throws Exception {
// inline compaction is on by default for MOR
- boolean inlineCompact = parameters.containsKey(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY())
- && parameters.get(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY()).equals(DataSourceWriteOptions
- .MOR_STORAGE_TYPE_OPT_VAL());
+ boolean inlineCompact = parameters.get(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY())
+ .equals(DataSourceWriteOptions.MOR_STORAGE_TYPE_OPT_VAL());
- HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().combineInput(true, true)
+ // insert/bulk-insert combining to be true, if filtering for duplicates
+ boolean combineInserts = Boolean.parseBoolean(parameters.get(
+ DataSourceWriteOptions.INSERT_DROP_DUPS_OPT_KEY()));
+
+ HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
.withPath(basePath).withAutoCommit(false)
+ .combineInput(combineInserts, true)
.withSchema(schemaStr).forTable(tblName).withIndexConfig(
HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
diff --git a/hoodie-spark/src/main/scala/com/uber/hoodie/HoodieSparkSqlWriter.scala b/hoodie-spark/src/main/scala/com/uber/hoodie/HoodieSparkSqlWriter.scala
index e7ae8e0..a3e50df 100644
--- a/hoodie-spark/src/main/scala/com/uber/hoodie/HoodieSparkSqlWriter.scala
+++ b/hoodie-spark/src/main/scala/com/uber/hoodie/HoodieSparkSqlWriter.scala
@@ -138,10 +138,7 @@ private[hoodie] object HoodieSparkSqlWriter {
}
// Create a HoodieWriteClient & issue the write.
- val client = DataSourceUtils.createHoodieClient(jsc,
- schema.toString,
- path.get,
- tblName.get,
+ val client = DataSourceUtils.createHoodieClient(jsc, schema.toString, path.get, tblName.get,
mapAsJavaMap(parameters)
)
val commitTime = client.startCommit()
@@ -257,4 +254,4 @@ private[hoodie] object HoodieSparkSqlWriter {
hiveSyncConfig.partitionValueExtractorClass = parameters(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY)
hiveSyncConfig
}
-}
\ No newline at end of file
+}
diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java
index 0ad10de..68621c4 100644
--- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java
+++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java
@@ -326,8 +326,8 @@ public class HoodieDeltaStreamer implements Serializable {
private HoodieWriteConfig getHoodieClientConfig(SchemaProvider schemaProvider) {
HoodieWriteConfig.Builder builder =
- HoodieWriteConfig.newBuilder().combineInput(true, true).withPath(cfg.targetBasePath)
- .withAutoCommit(false)
+ HoodieWriteConfig.newBuilder().withPath(cfg.targetBasePath)
+ .withAutoCommit(false).combineInput(cfg.filterDupes, true)
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withPayloadClass(cfg.payloadClassName)
// turn on inline compaction by default, for MOR tables