You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by vi...@apache.org on 2020/06/19 19:12:46 UTC

[hudi] branch master updated: [HUDI-1023] Add validation error messages in delta sync (#1710)

This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 8a9fdd6  [HUDI-1023] Add validation error messages in delta sync (#1710)
8a9fdd6 is described below

commit 8a9fdd603e3e532ea5252b98205acfb8aa648795
Author: Raymond Xu <27...@users.noreply.github.com>
AuthorDate: Fri Jun 19 12:12:35 2020 -0700

    [HUDI-1023] Add validation error messages in delta sync (#1710)
    
    - Remove explicitly specifying BLOOM_INDEX since that's the default anyway
---
 .../org/apache/hudi/config/HoodieWriteConfig.java  | 88 +++++++++++-----------
 .../hudi/utilities/deltastreamer/DeltaSync.java    | 25 +++---
 2 files changed, 60 insertions(+), 53 deletions(-)

diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index d899257..083d780 100644
--- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -54,55 +54,55 @@ import java.util.stream.Collectors;
 public class HoodieWriteConfig extends DefaultHoodieConfig {
 
   public static final String TABLE_NAME = "hoodie.table.name";
-  private static final String TIMELINE_LAYOUT_VERSION = "hoodie.timeline.layout.version";
-  private static final String BASE_PATH_PROP = "hoodie.base.path";
-  private static final String AVRO_SCHEMA = "hoodie.avro.schema";
-  private static final String AVRO_SCHEMA_VALIDATE = "hoodie.avro.schema.validate";
-  private static final String DEFAULT_AVRO_SCHEMA_VALIDATE = "false";
-  private static final String DEFAULT_PARALLELISM = "1500";
-  private static final String INSERT_PARALLELISM = "hoodie.insert.shuffle.parallelism";
-  private static final String BULKINSERT_PARALLELISM = "hoodie.bulkinsert.shuffle.parallelism";
-  private static final String BULKINSERT_USER_DEFINED_PARTITIONER_CLASS = "hoodie.bulkinsert.user.defined.partitioner.class";
-  private static final String UPSERT_PARALLELISM = "hoodie.upsert.shuffle.parallelism";
-  private static final String DELETE_PARALLELISM = "hoodie.delete.shuffle.parallelism";
-  private static final String DEFAULT_ROLLBACK_PARALLELISM = "100";
-  private static final String ROLLBACK_PARALLELISM = "hoodie.rollback.parallelism";
-  private static final String WRITE_BUFFER_LIMIT_BYTES = "hoodie.write.buffer.limit.bytes";
-  private static final String DEFAULT_WRITE_BUFFER_LIMIT_BYTES = String.valueOf(4 * 1024 * 1024);
-  private static final String COMBINE_BEFORE_INSERT_PROP = "hoodie.combine.before.insert";
-  private static final String DEFAULT_COMBINE_BEFORE_INSERT = "false";
-  private static final String COMBINE_BEFORE_UPSERT_PROP = "hoodie.combine.before.upsert";
-  private static final String DEFAULT_COMBINE_BEFORE_UPSERT = "true";
-  private static final String COMBINE_BEFORE_DELETE_PROP = "hoodie.combine.before.delete";
-  private static final String DEFAULT_COMBINE_BEFORE_DELETE = "true";
+  public static final String TIMELINE_LAYOUT_VERSION = "hoodie.timeline.layout.version";
+  public static final String BASE_PATH_PROP = "hoodie.base.path";
+  public static final String AVRO_SCHEMA = "hoodie.avro.schema";
+  public static final String AVRO_SCHEMA_VALIDATE = "hoodie.avro.schema.validate";
+  public static final String DEFAULT_AVRO_SCHEMA_VALIDATE = "false";
+  public static final String DEFAULT_PARALLELISM = "1500";
+  public static final String INSERT_PARALLELISM = "hoodie.insert.shuffle.parallelism";
+  public static final String BULKINSERT_PARALLELISM = "hoodie.bulkinsert.shuffle.parallelism";
+  public static final String BULKINSERT_USER_DEFINED_PARTITIONER_CLASS = "hoodie.bulkinsert.user.defined.partitioner.class";
+  public static final String UPSERT_PARALLELISM = "hoodie.upsert.shuffle.parallelism";
+  public static final String DELETE_PARALLELISM = "hoodie.delete.shuffle.parallelism";
+  public static final String DEFAULT_ROLLBACK_PARALLELISM = "100";
+  public static final String ROLLBACK_PARALLELISM = "hoodie.rollback.parallelism";
+  public static final String WRITE_BUFFER_LIMIT_BYTES = "hoodie.write.buffer.limit.bytes";
+  public static final String DEFAULT_WRITE_BUFFER_LIMIT_BYTES = String.valueOf(4 * 1024 * 1024);
+  public static final String COMBINE_BEFORE_INSERT_PROP = "hoodie.combine.before.insert";
+  public static final String DEFAULT_COMBINE_BEFORE_INSERT = "false";
+  public static final String COMBINE_BEFORE_UPSERT_PROP = "hoodie.combine.before.upsert";
+  public static final String DEFAULT_COMBINE_BEFORE_UPSERT = "true";
+  public static final String COMBINE_BEFORE_DELETE_PROP = "hoodie.combine.before.delete";
+  public static final String DEFAULT_COMBINE_BEFORE_DELETE = "true";
   public static final String WRITE_STATUS_STORAGE_LEVEL = "hoodie.write.status.storage.level";
-  private static final String DEFAULT_WRITE_STATUS_STORAGE_LEVEL = "MEMORY_AND_DISK_SER";
-  private static final String HOODIE_AUTO_COMMIT_PROP = "hoodie.auto.commit";
-  private static final String DEFAULT_HOODIE_AUTO_COMMIT = "true";
-  private static final String HOODIE_ASSUME_DATE_PARTITIONING_PROP = "hoodie.assume.date.partitioning";
-  private static final String DEFAULT_ASSUME_DATE_PARTITIONING = "false";
-  private static final String HOODIE_WRITE_STATUS_CLASS_PROP = "hoodie.writestatus.class";
-  private static final String DEFAULT_HOODIE_WRITE_STATUS_CLASS = WriteStatus.class.getName();
-  private static final String FINALIZE_WRITE_PARALLELISM = "hoodie.finalize.write.parallelism";
-  private static final String DEFAULT_FINALIZE_WRITE_PARALLELISM = DEFAULT_PARALLELISM;
-
-  private static final String EMBEDDED_TIMELINE_SERVER_ENABLED = "hoodie.embed.timeline.server";
-  private static final String DEFAULT_EMBEDDED_TIMELINE_SERVER_ENABLED = "true";
-
-  private static final String FAIL_ON_TIMELINE_ARCHIVING_ENABLED_PROP = "hoodie.fail.on.timeline.archiving";
-  private static final String DEFAULT_FAIL_ON_TIMELINE_ARCHIVING_ENABLED = "true";
+  public static final String DEFAULT_WRITE_STATUS_STORAGE_LEVEL = "MEMORY_AND_DISK_SER";
+  public static final String HOODIE_AUTO_COMMIT_PROP = "hoodie.auto.commit";
+  public static final String DEFAULT_HOODIE_AUTO_COMMIT = "true";
+  public static final String HOODIE_ASSUME_DATE_PARTITIONING_PROP = "hoodie.assume.date.partitioning";
+  public static final String DEFAULT_ASSUME_DATE_PARTITIONING = "false";
+  public static final String HOODIE_WRITE_STATUS_CLASS_PROP = "hoodie.writestatus.class";
+  public static final String DEFAULT_HOODIE_WRITE_STATUS_CLASS = WriteStatus.class.getName();
+  public static final String FINALIZE_WRITE_PARALLELISM = "hoodie.finalize.write.parallelism";
+  public static final String DEFAULT_FINALIZE_WRITE_PARALLELISM = DEFAULT_PARALLELISM;
+
+  public static final String EMBEDDED_TIMELINE_SERVER_ENABLED = "hoodie.embed.timeline.server";
+  public static final String DEFAULT_EMBEDDED_TIMELINE_SERVER_ENABLED = "true";
+
+  public static final String FAIL_ON_TIMELINE_ARCHIVING_ENABLED_PROP = "hoodie.fail.on.timeline.archiving";
+  public static final String DEFAULT_FAIL_ON_TIMELINE_ARCHIVING_ENABLED = "true";
   // time between successive attempts to ensure written data's metadata is consistent on storage
-  private static final String INITIAL_CONSISTENCY_CHECK_INTERVAL_MS_PROP =
+  public static final String INITIAL_CONSISTENCY_CHECK_INTERVAL_MS_PROP =
       "hoodie.consistency.check.initial_interval_ms";
-  private static long DEFAULT_INITIAL_CONSISTENCY_CHECK_INTERVAL_MS = 2000L;
+  public static long DEFAULT_INITIAL_CONSISTENCY_CHECK_INTERVAL_MS = 2000L;
 
   // max interval time
-  private static final String MAX_CONSISTENCY_CHECK_INTERVAL_MS_PROP = "hoodie.consistency.check.max_interval_ms";
-  private static long DEFAULT_MAX_CONSISTENCY_CHECK_INTERVAL_MS = 300000L;
+  public static final String MAX_CONSISTENCY_CHECK_INTERVAL_MS_PROP = "hoodie.consistency.check.max_interval_ms";
+  public static long DEFAULT_MAX_CONSISTENCY_CHECK_INTERVAL_MS = 300000L;
 
   // maximum number of checks, for consistency of written data. Will wait upto 256 Secs
-  private static final String MAX_CONSISTENCY_CHECKS_PROP = "hoodie.consistency.check.max_checks";
-  private static int DEFAULT_MAX_CONSISTENCY_CHECKS = 7;
+  public static final String MAX_CONSISTENCY_CHECKS_PROP = "hoodie.consistency.check.max_checks";
+  public static int DEFAULT_MAX_CONSISTENCY_CHECKS = 7;
 
   /**
    * HUDI-858 : There are users who had been directly using RDD APIs and have relied on a behavior in 0.4.x to allow
@@ -114,9 +114,9 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
    * Given the importance of supporting such cases for the user's migration to 0.5.x, we are proposing a safety flag
    * (disabled by default) which will allow this old behavior.
    */
-  private static final String ALLOW_MULTI_WRITE_ON_SAME_INSTANT =
+  public static final String ALLOW_MULTI_WRITE_ON_SAME_INSTANT =
       "_.hoodie.allow.multi.write.on.same.instant";
-  private static final String DEFAULT_ALLOW_MULTI_WRITE_ON_SAME_INSTANT = "false";
+  public static final String DEFAULT_ALLOW_MULTI_WRITE_ON_SAME_INSTANT = "false";
 
   private ConsistencyGuardConfig consistencyGuardConfig;
 
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index 1a3e43c..068e592 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -33,12 +33,10 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieCompactionConfig;
-import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.hive.HiveSyncConfig;
 import org.apache.hudi.hive.HiveSyncTool;
-import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.keygen.KeyGenerator;
 import org.apache.hudi.utilities.UtilHelpers;
 import org.apache.hudi.utilities.exception.HoodieDeltaStreamerException;
@@ -74,6 +72,10 @@ import java.util.stream.Collectors;
 
 import scala.collection.JavaConversions;
 
+import static org.apache.hudi.config.HoodieCompactionConfig.INLINE_COMPACT_PROP;
+import static org.apache.hudi.config.HoodieWriteConfig.COMBINE_BEFORE_INSERT_PROP;
+import static org.apache.hudi.config.HoodieWriteConfig.COMBINE_BEFORE_UPSERT_PROP;
+import static org.apache.hudi.config.HoodieWriteConfig.HOODIE_AUTO_COMMIT_PROP;
 import static org.apache.hudi.utilities.schema.RowBasedSchemaProvider.HOODIE_RECORD_NAMESPACE;
 import static org.apache.hudi.utilities.schema.RowBasedSchemaProvider.HOODIE_RECORD_STRUCT_NAME;
 
@@ -500,14 +502,15 @@ public class DeltaSync implements Serializable {
    * @param schemaProvider Schema Provider
    */
   private HoodieWriteConfig getHoodieClientConfig(SchemaProvider schemaProvider) {
+    final boolean combineBeforeUpsert = true;
+    final boolean autoCommit = false;
     HoodieWriteConfig.Builder builder =
-        HoodieWriteConfig.newBuilder().withPath(cfg.targetBasePath).combineInput(cfg.filterDupes, true)
+        HoodieWriteConfig.newBuilder().withPath(cfg.targetBasePath).combineInput(cfg.filterDupes, combineBeforeUpsert)
             .withCompactionConfig(HoodieCompactionConfig.newBuilder().withPayloadClass(cfg.payloadClassName)
                 // Inline compaction is disabled for continuous mode. otherwise enabled for MOR
                 .withInlineCompaction(cfg.isInlineCompactionEnabled()).build())
             .forTable(cfg.targetTableName)
-            .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
-            .withAutoCommit(false).withProps(props);
+            .withAutoCommit(autoCommit).withProps(props);
 
     if (null != schemaProvider && null != schemaProvider.getTargetSchema()) {
       builder = builder.withSchema(schemaProvider.getTargetSchema().toString());
@@ -515,10 +518,14 @@ public class DeltaSync implements Serializable {
     HoodieWriteConfig config = builder.build();
 
     // Validate what deltastreamer assumes of write-config to be really safe
-    ValidationUtils.checkArgument(config.isInlineCompaction() == cfg.isInlineCompactionEnabled());
-    ValidationUtils.checkArgument(!config.shouldAutoCommit());
-    ValidationUtils.checkArgument(config.shouldCombineBeforeInsert() == cfg.filterDupes);
-    ValidationUtils.checkArgument(config.shouldCombineBeforeUpsert());
+    ValidationUtils.checkArgument(config.isInlineCompaction() == cfg.isInlineCompactionEnabled(),
+        String.format("%s should be set to %s", INLINE_COMPACT_PROP, cfg.isInlineCompactionEnabled()));
+    ValidationUtils.checkArgument(!config.shouldAutoCommit(),
+        String.format("%s should be set to %s", HOODIE_AUTO_COMMIT_PROP, autoCommit));
+    ValidationUtils.checkArgument(config.shouldCombineBeforeInsert() == cfg.filterDupes,
+        String.format("%s should be set to %s", COMBINE_BEFORE_INSERT_PROP, cfg.filterDupes));
+    ValidationUtils.checkArgument(config.shouldCombineBeforeUpsert(),
+        String.format("%s should be set to %s", COMBINE_BEFORE_UPSERT_PROP, combineBeforeUpsert));
 
     return config;
   }