Posted to commits@hudi.apache.org by xu...@apache.org on 2022/05/26 12:21:33 UTC

[hudi] branch master updated: [HUDI-4124] Add valid check in Spark Datasource configs (#5637)

This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 8d2f009048 [HUDI-4124] Add valid check in Spark Datasource configs (#5637)
8d2f009048 is described below

commit 8d2f00904882b9946331bfb325c0033f72aa29fe
Author: komao <ma...@gmail.com>
AuthorDate: Thu May 26 20:21:28 2022 +0800

    [HUDI-4124] Add valid check in Spark Datasource configs (#5637)
    
    
    
    Co-authored-by: wangzixuan.wzxuan <wa...@bytedance.com>
---
 .../org/apache/hudi/config/HoodieIndexConfig.java  | 11 +++++++
 .../apache/hudi/common/config/ConfigProperty.java  | 35 +++++++++++++++++-----
 .../apache/hudi/common/config/HoodieConfig.java    |  1 +
 .../scala/org/apache/hudi/DataSourceOptions.scala  | 23 +++++++++++++-
 4 files changed, 61 insertions(+), 9 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
index dbd45b9738..0cef5550af 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
@@ -44,6 +44,13 @@ import static org.apache.hudi.config.HoodieHBaseIndexConfig.PUT_BATCH_SIZE;
 import static org.apache.hudi.config.HoodieHBaseIndexConfig.TABLENAME;
 import static org.apache.hudi.config.HoodieHBaseIndexConfig.ZKPORT;
 import static org.apache.hudi.config.HoodieHBaseIndexConfig.ZKQUORUM;
+import static org.apache.hudi.index.HoodieIndex.IndexType.BLOOM;
+import static org.apache.hudi.index.HoodieIndex.IndexType.BUCKET;
+import static org.apache.hudi.index.HoodieIndex.IndexType.GLOBAL_BLOOM;
+import static org.apache.hudi.index.HoodieIndex.IndexType.GLOBAL_SIMPLE;
+import static org.apache.hudi.index.HoodieIndex.IndexType.HBASE;
+import static org.apache.hudi.index.HoodieIndex.IndexType.INMEMORY;
+import static org.apache.hudi.index.HoodieIndex.IndexType.SIMPLE;
 
 /**
  * Indexing related config.
@@ -57,7 +64,10 @@ public class HoodieIndexConfig extends HoodieConfig {
 
   public static final ConfigProperty<String> INDEX_TYPE = ConfigProperty
       .key("hoodie.index.type")
+      // Builder#getDefaultIndexType has already set it according to engine type
       .noDefaultValue()
+      .withValidValues(HBASE.name(), INMEMORY.name(), BLOOM.name(), GLOBAL_BLOOM.name(),
+          SIMPLE.name(), GLOBAL_SIMPLE.name(), BUCKET.name())
       .withDocumentation("Type of index to use. Default is Bloom filter. "
           + "Possible options are [BLOOM | GLOBAL_BLOOM |SIMPLE | GLOBAL_SIMPLE | INMEMORY | HBASE | BUCKET]. "
           + "Bloom filters removes the dependency on a external system "
@@ -141,6 +151,7 @@ public class HoodieIndexConfig extends HoodieConfig {
   public static final ConfigProperty<String> BLOOM_FILTER_TYPE = ConfigProperty
       .key("hoodie.bloom.index.filter.type")
       .defaultValue(BloomFilterTypeCode.DYNAMIC_V0.name())
+      .withValidValues(BloomFilterTypeCode.SIMPLE.name(), BloomFilterTypeCode.DYNAMIC_V0.name())
       .withDocumentation("Filter type used. Default is BloomFilterTypeCode.DYNAMIC_V0. "
           + "Available values are [BloomFilterTypeCode.SIMPLE , BloomFilterTypeCode.DYNAMIC_V0]. "
           + "Dynamic bloom filters auto size themselves based on number of keys.");
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/ConfigProperty.java b/hudi-common/src/main/java/org/apache/hudi/common/config/ConfigProperty.java
index 9612914303..934803d8d3 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/config/ConfigProperty.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/config/ConfigProperty.java
@@ -24,7 +24,10 @@ import org.apache.hudi.exception.HoodieException;
 
 import java.io.Serializable;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.function.Function;
 import java.util.Objects;
 
@@ -48,19 +51,22 @@ public class ConfigProperty<T> implements Serializable {
 
   private final Option<String> deprecatedVersion;
 
+  private final Set<String> validValues;
+
   private final String[] alternatives;
 
   // provide the ability to infer config value based on other configs
   private final Option<Function<HoodieConfig, Option<T>>> inferFunction;
 
   ConfigProperty(String key, T defaultValue, String doc, Option<String> sinceVersion,
-                 Option<String> deprecatedVersion, Option<Function<HoodieConfig, Option<T>>> inferFunc, String... alternatives) {
+                 Option<String> deprecatedVersion, Option<Function<HoodieConfig, Option<T>>> inferFunc, Set<String> validValues, String... alternatives) {
     this.key = Objects.requireNonNull(key);
     this.defaultValue = defaultValue;
     this.doc = doc;
     this.sinceVersion = sinceVersion;
     this.deprecatedVersion = deprecatedVersion;
     this.inferFunction = inferFunc;
+    this.validValues = validValues;
     this.alternatives = alternatives;
   }
 
@@ -95,33 +101,46 @@ public class ConfigProperty<T> implements Serializable {
     return inferFunction;
   }
 
+  public void checkValues(String value) {
+    if (validValues != null && !validValues.isEmpty() && !validValues.contains(value)) {
+      throw new IllegalArgumentException(
+          "The value of " + key + " should be one of "
+              + String.join(",", validValues) + ", but was " + value);
+    }
+  }
+
   public List<String> getAlternatives() {
     return Arrays.asList(alternatives);
   }
 
   public ConfigProperty<T> withDocumentation(String doc) {
     Objects.requireNonNull(doc);
-    return new ConfigProperty<>(key, defaultValue, doc, sinceVersion, deprecatedVersion, inferFunction, alternatives);
+    return new ConfigProperty<>(key, defaultValue, doc, sinceVersion, deprecatedVersion, inferFunction, validValues, alternatives);
+  }
+
+  public ConfigProperty<T> withValidValues(String... validValues) {
+    Objects.requireNonNull(validValues);
+    return new ConfigProperty<>(key, defaultValue, doc, sinceVersion, deprecatedVersion, inferFunction, new HashSet<>(Arrays.asList(validValues)), alternatives);
   }
 
   public ConfigProperty<T> withAlternatives(String... alternatives) {
     Objects.requireNonNull(alternatives);
-    return new ConfigProperty<>(key, defaultValue, doc, sinceVersion, deprecatedVersion, inferFunction, alternatives);
+    return new ConfigProperty<>(key, defaultValue, doc, sinceVersion, deprecatedVersion, inferFunction, validValues, alternatives);
   }
 
   public ConfigProperty<T> sinceVersion(String sinceVersion) {
     Objects.requireNonNull(sinceVersion);
-    return new ConfigProperty<>(key, defaultValue, doc, Option.of(sinceVersion), deprecatedVersion, inferFunction, alternatives);
+    return new ConfigProperty<>(key, defaultValue, doc, Option.of(sinceVersion), deprecatedVersion, inferFunction, validValues, alternatives);
   }
 
   public ConfigProperty<T> deprecatedAfter(String deprecatedVersion) {
     Objects.requireNonNull(deprecatedVersion);
-    return new ConfigProperty<>(key, defaultValue, doc, sinceVersion, Option.of(deprecatedVersion), inferFunction, alternatives);
+    return new ConfigProperty<>(key, defaultValue, doc, sinceVersion, Option.of(deprecatedVersion), inferFunction, validValues, alternatives);
   }
 
   public ConfigProperty<T> withInferFunction(Function<HoodieConfig, Option<T>> inferFunction) {
     Objects.requireNonNull(inferFunction);
-    return new ConfigProperty<>(key, defaultValue, doc, sinceVersion, deprecatedVersion, Option.of(inferFunction), alternatives);
+    return new ConfigProperty<>(key, defaultValue, doc, sinceVersion, deprecatedVersion, Option.of(inferFunction), validValues, alternatives);
   }
 
   /**
@@ -156,13 +175,13 @@ public class ConfigProperty<T> implements Serializable {
 
     public <T> ConfigProperty<T> defaultValue(T value) {
       Objects.requireNonNull(value);
-      ConfigProperty<T> configProperty = new ConfigProperty<>(key, value, "", Option.empty(), Option.empty(), Option.empty());
+      ConfigProperty<T> configProperty = new ConfigProperty<>(key, value, "", Option.empty(), Option.empty(), Option.empty(), Collections.emptySet());
       return configProperty;
     }
 
     public ConfigProperty<String> noDefaultValue() {
       ConfigProperty<String> configProperty = new ConfigProperty<>(key, null, "", Option.empty(),
-          Option.empty(), Option.empty());
+          Option.empty(), Option.empty(), Collections.emptySet());
       return configProperty;
     }
   }
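
With the new builder step, any ConfigProperty can declare the values it accepts; an empty set (the builder default) means no restriction, and checkValues enforces the set when one is present. A short sketch with a made-up property key, purely for illustration:

    import org.apache.hudi.common.config.ConfigProperty

    // Hypothetical property; the key below is not a real Hudi config.
    val codec: ConfigProperty[String] = ConfigProperty
      .key("hoodie.example.compression.codec")
      .defaultValue("gzip")
      .withValidValues("gzip", "snappy", "zstd")
      .withDocumentation("Example-only property illustrating withValidValues.")

    codec.checkValues("snappy")  // ok, value is in the declared set
    codec.checkValues("lz4")     // throws IllegalArgumentException listing gzip, snappy, zstd
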
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieConfig.java
index c77e292b47..1aa0cfba5b 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieConfig.java
@@ -57,6 +57,7 @@ public class HoodieConfig implements Serializable {
   }
 
   public <T> void setValue(ConfigProperty<T> cfg, String val) {
+    cfg.checkValues(val);
     props.setProperty(cfg.key(), val);
   }
 
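Because setValue is the common entry point for programmatic config updates, anything routed through HoodieConfig now gets the same check. A hedged sketch, assuming the no-argument HoodieConfig constructor:

    import org.apache.hudi.common.config.HoodieConfig
    import org.apache.hudi.config.HoodieIndexConfig

    val conf = new HoodieConfig()
    conf.setValue(HoodieIndexConfig.INDEX_TYPE, "SIMPLE")  // stored, as before
    conf.setValue(HoodieIndexConfig.INDEX_TYPE, "SIMPL")   // now fails fast with IllegalArgumentException
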
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index 36dd07f28a..ac4d0e5794 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -56,6 +56,7 @@ object DataSourceReadOptions {
     .key("hoodie.datasource.query.type")
     .defaultValue(QUERY_TYPE_SNAPSHOT_OPT_VAL)
     .withAlternatives("hoodie.datasource.view.type")
+    .withValidValues(QUERY_TYPE_SNAPSHOT_OPT_VAL, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, QUERY_TYPE_INCREMENTAL_OPT_VAL)
     .withDocumentation("Whether data needs to be read, in incremental mode (new data since an instantTime) " +
       "(or) Read Optimized mode (obtain latest view, based on base files) (or) Snapshot mode " +
       "(obtain latest view, by merging base and (if any) log files)")
@@ -65,6 +66,7 @@ object DataSourceReadOptions {
   val REALTIME_MERGE: ConfigProperty[String] = ConfigProperty
     .key("hoodie.datasource.merge.type")
     .defaultValue(REALTIME_PAYLOAD_COMBINE_OPT_VAL)
+    .withValidValues(REALTIME_SKIP_MERGE_OPT_VAL, REALTIME_PAYLOAD_COMBINE_OPT_VAL)
     .withDocumentation("For Snapshot query on merge on read table, control whether we invoke the record " +
       s"payload implementation to merge (${REALTIME_PAYLOAD_COMBINE_OPT_VAL}) or skip merging altogether" +
       s"${REALTIME_SKIP_MERGE_OPT_VAL}")
@@ -210,6 +212,23 @@ object DataSourceWriteOptions {
   val OPERATION: ConfigProperty[String] = ConfigProperty
     .key("hoodie.datasource.write.operation")
     .defaultValue(UPSERT_OPERATION_OPT_VAL)
+    .withValidValues(
+      WriteOperationType.INSERT.value,
+      WriteOperationType.INSERT_PREPPED.value,
+      WriteOperationType.UPSERT.value,
+      WriteOperationType.UPSERT_PREPPED.value,
+      WriteOperationType.BULK_INSERT.value,
+      WriteOperationType.BULK_INSERT_PREPPED.value,
+      WriteOperationType.DELETE.value,
+      WriteOperationType.BOOTSTRAP.value,
+      WriteOperationType.INSERT_OVERWRITE.value,
+      WriteOperationType.CLUSTER.value,
+      WriteOperationType.DELETE_PARTITION.value,
+      WriteOperationType.INSERT_OVERWRITE_TABLE.value,
+      WriteOperationType.COMPACT.value,
+      WriteOperationType.INSERT.value,
+      WriteOperationType.ALTER_SCHEMA.value
+    )
     .withDocumentation("Whether to do upsert, insert or bulkinsert for the write operation. " +
       "Use bulkinsert to load new data into a table, and there on use upsert/insert. " +
       "bulk insert uses a disk based write path to scale to load large inputs without need to cache it.")
@@ -220,6 +239,7 @@ object DataSourceWriteOptions {
   val TABLE_TYPE: ConfigProperty[String] = ConfigProperty
     .key("hoodie.datasource.write.table.type")
     .defaultValue(COW_TABLE_TYPE_OPT_VAL)
+    .withValidValues(COW_TABLE_TYPE_OPT_VAL, MOR_TABLE_TYPE_OPT_VAL)
     .withAlternatives("hoodie.datasource.write.storage.type")
     .withDocumentation("The table type for the underlying data, for this write. This can’t change between writes.")
 
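The write-side options (operation and table type) are constrained the same way. A hedged sketch (assumes an existing DataFrame df and target path basePath; the check is expected to surface when these options are copied into a HoodieConfig via setValue). A misspelling such as "MERGE_ON_WRITE" for the table type should then fail with an IllegalArgumentException naming the allowed values instead of surfacing further down the write path:

    df.write.format("hudi")
      .option("hoodie.datasource.write.operation", "upsert")          // one of the WriteOperationType values
      .option("hoodie.datasource.write.table.type", "MERGE_ON_READ")  // COPY_ON_WRITE or MERGE_ON_READ
      .option("hoodie.table.name", "example_table")                   // unrelated to this commit, but required for a write
      .mode("append")
      .save(basePath)
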
@@ -308,7 +328,8 @@ object DataSourceWriteOptions {
       Option.of(classOf[NonpartitionedKeyGenerator].getName)
     } else {
       val numOfPartFields = p.getString(PARTITIONPATH_FIELD).split(",").length
-      if (numOfPartFields == 1) {
+      val numOfRecordKeyFields = p.getString(RECORDKEY_FIELD).split(",").length
+      if (numOfPartFields == 1 && numOfRecordKeyFields == 1) {
         Option.of(classOf[SimpleKeyGenerator].getName)
       } else {
         Option.of(classOf[ComplexKeyGenerator].getName)
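
The final hunk also tightens key generator inference: SimpleKeyGenerator is inferred only when both the record key and the partition path are single fields, so a composite record key now falls back to ComplexKeyGenerator even with a single partition field. A simplified restatement of the rule (names and the non-partitioned check are inlined for illustration; the real logic lives in the infer function above):

    import org.apache.hudi.keygen.{ComplexKeyGenerator, NonpartitionedKeyGenerator, SimpleKeyGenerator}

    def inferKeyGeneratorClass(recordKeyFields: String, partitionPathFields: String): String = {
      if (partitionPathFields.isEmpty) {
        classOf[NonpartitionedKeyGenerator].getName
      } else if (recordKeyFields.split(",").length == 1 && partitionPathFields.split(",").length == 1) {
        classOf[SimpleKeyGenerator].getName   // single record key + single partition field
      } else {
        classOf[ComplexKeyGenerator].getName  // composite record key or composite partition path
      }
    }

    inferKeyGeneratorClass("uuid", "dt")     // SimpleKeyGenerator, as before
    inferKeyGeneratorClass("id1,id2", "dt")  // ComplexKeyGenerator, changed by this commit
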