Posted to commits@hudi.apache.org by le...@apache.org on 2021/11/03 02:04:38 UTC
[hudi] branch master updated: [HUDI-2538] persist some configs to hoodie.properties when the first write (#3823)
This is an automated email from the ASF dual-hosted git repository.
leesf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 6351e5f [HUDI-2538] persist some configs to hoodie.properties when the first write (#3823)
6351e5f is described below
commit 6351e5f4d042b14cd0f6715c36f23d75fcc8e091
Author: Yann Byron <bi...@gmail.com>
AuthorDate: Wed Nov 3 10:04:23 2021 +0800
[HUDI-2538] persist some configs to hoodie.properties when the first write (#3823)
---
.../table/upgrade/TwoToThreeUpgradeHandler.java | 9 +-
.../factory/HoodieSparkKeyGeneratorFactory.java | 98 +++++++++------
.../client/functional/TestHoodieMetadataBase.java | 8 +-
.../apache/hudi/keygen/TestCustomKeyGenerator.java | 13 +-
.../hudi/testutils/HoodieClientTestHarness.java | 13 +-
.../apache/hudi/common/config/HoodieConfig.java | 6 +-
.../hudi/common/table/HoodieTableConfig.java | 16 +++
.../hudi/common/table/HoodieTableMetaClient.java | 24 ++++
.../common/testutils/HoodieCommonTestHarness.java | 5 +
.../main/scala/org/apache/hudi/DefaultSource.scala | 12 +-
.../org/apache/hudi/HoodieSparkSqlWriter.scala | 118 +++++++++++++-----
.../org/apache/hudi/HoodieStreamingSink.scala | 9 +-
.../scala/org/apache/hudi/HoodieWriterUtils.scala | 5 +-
.../apache/spark/sql/hudi/HoodieOptionConfig.scala | 9 +-
.../hudi/command/CreateHoodieTableCommand.scala | 83 ++++++++-----
.../hudi/command/DeleteHoodieTableCommand.scala | 13 +-
.../command/InsertIntoHoodieTableCommand.scala | 52 +++++---
.../hudi/command/MergeIntoHoodieTableCommand.scala | 60 ++++-----
.../spark/sql/hudi/command/SqlKeyGenerator.scala | 18 ++-
.../hudi/command/UpdateHoodieTableCommand.scala | 14 ++-
.../apache/hudi/HoodieSparkSqlWriterSuite.scala | 137 ++++++++++++++++-----
.../org/apache/hudi/TestHoodieFileIndex.scala | 9 ++
.../functional/TestDataSourceForBootstrap.scala | 8 +-
.../apache/hudi/functional/TestMORDataSource.scala | 3 +
.../hudi/functional/TestTimeTravelQuery.scala | 1 +
25 files changed, 540 insertions(+), 203 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/TwoToThreeUpgradeHandler.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/TwoToThreeUpgradeHandler.java
index 6a825e1..e1dbfbb 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/TwoToThreeUpgradeHandler.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/TwoToThreeUpgradeHandler.java
@@ -21,10 +21,11 @@ package org.apache.hudi.table.upgrade;
import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.metadata.HoodieTableMetadataUtil;
-import java.util.Collections;
+import java.util.HashMap;
import java.util.Map;
/**
@@ -39,6 +40,10 @@ public class TwoToThreeUpgradeHandler implements UpgradeHandler {
// table has been updated and is not backward compatible.
HoodieTableMetadataUtil.deleteMetadataTable(config.getBasePath(), context);
}
- return Collections.emptyMap();
+ Map<ConfigProperty, String> tablePropsToAdd = new HashMap<>();
+ tablePropsToAdd.put(HoodieTableConfig.URL_ENCODE_PARTITIONING, config.getStringOrDefault(HoodieTableConfig.URL_ENCODE_PARTITIONING));
+ tablePropsToAdd.put(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE, config.getStringOrDefault(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE));
+ tablePropsToAdd.put(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME, config.getString(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME));
+ return tablePropsToAdd;
}
}
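Net effect of the TwoToThree upgrade above: three configs that previously lived only in write options are now persisted into hoodie.properties. A minimal Scala sketch of reading them back after the upgrade, assuming a Hadoop Configuration `hadoopConf` in scope and a hypothetical base path (the getters are the ones added to HoodieTableConfig further down this patch):

import org.apache.hudi.common.table.HoodieTableMetaClient

val metaClient = HoodieTableMetaClient.builder()
  .setBasePath("/tmp/hudi_trips")   // hypothetical base path
  .setConf(hadoopConf)              // hypothetical Hadoop Configuration in scope
  .build()
val tableConfig = metaClient.getTableConfig
// after the upgrade these keys are expected to be present in hoodie.properties
val urlEncode = tableConfig.getUrlEncodePartitoning        // e.g. "false"
val hiveStyle = tableConfig.getHiveStylePartitioningEnable // e.g. "false"
val keyGenCls = tableConfig.getKeyGeneratorClassName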
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/factory/HoodieSparkKeyGeneratorFactory.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/factory/HoodieSparkKeyGeneratorFactory.java
index d4e99f7..165b27d 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/factory/HoodieSparkKeyGeneratorFactory.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/factory/HoodieSparkKeyGeneratorFactory.java
@@ -19,14 +19,13 @@
package org.apache.hudi.keygen.factory;
import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieKeyGeneratorException;
-import org.apache.hudi.keygen.BuiltinKeyGenerator;
import org.apache.hudi.keygen.ComplexKeyGenerator;
import org.apache.hudi.keygen.CustomKeyGenerator;
import org.apache.hudi.keygen.GlobalDeleteKeyGenerator;
-import org.apache.hudi.keygen.KeyGenUtils;
import org.apache.hudi.keygen.KeyGenerator;
import org.apache.hudi.keygen.NonpartitionedKeyGenerator;
import org.apache.hudi.keygen.SimpleKeyGenerator;
@@ -37,8 +36,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.util.HashMap;
import java.util.Locale;
-import java.util.Objects;
+import java.util.Map;
/**
* Factory help to create {@link org.apache.hudi.keygen.KeyGenerator}.
@@ -50,45 +50,73 @@ public class HoodieSparkKeyGeneratorFactory {
private static final Logger LOG = LoggerFactory.getLogger(HoodieSparkKeyGeneratorFactory.class);
- public static KeyGenerator createKeyGenerator(TypedProperties props) throws IOException {
- // keyGenerator class name has higher priority
- KeyGenerator keyGenerator = KeyGenUtils.createKeyGeneratorByClassName(props);
+ private static final Map<String, String> COMMON_TO_SPARK_KEYGENERATOR = new HashMap<>();
+ static {
+ COMMON_TO_SPARK_KEYGENERATOR.put("org.apache.hudi.keygen.ComplexAvroKeyGenerator",
+ "org.apache.hudi.keygen.ComplexKeyGenerator");
+ COMMON_TO_SPARK_KEYGENERATOR.put("org.apache.hudi.keygen.CustomAvroKeyGenerator",
+ "org.apache.hudi.keygen.CustomKeyGenerator");
+ COMMON_TO_SPARK_KEYGENERATOR.put("org.apache.hudi.keygen.GlobalAvroDeleteKeyGenerator",
+ "org.apache.hudi.keygen.GlobalDeleteKeyGenerator");
+ COMMON_TO_SPARK_KEYGENERATOR.put("org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator",
+ "org.apache.hudi.keygen.NonpartitionedKeyGenerator");
+ COMMON_TO_SPARK_KEYGENERATOR.put("org.apache.hudi.keygen.SimpleAvroKeyGenerator",
+ "org.apache.hudi.keygen.SimpleKeyGenerator");
+ COMMON_TO_SPARK_KEYGENERATOR.put("org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator",
+ "org.apache.hudi.keygen.TimestampBasedKeyGenerator");
+ }
- return Objects.isNull(keyGenerator) ? createKeyGeneratorByType(props) : keyGenerator;
+ public static KeyGenerator createKeyGenerator(TypedProperties props) throws IOException {
+ String keyGeneratorClass = getKeyGeneratorClassName(props);
+ try {
+ return (KeyGenerator) ReflectionUtils.loadClass(keyGeneratorClass, props);
+ } catch (Throwable e) {
+ throw new IOException("Could not load key generator class " + keyGeneratorClass, e);
+ }
}
- private static BuiltinKeyGenerator createKeyGeneratorByType(TypedProperties props) throws IOException {
- // Use KeyGeneratorType.SIMPLE as default keyGeneratorType
- String keyGeneratorType =
- props.getString(HoodieWriteConfig.KEYGENERATOR_TYPE.key(), null);
+ public static String getKeyGeneratorClassName(TypedProperties props) {
+ String keyGeneratorClass = props.getString(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key(), null);
- if (StringUtils.isNullOrEmpty(keyGeneratorType)) {
+ if (StringUtils.isNullOrEmpty(keyGeneratorClass)) {
+ String keyGeneratorType = props.getString(HoodieWriteConfig.KEYGENERATOR_TYPE.key(), KeyGeneratorType.SIMPLE.name());
LOG.info("The value of {} is empty, use SIMPLE", HoodieWriteConfig.KEYGENERATOR_TYPE.key());
- keyGeneratorType = KeyGeneratorType.SIMPLE.name();
- }
-
- KeyGeneratorType keyGeneratorTypeEnum;
- try {
- keyGeneratorTypeEnum = KeyGeneratorType.valueOf(keyGeneratorType.toUpperCase(Locale.ROOT));
- } catch (IllegalArgumentException e) {
- throw new HoodieKeyGeneratorException("Unsupported keyGenerator Type " + keyGeneratorType);
- }
- switch (keyGeneratorTypeEnum) {
- case SIMPLE:
- return new SimpleKeyGenerator(props);
- case COMPLEX:
- return new ComplexKeyGenerator(props);
- case TIMESTAMP:
- return new TimestampBasedKeyGenerator(props);
- case CUSTOM:
- return new CustomKeyGenerator(props);
- case NON_PARTITION:
- return new NonpartitionedKeyGenerator(props);
- case GLOBAL_DELETE:
- return new GlobalDeleteKeyGenerator(props);
- default:
+ KeyGeneratorType keyGeneratorTypeEnum;
+ try {
+ keyGeneratorTypeEnum = KeyGeneratorType.valueOf(keyGeneratorType.toUpperCase(Locale.ROOT));
+ } catch (IllegalArgumentException e) {
throw new HoodieKeyGeneratorException("Unsupported keyGenerator Type " + keyGeneratorType);
+ }
+ switch (keyGeneratorTypeEnum) {
+ case SIMPLE:
+ keyGeneratorClass = SimpleKeyGenerator.class.getName();
+ break;
+ case COMPLEX:
+ keyGeneratorClass = ComplexKeyGenerator.class.getName();
+ break;
+ case TIMESTAMP:
+ keyGeneratorClass = TimestampBasedKeyGenerator.class.getName();
+ break;
+ case CUSTOM:
+ keyGeneratorClass = CustomKeyGenerator.class.getName();
+ break;
+ case NON_PARTITION:
+ keyGeneratorClass = NonpartitionedKeyGenerator.class.getName();
+ break;
+ case GLOBAL_DELETE:
+ keyGeneratorClass = GlobalDeleteKeyGenerator.class.getName();
+ break;
+ default:
+ throw new HoodieKeyGeneratorException("Unsupported keyGenerator Type " + keyGeneratorType);
+ }
}
+ return keyGeneratorClass;
}
+ /**
+ * Convert hoodie-common KeyGenerator to SparkKeyGeneratorInterface implement.
+ */
+ public static String convertToSparkKeyGenerator(String keyGeneratorClassName) {
+ return COMMON_TO_SPARK_KEYGENERATOR.getOrDefault(keyGeneratorClassName, keyGeneratorClassName);
+ }
}
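Resolution order in the refactored factory: an explicit keygenerator class name always wins; otherwise the keygenerator type (SIMPLE by default) is mapped to its built-in class, and Avro key generators from hoodie-common are translated to their Spark counterparts. A minimal sketch:

import org.apache.hudi.common.config.TypedProperties
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory

val props = new TypedProperties()
// no explicit key generator class set, only the type
props.setProperty(HoodieWriteConfig.KEYGENERATOR_TYPE.key, "COMPLEX")
val resolved = HoodieSparkKeyGeneratorFactory.getKeyGeneratorClassName(props)
// resolved == "org.apache.hudi.keygen.ComplexKeyGenerator"

val sparkKeyGen = HoodieSparkKeyGeneratorFactory.convertToSparkKeyGenerator(
  "org.apache.hudi.keygen.SimpleAvroKeyGenerator")
// sparkKeyGen == "org.apache.hudi.keygen.SimpleKeyGenerator"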
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java
index 7a49daf..cf261cc 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java
@@ -22,6 +22,7 @@ import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.testutils.HoodieMetadataTestTable;
@@ -33,6 +34,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.config.metrics.HoodieMetricsConfig;
import org.apache.hudi.config.metrics.HoodieMetricsGraphiteConfig;
import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.keygen.SimpleKeyGenerator;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
@@ -50,6 +52,7 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
+import java.util.Properties;
import static java.util.Arrays.asList;
import static java.util.Collections.emptyList;
@@ -268,6 +271,8 @@ public class TestHoodieMetadataBase extends HoodieClientTestHarness {
protected HoodieWriteConfig.Builder getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy policy, boolean autoCommit, boolean useFileListingMetadata,
boolean enableMetrics, boolean enableFullScan) {
+ Properties properties = new Properties();
+ properties.put(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key(), SimpleKeyGenerator.class.getName());
return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(TRIP_EXAMPLE_SCHEMA)
.withParallelism(2, 2).withDeleteParallelism(2).withRollbackParallelism(2).withFinalizeWriteParallelism(2)
.withAutoCommit(autoCommit)
@@ -287,7 +292,8 @@ public class TestHoodieMetadataBase extends HoodieClientTestHarness {
.withMetricsConfig(HoodieMetricsConfig.newBuilder().on(enableMetrics)
.withExecutorMetrics(true).build())
.withMetricsGraphiteConfig(HoodieMetricsGraphiteConfig.newBuilder()
- .usePrefix("unit-test").build());
+ .usePrefix("unit-test").build())
+ .withProperties(properties);
}
}
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/keygen/TestCustomKeyGenerator.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/keygen/TestCustomKeyGenerator.java
index 4bfc71f..4b590d9 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/keygen/TestCustomKeyGenerator.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/keygen/TestCustomKeyGenerator.java
@@ -33,6 +33,8 @@ import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
public class TestCustomKeyGenerator extends KeyGeneratorTestUtilities {
@@ -122,6 +124,13 @@ public class TestCustomKeyGenerator extends KeyGeneratorTestUtilities {
return properties;
}
+ private String stackTraceToString(Throwable e) {
+ StringWriter sw = new StringWriter();
+ PrintWriter pw = new PrintWriter(sw);
+ e.printStackTrace(pw);
+ return sw.toString();
+ }
+
@Test
public void testSimpleKeyGeneratorWithKeyGeneratorClass() throws IOException {
testSimpleKeyGenerator(getPropertiesForSimpleKeyGen(true));
@@ -259,7 +268,7 @@ public class TestCustomKeyGenerator extends KeyGeneratorTestUtilities {
.getMessage()
.contains("Property hoodie.datasource.write.recordkey.field not found"));
} else {
- Assertions.assertTrue(e.getMessage().contains("Property hoodie.datasource.write.recordkey.field not found"));
+ Assertions.assertTrue(stackTraceToString(e).contains("Property hoodie.datasource.write.recordkey.field not found"));
}
}
@@ -282,7 +291,7 @@ public class TestCustomKeyGenerator extends KeyGeneratorTestUtilities {
.getMessage()
.contains("Property hoodie.datasource.write.recordkey.field not found"));
} else {
- Assertions.assertTrue(e.getMessage().contains("Property hoodie.datasource.write.recordkey.field not found"));
+ Assertions.assertTrue(stackTraceToString(e).contains("Property hoodie.datasource.write.recordkey.field not found"));
}
}
}
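The new helper exists because createKeyGenerator now wraps instantiation failures in an IOException, so the original "Property ... not found" message moves into the cause chain rather than the top-level message. A hedged Scala sketch of the idea (the missing-property setup is hypothetical):

import java.io.{IOException, PrintWriter, StringWriter}
import org.apache.hudi.common.config.TypedProperties
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory

def stackTraceToString(e: Throwable): String = {
  val sw = new StringWriter()
  e.printStackTrace(new PrintWriter(sw))
  sw.toString
}

val props = new TypedProperties() // intentionally missing hoodie.datasource.write.recordkey.field
props.setProperty("hoodie.datasource.write.keygenerator.class",
  "org.apache.hudi.keygen.SimpleKeyGenerator")
try HoodieSparkKeyGeneratorFactory.createKeyGenerator(props) catch {
  case e: IOException =>
    // the root-cause message is still findable somewhere in the chain
    assert(stackTraceToString(e).contains("recordkey.field not found"))
}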
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
index 8c0a3bd..9ed98b1 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
@@ -249,7 +249,15 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im
initMetaClient(getTableType());
}
+ protected void initMetaClient(Properties properties) throws IOException {
+ initMetaClient(getTableType(), properties);
+ }
+
protected void initMetaClient(HoodieTableType tableType) throws IOException {
+ initMetaClient(tableType, new Properties());
+ }
+
+ protected void initMetaClient(HoodieTableType tableType, Properties properties) throws IOException {
if (basePath == null) {
throw new IllegalStateException("The base path has not been initialized.");
}
@@ -258,7 +266,10 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im
throw new IllegalStateException("The Spark context has not been initialized.");
}
- metaClient = HoodieTestUtils.init(hadoopConf, basePath, tableType);
+ if (tableName != null && !tableName.isEmpty()) {
+ properties.put(HoodieTableConfig.NAME.key(), tableName);
+ }
+ metaClient = HoodieTestUtils.init(hadoopConf, basePath, tableType, properties);
}
protected Properties getPropertiesForKeyGen() {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieConfig.java
index 1f646aa..ed2b90e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieConfig.java
@@ -74,6 +74,10 @@ public class HoodieConfig implements Serializable {
}
}
+ public Boolean contains(String key) {
+ return props.containsKey(key);
+ }
+
public <T> boolean contains(ConfigProperty<T> configProperty) {
if (props.containsKey(configProperty.key())) {
return true;
@@ -135,7 +139,7 @@ public class HoodieConfig implements Serializable {
public <T> boolean getBooleanOrDefault(ConfigProperty<T> configProperty) {
Option<Object> rawValue = getRawValue(configProperty);
return rawValue.map(v -> Boolean.parseBoolean(v.toString()))
- .orElse((Boolean) configProperty.defaultValue());
+ .orElse(Boolean.parseBoolean(configProperty.defaultValue().toString()));
}
public <T> Long getLong(ConfigProperty<T> configProperty) {
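The getBooleanOrDefault change matters because many ConfigProperty defaults are declared as strings (e.g. URL_ENCODE_PARTITIONING defaults to the string "false"), so the old (Boolean) cast of the default could throw ClassCastException. A minimal sketch of the failure mode:

// the declared default may be a String, not a Boolean
val defaultValue: Any = "false" // e.g. URL_ENCODE_PARTITIONING's default
// old behavior: defaultValue.asInstanceOf[Boolean] -> ClassCastException
// new behavior: parse the string form, which handles both cases
val parsed = java.lang.Boolean.parseBoolean(defaultValue.toString) // false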
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
index 129bcce..dc57fd1 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
@@ -31,6 +31,7 @@ import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.FSDataInputStream;
@@ -161,6 +162,9 @@ public class HoodieTableConfig extends HoodieConfig {
.noDefaultValue()
.withDocumentation("Key Generator class property for the hoodie table");
+ public static final ConfigProperty<String> URL_ENCODE_PARTITIONING = KeyGeneratorOptions.URL_ENCODE_PARTITIONING;
+ public static final ConfigProperty<String> HIVE_STYLE_PARTITIONING_ENABLE = KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE;
+
public static final String NO_OP_BOOTSTRAP_INDEX_CLASS = NoOpBootstrapIndex.class.getName();
public HoodieTableConfig(FileSystem fs, String metaPath, String payloadClassName) {
@@ -363,6 +367,18 @@ public class HoodieTableConfig extends HoodieConfig {
return getString(RECORDKEY_FIELDS);
}
+ public String getKeyGeneratorClassName() {
+ return getString(KEY_GENERATOR_CLASS_NAME);
+ }
+
+ public String getHiveStylePartitioningEnable() {
+ return getString(HIVE_STYLE_PARTITIONING_ENABLE);
+ }
+
+ public String getUrlEncodePartitoning() {
+ return getString(URL_ENCODE_PARTITIONING);
+ }
+
public Map<String, String> propsMap() {
return props.entrySet().stream()
.collect(Collectors.toMap(e -> String.valueOf(e.getKey()), e -> String.valueOf(e.getValue())));
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
index 340a99e..450a3cc 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
@@ -637,6 +637,8 @@ public class HoodieTableMetaClient implements Serializable {
private Boolean bootstrapIndexEnable;
private Boolean populateMetaFields;
private String keyGeneratorClassProp;
+ private Boolean hiveStylePartitioningEnable;
+ private Boolean urlEncodePartitioning;
private PropertyBuilder() {
@@ -725,6 +727,16 @@ public class HoodieTableMetaClient implements Serializable {
return this;
}
+ public PropertyBuilder setHiveStylePartitioningEnable(Boolean hiveStylePartitioningEnable) {
+ this.hiveStylePartitioningEnable = hiveStylePartitioningEnable;
+ return this;
+ }
+
+ public PropertyBuilder setUrlEncodePartitioning(Boolean urlEncodePartitioning) {
+ this.urlEncodePartitioning = urlEncodePartitioning;
+ return this;
+ }
+
public PropertyBuilder fromMetaClient(HoodieTableMetaClient metaClient) {
return setTableType(metaClient.getTableType())
.setTableName(metaClient.getTableConfig().getTableName())
@@ -786,6 +798,12 @@ public class HoodieTableMetaClient implements Serializable {
if (hoodieConfig.contains(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME)) {
setKeyGeneratorClassProp(hoodieConfig.getString(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME));
}
+ if (hoodieConfig.contains(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE)) {
+ setHiveStylePartitioningEnable(hoodieConfig.getBoolean(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE));
+ }
+ if (hoodieConfig.contains(HoodieTableConfig.URL_ENCODE_PARTITIONING)) {
+ setUrlEncodePartitioning(hoodieConfig.getBoolean(HoodieTableConfig.URL_ENCODE_PARTITIONING));
+ }
return this;
}
@@ -849,6 +867,12 @@ public class HoodieTableMetaClient implements Serializable {
if (null != keyGeneratorClassProp) {
tableConfig.setValue(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME, keyGeneratorClassProp);
}
+ if (null != hiveStylePartitioningEnable) {
+ tableConfig.setValue(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE, Boolean.toString(hiveStylePartitioningEnable));
+ }
+ if (null != urlEncodePartitioning) {
+ tableConfig.setValue(HoodieTableConfig.URL_ENCODE_PARTITIONING, Boolean.toString(urlEncodePartitioning));
+ }
return tableConfig.getProps();
}
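A minimal sketch of the extended PropertyBuilder, assuming a Hadoop Configuration `hadoopConf` in scope and a hypothetical base path; with the two new setters the partitioning flags are written into hoodie.properties at table-initialization time:

import org.apache.hudi.common.model.HoodieTableType
import org.apache.hudi.common.table.HoodieTableMetaClient

val metaClient = HoodieTableMetaClient.withPropertyBuilder()
  .setTableType(HoodieTableType.COPY_ON_WRITE)
  .setTableName("trips") // hypothetical table name
  .setKeyGeneratorClassProp("org.apache.hudi.keygen.SimpleKeyGenerator")
  .setHiveStylePartitioningEnable(true)
  .setUrlEncodePartitioning(false)
  .initTable(hadoopConf, "/tmp/hudi_trips") // hypothetical path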
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java
index 9738816..311c131 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java
@@ -34,12 +34,17 @@ import java.io.IOException;
*/
public class HoodieCommonTestHarness {
+ protected String tableName = null;
protected String basePath = null;
protected transient HoodieTestDataGenerator dataGen = null;
protected transient HoodieTableMetaClient metaClient;
@TempDir
public java.nio.file.Path tempDir;
+ protected void setTableName(String tableName) {
+ this.tableName = tableName;
+ }
+
/**
* Initializes basePath.
*/
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
index 00133ab..a9d85af 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
@@ -154,14 +154,12 @@ class DefaultSource extends RelationProvider
mode: SaveMode,
optParams: Map[String, String],
df: DataFrame): BaseRelation = {
- val parameters = HoodieWriterUtils.parametersWithWriteDefaults(optParams)
- val translatedOptions = DataSourceWriteOptions.translateSqlOptions(parameters)
val dfWithoutMetaCols = df.drop(HoodieRecord.HOODIE_META_COLUMNS.asScala:_*)
- if (translatedOptions(OPERATION.key).equals(BOOTSTRAP_OPERATION_OPT_VAL)) {
- HoodieSparkSqlWriter.bootstrap(sqlContext, mode, translatedOptions, dfWithoutMetaCols)
+ if (optParams.get(OPERATION.key).contains(BOOTSTRAP_OPERATION_OPT_VAL)) {
+ HoodieSparkSqlWriter.bootstrap(sqlContext, mode, optParams, dfWithoutMetaCols)
} else {
- HoodieSparkSqlWriter.write(sqlContext, mode, translatedOptions, dfWithoutMetaCols)
+ HoodieSparkSqlWriter.write(sqlContext, mode, optParams, dfWithoutMetaCols)
}
new HoodieEmptyRelation(sqlContext, dfWithoutMetaCols.schema)
}
@@ -170,11 +168,9 @@ class DefaultSource extends RelationProvider
optParams: Map[String, String],
partitionColumns: Seq[String],
outputMode: OutputMode): Sink = {
- val parameters = HoodieWriterUtils.parametersWithWriteDefaults(optParams)
- val translatedOptions = DataSourceWriteOptions.translateSqlOptions(parameters)
new HoodieStreamingSink(
sqlContext,
- translatedOptions,
+ optParams,
partitionColumns,
outputMode)
}
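DefaultSource no longer applies write defaults or SQL-option translation up front; it only inspects the raw operation to route bootstrap writes, and leaves merging options with the persisted table config to HoodieSparkSqlWriter. The Option.contains idiom keeps the check safe when the key is absent:

val optParams = Map("hoodie.datasource.write.operation" -> "bootstrap") // hypothetical input
optParams.get("hoodie.datasource.write.operation").contains("bootstrap")                 // true
Map.empty[String, String].get("hoodie.datasource.write.operation").contains("bootstrap") // false, no exception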
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index e5c1a7a..1d0e8af 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -17,13 +17,13 @@
package org.apache.hudi
-
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hudi.DataSourceWriteOptions._
+import org.apache.hudi.DataSourceOptionsHelper.{allAlternatives, translateConfigurations}
import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.hudi.client.{HoodieWriteResult, SparkRDDWriteClient}
import org.apache.hudi.common.config.{HoodieConfig, HoodieMetadataConfig, TypedProperties}
@@ -31,12 +31,13 @@ import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.{HoodieRecordPayload, HoodieTableType, WriteOperationType}
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
-import org.apache.hudi.common.util.{CommitUtils, ReflectionUtils}
+import org.apache.hudi.common.util.{CommitUtils, ReflectionUtils, StringUtils}
import org.apache.hudi.config.HoodieBootstrapConfig.{BASE_PATH, INDEX_CLASS_NAME}
import org.apache.hudi.config.{HoodieInternalConfig, HoodieWriteConfig}
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.execution.bulkinsert.{BulkInsertInternalPartitionerWithRowsFactory, NonSortPartitionerWithRows}
import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncTool}
+import org.apache.hudi.index.SparkHoodieIndexFactory
import org.apache.hudi.internal.DataSourceInternalWriterHelper
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
import org.apache.hudi.sync.common.AbstractSyncTool
@@ -51,9 +52,9 @@ import org.apache.spark.{SPARK_VERSION, SparkContext}
import java.util
import java.util.Properties
-import org.apache.hudi.index.SparkHoodieIndexFactory
-
import scala.collection.JavaConversions._
+import scala.collection.mutable
+import scala.collection.mutable.StringBuilder
import scala.collection.mutable.ListBuffer
object HoodieSparkSqlWriter {
@@ -65,7 +66,7 @@ object HoodieSparkSqlWriter {
def write(sqlContext: SQLContext,
mode: SaveMode,
- parameters: Map[String, String],
+ optParams: Map[String, String],
df: DataFrame,
hoodieTableConfigOpt: Option[HoodieTableConfig] = Option.empty,
hoodieWriteClient: Option[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]] = Option.empty,
@@ -75,16 +76,23 @@ object HoodieSparkSqlWriter {
: (Boolean, common.util.Option[String], common.util.Option[String], common.util.Option[String],
SparkRDDWriteClient[HoodieRecordPayload[Nothing]], HoodieTableConfig) = {
+ assert(optParams.get("path").exists(!StringUtils.isNullOrEmpty(_)), "'path' must be set")
+ val path = optParams("path")
+ val basePath = new Path(path)
val sparkContext = sqlContext.sparkContext
- val path = parameters.get("path")
- val hoodieConfig = HoodieWriterUtils.convertMapToHoodieConfig(parameters)
- val tblNameOp = hoodieConfig.getStringOrThrow(HoodieWriteConfig.TBL_NAME, s"'${HoodieWriteConfig.TBL_NAME.key}' must be set.")
+ val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration)
+ tableExists = fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME))
+ var tableConfig = getHoodieTableConfig(sparkContext, path, hoodieTableConfigOpt)
+ validateTableConfig(sqlContext.sparkSession, optParams, tableConfig)
+
+ val (parameters, hoodieConfig) = mergeParamsAndGetHoodieConfig(optParams, tableConfig)
+ val tblName = hoodieConfig.getStringOrThrow(HoodieWriteConfig.TBL_NAME,
+ s"'${HoodieWriteConfig.TBL_NAME.key}' must be set.").trim
+ assert(!StringUtils.isNullOrEmpty(hoodieConfig.getString(HoodieWriteConfig.TBL_NAME)),
+ s"'${HoodieWriteConfig.TBL_NAME.key}' must be set.")
+
asyncCompactionTriggerFnDefined = asyncCompactionTriggerFn.isDefined
asyncClusteringTriggerFnDefined = asyncClusteringTriggerFn.isDefined
- if (path.isEmpty) {
- throw new HoodieException(s"'path' must be set.")
- }
- val tblName = tblNameOp.trim
sparkContext.getConf.getOption("spark.serializer") match {
case Some(ser) if ser.equals("org.apache.spark.serializer.KryoSerializer") =>
case _ => throw new HoodieException("hoodie only support org.apache.spark.serializer.KryoSerializer as spark.serializer")
@@ -105,12 +113,8 @@ object HoodieSparkSqlWriter {
}
val jsc = new JavaSparkContext(sparkContext)
- val basePath = new Path(path.get)
val instantTime = HoodieActiveTimeline.createNewInstantTime()
- val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration)
- tableExists = fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME))
- var tableConfig = getHoodieTableConfig(sparkContext, path.get, hoodieTableConfigOpt)
- val keyGenerator = HoodieSparkKeyGeneratorFactory.createKeyGenerator(toProperties(parameters))
+ val keyGenerator = HoodieSparkKeyGeneratorFactory.createKeyGenerator(new TypedProperties(hoodieConfig.getProps))
if (mode == SaveMode.Ignore && tableExists) {
log.warn(s"hoodie table at $basePath already exists. Ignoring & not performing actual writes.")
@@ -124,7 +128,7 @@ object HoodieSparkSqlWriter {
val baseFileFormat = hoodieConfig.getStringOrDefault(HoodieTableConfig.BASE_FILE_FORMAT)
val archiveLogFolder = hoodieConfig.getStringOrDefault(HoodieTableConfig.ARCHIVELOG_FOLDER)
val recordKeyFields = hoodieConfig.getString(DataSourceWriteOptions.RECORDKEY_FIELD)
- val populateMetaFields = parameters.getOrElse(HoodieTableConfig.POPULATE_META_FIELDS.key(), HoodieTableConfig.POPULATE_META_FIELDS.defaultValue()).toBoolean
+ val populateMetaFields = hoodieConfig.getBooleanOrDefault(HoodieTableConfig.POPULATE_META_FIELDS)
val tableMetaClient = HoodieTableMetaClient.withPropertyBuilder()
.setTableType(tableType)
@@ -138,7 +142,9 @@ object HoodieSparkSqlWriter {
.setPopulateMetaFields(populateMetaFields)
.setRecordKeyFields(hoodieConfig.getString(RECORDKEY_FIELD))
.setKeyGeneratorClassProp(hoodieConfig.getString(KEYGENERATOR_CLASS_NAME))
- .initTable(sparkContext.hadoopConfiguration, path.get)
+ .setHiveStylePartitioningEnable(hoodieConfig.getBoolean(HIVE_STYLE_PARTITIONING))
+ .setUrlEncodePartitioning(hoodieConfig.getBoolean(URL_ENCODE_PARTITIONING))
+ .initTable(sparkContext.hadoopConfiguration, path)
tableConfig = tableMetaClient.getTableConfig
}
@@ -169,7 +175,7 @@ object HoodieSparkSqlWriter {
// Create a HoodieWriteClient & issue the delete.
val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc,
- null, path.get, tblName,
+ null, path, tblName,
mapAsJavaMap(parameters - HoodieWriteConfig.AUTO_COMMIT_ENABLE.key)))
.asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]]
@@ -200,7 +206,7 @@ object HoodieSparkSqlWriter {
}
// Create a HoodieWriteClient & issue the delete.
val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc,
- null, path.get, tblName,
+ null, path, tblName,
mapAsJavaMap(parameters - HoodieWriteConfig.AUTO_COMMIT_ENABLE.key)))
.asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]]
// Issue delete partitions
@@ -244,7 +250,7 @@ object HoodieSparkSqlWriter {
val writeSchema = if (dropPartitionColumns) generateSchemaWithoutPartitionColumns(partitionColumns, schema) else schema
// Create a HoodieWriteClient & issue the write.
- val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc, writeSchema.toString, path.get,
+ val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc, writeSchema.toString, path,
tblName, mapAsJavaMap(parameters - HoodieWriteConfig.AUTO_COMMIT_ENABLE.key)
)).asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]]
@@ -326,14 +332,21 @@ object HoodieSparkSqlWriter {
def bootstrap(sqlContext: SQLContext,
mode: SaveMode,
- parameters: Map[String, String],
+ optParams: Map[String, String],
df: DataFrame,
hoodieTableConfigOpt: Option[HoodieTableConfig] = Option.empty,
hoodieWriteClient: Option[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]] = Option.empty): Boolean = {
+ assert(optParams.get("path").exists(!StringUtils.isNullOrEmpty(_)), "'path' must be set")
+ val path = optParams("path")
+ val basePath = new Path(path)
val sparkContext = sqlContext.sparkContext
- val path = parameters.getOrElse("path", throw new HoodieException("'path' must be set."))
- val hoodieConfig = HoodieWriterUtils.convertMapToHoodieConfig(parameters)
+ val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration)
+ tableExists = fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME))
+ var tableConfig = getHoodieTableConfig(sparkContext, path, hoodieTableConfigOpt)
+ validateTableConfig(sqlContext.sparkSession, optParams, tableConfig)
+
+ val (parameters, hoodieConfig) = mergeParamsAndGetHoodieConfig(optParams, tableConfig)
val tableName = hoodieConfig.getStringOrThrow(HoodieWriteConfig.TBL_NAME, s"'${HoodieWriteConfig.TBL_NAME.key}' must be set.")
val tableType = hoodieConfig.getStringOrDefault(TABLE_TYPE)
val bootstrapBasePath = hoodieConfig.getStringOrThrow(BASE_PATH,
@@ -349,10 +362,6 @@ object HoodieSparkSqlWriter {
schema = HoodieAvroUtils.getNullSchema.toString
}
- val basePath = new Path(path)
- val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration)
- tableExists = fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME))
- val tableConfig = getHoodieTableConfig(sparkContext, path, hoodieTableConfigOpt)
// Handle various save modes
if (mode == SaveMode.Ignore && tableExists) {
@@ -381,6 +390,8 @@ object HoodieSparkSqlWriter {
.setPartitionFields(partitionColumns)
.setPopulateMetaFields(populateMetaFields)
.setKeyGeneratorClassProp(keyGenProp)
+ .setHiveStylePartitioningEnable(hoodieConfig.getBoolean(HIVE_STYLE_PARTITIONING))
+ .setUrlEncodePartitioning(hoodieConfig.getBoolean(URL_ENCODE_PARTITIONING))
.initTable(sparkContext.hadoopConfiguration, path)
}
@@ -401,7 +412,7 @@ object HoodieSparkSqlWriter {
df: DataFrame,
tblName: String,
basePath: Path,
- path: Option[String],
+ path: String,
instantTime: String,
partitionColumns: String): (Boolean, common.util.Option[String]) = {
val sparkContext = sqlContext.sparkContext
@@ -424,7 +435,7 @@ object HoodieSparkSqlWriter {
throw new HoodieException("Dropping duplicates with bulk_insert in row writer path is not supported yet")
}
val params = parameters.updated(HoodieWriteConfig.AVRO_SCHEMA_STRING.key, schema.toString)
- val writeConfig = DataSourceUtils.createHoodieConfig(schema.toString, path.get, tblName, mapAsJavaMap(params))
+ val writeConfig = DataSourceUtils.createHoodieConfig(schema.toString, path, tblName, mapAsJavaMap(params))
val bulkInsertPartitionerRows : BulkInsertPartitioner[Dataset[Row]] = if (populateMetaFields) {
val userDefinedBulkInsertPartitionerOpt = DataSourceUtils.createUserDefinedBulkInsertPartitionerWithRows(writeConfig)
if (userDefinedBulkInsertPartitionerOpt.isPresent) {
@@ -699,4 +710,49 @@ object HoodieSparkSqlWriter {
null
}
}
+
+ private def validateTableConfig(spark: SparkSession, params: Map[String, String],
+ tableConfig: HoodieTableConfig): Unit = {
+ val resolver = spark.sessionState.conf.resolver
+ val diffConfigs = StringBuilder.newBuilder
+ params.foreach { case (key, value) =>
+ val existingValue = getStringFromTableConfigWithAlternatives(tableConfig, key)
+ if (null != existingValue && !resolver(existingValue, value)) {
+ diffConfigs.append(s"$key:\t$value\t${tableConfig.getString(key)}\n")
+ }
+ }
+ if (diffConfigs.nonEmpty) {
+ diffConfigs.insert(0, "\nConfig conflict(key\tcurrent value\texisting value):\n")
+ throw new HoodieException(diffConfigs.toString.trim)
+ }
+ }
+
+ private def mergeParamsAndGetHoodieConfig(optParams: Map[String, String],
+ tableConfig: HoodieTableConfig): (Map[String, String], HoodieConfig) = {
+ val mergedParams = mutable.Map.empty ++
+ DataSourceWriteOptions.translateSqlOptions(HoodieWriterUtils.parametersWithWriteDefaults(optParams))
+ if (!mergedParams.contains(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key)
+ && mergedParams.contains(KEYGENERATOR_CLASS_NAME.key)) {
+ mergedParams(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key) = mergedParams(KEYGENERATOR_CLASS_NAME.key)
+ }
+ if (null != tableConfig) {
+ tableConfig.getProps.foreach { case (key, value) =>
+ mergedParams(key) = value
+ }
+ }
+ val params = mergedParams.toMap
+ (params, HoodieWriterUtils.convertMapToHoodieConfig(params))
+ }
+
+ private def getStringFromTableConfigWithAlternatives(tableConfig: HoodieTableConfig, key: String): String = {
+ if (null == tableConfig) {
+ null
+ } else {
+ if (allAlternatives.contains(key)) {
+ tableConfig.getString(allAlternatives(key))
+ } else {
+ tableConfig.getString(key)
+ }
+ }
+ }
}
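The effective configuration is now resolved as translateSqlOptions(parametersWithWriteDefaults(optParams)) overridden by whatever hoodie.properties already records, and validateTableConfig fails the write when an option contradicts a persisted value. A hedged sketch of the user-visible effect, assuming `df` is a DataFrame and a table already exists at `basePath` with SimpleKeyGenerator persisted:

import scala.util.Try

val attempt = Try {
  df.write.format("hudi")
    .option("hoodie.table.name", "trips") // hypothetical table name
    .option("hoodie.datasource.write.keygenerator.class",
      "org.apache.hudi.keygen.ComplexKeyGenerator") // conflicts with the persisted value
    .mode("append")
    .save(basePath)
}
// expected: Failure(HoodieException) whose message lists
// "Config conflict(key  current value  existing value)" with the offending key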
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala
index b1f8eb5..6e736d2 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala
@@ -48,9 +48,12 @@ class HoodieStreamingSink(sqlContext: SQLContext,
private val log = LogManager.getLogger(classOf[HoodieStreamingSink])
- private val retryCnt = options(DataSourceWriteOptions.STREAMING_RETRY_CNT.key).toInt
- private val retryIntervalMs = options(DataSourceWriteOptions.STREAMING_RETRY_INTERVAL_MS.key).toLong
- private val ignoreFailedBatch = options(DataSourceWriteOptions.STREAMING_IGNORE_FAILED_BATCH.key).toBoolean
+ private val retryCnt = options.getOrDefault(DataSourceWriteOptions.STREAMING_RETRY_CNT.key,
+ DataSourceWriteOptions.STREAMING_RETRY_CNT.defaultValue).toInt
+ private val retryIntervalMs = options.getOrDefault(DataSourceWriteOptions.STREAMING_RETRY_INTERVAL_MS.key,
+ DataSourceWriteOptions.STREAMING_RETRY_INTERVAL_MS.defaultValue).toLong
+ private val ignoreFailedBatch = options.getOrDefault(DataSourceWriteOptions.STREAMING_IGNORE_FAILED_BATCH.key,
+ DataSourceWriteOptions.STREAMING_IGNORE_FAILED_BATCH.defaultValue).toBoolean
private var isAsyncCompactorServiceShutdownAbnormally = false
private var isAsyncClusteringServiceShutdownAbnormally = false
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
index bdb2afb..0e3ede1 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
@@ -92,10 +92,9 @@ object HoodieWriterUtils {
* @return
*/
def getPartitionColumns(parameters: Map[String, String]): String = {
- val props = new TypedProperties()
+ val props = new Properties()
props.putAll(parameters.asJava)
- val keyGen = HoodieSparkKeyGeneratorFactory.createKeyGenerator(props)
- HoodieSparkUtils.getPartitionColumns(keyGen, props)
+ HoodieSparkUtils.getPartitionColumns(props)
}
def convertMapToHoodieConfig(parameters: Map[String, String]): HoodieConfig = {
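getPartitionColumns no longer needs to instantiate a key generator; the partition columns are derived from the properties alone. A minimal sketch:

import java.util.Properties
import org.apache.hudi.HoodieSparkUtils

val props = new Properties()
props.setProperty("hoodie.datasource.write.partitionpath.field", "dt,country") // hypothetical fields
val partitionColumns = HoodieSparkUtils.getPartitionColumns(props)
// expected: "dt,country" for a plain (non-custom) key generator setup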
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala
index 25d3026..963035c 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala
@@ -120,8 +120,13 @@ object HoodieOptionConfig {
*/
def mappingSqlOptionToTableConfig(options: Map[String, String]): Map[String, String] = {
defaultTableConfig ++
- options.filterKeys(k => keyTableConfigMapping.contains(k))
- .map(kv => keyTableConfigMapping(kv._1) -> valueMapping.getOrElse(kv._2, kv._2))
+ options.map { case (k, v) =>
+ if (keyTableConfigMapping.contains(k)) {
+ keyTableConfigMapping(k) -> valueMapping.getOrElse(v, v)
+ } else {
+ k -> v
+ }
+ }
}
/**
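mappingSqlOptionToTableConfig previously dropped any option without a table-config mapping; it now renames the mapped keys and passes everything else through, so raw hoodie.* configs supplied in DDL survive into hoodie.properties. A minimal sketch:

import org.apache.spark.sql.hudi.HoodieOptionConfig

val ddlOptions = Map(
  "primaryKey" -> "id",           // sql option with a table-config mapping (renamed)
  "hoodie.custom.flag" -> "true") // hypothetical raw config: previously dropped, now kept
val tableConfig = HoodieOptionConfig.mappingSqlOptionToTableConfig(ddlOptions)
// tableConfig contains both the renamed record-key entry and hoodie.custom.flag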
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala
index ec1f746..8ac6312 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala
@@ -41,8 +41,12 @@ import org.apache.spark.sql.internal.StaticSQLConf.SCHEMA_STRING_LENGTH_THRESHOL
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.{SPARK_VERSION, SparkConf}
-
import java.util.{Locale, Properties}
+
+import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.keygen.ComplexKeyGenerator
+import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
+
import scala.collection.JavaConverters._
import scala.collection.mutable
@@ -90,35 +94,13 @@ case class CreateHoodieTableCommand(table: CatalogTable, ignoreIfExists: Boolean
.setBasePath(path)
.setConf(conf)
.build()
- val tableSchema = getTableSqlSchema(metaClient)
-
- // Get options from the external table and append with the options in ddl.
- val originTableConfig = HoodieOptionConfig.mappingTableConfigToSqlOption(
- metaClient.getTableConfig.getProps.asScala.toMap)
-
- val allPartitionPaths = getAllPartitionPaths(sparkSession, table)
- var upgrateConfig = Map.empty[String, String]
- // If this is a non-hive-styled partition table, disable the hive style config.
- // (By default this config is enable for spark sql)
- upgrateConfig = if (!isHiveStyledPartitioning(allPartitionPaths, table)) {
- upgrateConfig + (DataSourceWriteOptions.HIVE_STYLE_PARTITIONING.key -> "false")
- } else {
- upgrateConfig
- }
- upgrateConfig = if (!isUrlEncodeEnabled(allPartitionPaths, table)) {
- upgrateConfig + (DataSourceWriteOptions.URL_ENCODE_PARTITIONING.key -> "false")
- } else {
- upgrateConfig
- }
+ val tableSchema = getTableSqlSchema(metaClient)
- // Use the origin keygen to generate record key to keep the rowkey consistent with the old table for spark sql.
- // See SqlKeyGenerator#getRecordKey for detail.
- upgrateConfig = if (originTableConfig.contains(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key)) {
- upgrateConfig + (SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME -> originTableConfig(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key))
- } else {
- upgrateConfig
- }
- val options = originTableConfig ++ upgrateConfig ++ table.storage.properties
+ // Get options from the external table and append with the options in ddl.
+ val originTableConfig = HoodieOptionConfig.mappingTableConfigToSqlOption(
+ metaClient.getTableConfig.getProps.asScala.toMap)
+ val extraConfig = extraTableConfig(sparkSession, isTableExists, originTableConfig)
+ val options = originTableConfig ++ table.storage.properties ++ extraConfig
val userSpecifiedSchema = table.schema
if (userSpecifiedSchema.isEmpty && tableSchema.isDefined) {
@@ -137,7 +119,8 @@ case class CreateHoodieTableCommand(table: CatalogTable, ignoreIfExists: Boolean
s". The associated location('$path') already exists.")
}
// Add the meta fields to the schema if this is a managed table or an empty external table.
- (addMetaFields(table.schema), table.storage.properties)
+ val options = table.storage.properties ++ extraTableConfig(sparkSession, false)
+ (addMetaFields(table.schema), options)
}
val tableType = HoodieOptionConfig.getTableType(table.storage.properties)
@@ -314,6 +297,43 @@ case class CreateHoodieTableCommand(table: CatalogTable, ignoreIfExists: Boolean
s"'${HoodieOptionConfig.SQL_VALUE_TABLE_TYPE_MOR}'")
}
}
+
+ def extraTableConfig(sparkSession: SparkSession, isTableExists: Boolean,
+ originTableConfig: Map[String, String] = Map.empty): Map[String, String] = {
+ val extraConfig = mutable.Map.empty[String, String]
+ if (isTableExists) {
+ val allPartitionPaths = getAllPartitionPaths(sparkSession, table)
+ if (originTableConfig.contains(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE.key)) {
+ extraConfig(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE.key) =
+ originTableConfig(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE.key)
+ } else {
+ extraConfig(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE.key) =
+ String.valueOf(isHiveStyledPartitioning(allPartitionPaths, table))
+ }
+ if (originTableConfig.contains(HoodieTableConfig.URL_ENCODE_PARTITIONING.key)) {
+ extraConfig(HoodieTableConfig.URL_ENCODE_PARTITIONING.key) =
+ originTableConfig(HoodieTableConfig.URL_ENCODE_PARTITIONING.key)
+ } else {
+ extraConfig(HoodieTableConfig.URL_ENCODE_PARTITIONING.key) =
+ String.valueOf(isUrlEncodeEnabled(allPartitionPaths, table))
+ }
+ } else {
+ extraConfig(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE.key) = "true"
+ extraConfig(HoodieTableConfig.URL_ENCODE_PARTITIONING.key) = HoodieTableConfig.URL_ENCODE_PARTITIONING.defaultValue()
+ }
+
+ val primaryColumns = HoodieOptionConfig.getPrimaryColumns(originTableConfig ++ table.storage.properties)
+ if (primaryColumns.isEmpty) {
+ extraConfig(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key) = classOf[UuidKeyGenerator].getCanonicalName
+ } else if (originTableConfig.contains(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key)) {
+ extraConfig(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key) =
+ HoodieSparkKeyGeneratorFactory.convertToSparkKeyGenerator(
+ originTableConfig(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key))
+ } else {
+ extraConfig(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key) = classOf[ComplexKeyGenerator].getCanonicalName
+ }
+ extraConfig.toMap
+ }
}
object CreateHoodieTableCommand extends Logging {
@@ -342,6 +362,9 @@ object CreateHoodieTableCommand extends Logging {
checkTableConfigEqual(originTableConfig, tableOptions, HoodieTableConfig.PRECOMBINE_FIELD.key)
checkTableConfigEqual(originTableConfig, tableOptions, HoodieTableConfig.PARTITION_FIELDS.key)
checkTableConfigEqual(originTableConfig, tableOptions, HoodieTableConfig.RECORDKEY_FIELDS.key)
+ checkTableConfigEqual(originTableConfig, tableOptions, HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key)
+ checkTableConfigEqual(originTableConfig, tableOptions, HoodieTableConfig.URL_ENCODE_PARTITIONING.key)
+ checkTableConfigEqual(originTableConfig, tableOptions, HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE.key)
// Save all the table config to the hoodie.properties.
val parameters = originTableConfig ++ tableOptions
val properties = new Properties()
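extraTableConfig settles these configs at CREATE TABLE time: for an existing table the partitioning flags are inferred from the actual partition paths unless already recorded, and the key generator defaults to UuidKeyGenerator when there is no primary key, the Spark translation of any recorded class, or ComplexKeyGenerator otherwise. A condensed sketch of that key-generator choice:

import org.apache.hudi.keygen.ComplexKeyGenerator
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
import org.apache.spark.sql.hudi.command.UuidKeyGenerator

def defaultKeyGen(primaryColumns: Seq[String], recorded: Option[String]): String =
  if (primaryColumns.isEmpty) classOf[UuidKeyGenerator].getCanonicalName
  else recorded
    .map(HoodieSparkKeyGeneratorFactory.convertToSparkKeyGenerator)
    .getOrElse(classOf[ComplexKeyGenerator].getCanonicalName)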
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala
index 4d6d0a2..987ce0e 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.hudi.command
import org.apache.hudi.DataSourceWriteOptions.{OPERATION, _}
+import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
import org.apache.hudi.hive.ddl.HiveSyncMode
@@ -58,7 +59,12 @@ case class DeleteHoodieTableCommand(deleteTable: DeleteFromTable) extends Runnab
val targetTable = sparkSession.sessionState.catalog
.getTableMetadata(tableId)
val path = getTableLocation(targetTable, sparkSession)
-
+ val conf = sparkSession.sessionState.newHadoopConf()
+ val metaClient = HoodieTableMetaClient.builder()
+ .setBasePath(path)
+ .setConf(conf)
+ .build()
+ val tableConfig = metaClient.getTableConfig
val primaryColumns = HoodieOptionConfig.getPrimaryColumns(targetTable.storage.properties)
assert(primaryColumns.nonEmpty,
@@ -66,13 +72,14 @@ case class DeleteHoodieTableCommand(deleteTable: DeleteFromTable) extends Runnab
withSparkConf(sparkSession, targetTable.storage.properties) {
Map(
"path" -> path,
- KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName,
TBL_NAME.key -> tableId.table,
+ HIVE_STYLE_PARTITIONING.key -> tableConfig.getHiveStylePartitioningEnable,
+ URL_ENCODE_PARTITIONING.key -> tableConfig.getUrlEncodePartitoning,
+ KEYGENERATOR_CLASS_NAME.key -> tableConfig.getKeyGeneratorClassName,
OPERATION.key -> DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL,
PARTITIONPATH_FIELD.key -> targetTable.partitionColumnNames.mkString(","),
HIVE_SYNC_MODE.key -> HiveSyncMode.HMS.name(),
HIVE_SUPPORT_TIMESTAMP_TYPE.key -> "true",
- HIVE_STYLE_PARTITIONING.key -> "true",
HoodieWriteConfig.DELETE_PARALLELISM_VALUE.key -> "200",
SqlKeyGenerator.PARTITION_SCHEMA -> targetTable.partitionSchema.toDDL
)
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
index e1c61ed..2b88373 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
@@ -21,12 +21,14 @@ import org.apache.avro.Schema
import org.apache.avro.generic.{GenericRecord, IndexedRecord}
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieRecord}
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
import org.apache.hudi.common.util.{Option => HOption}
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
import org.apache.hudi.exception.HoodieDuplicateKeyException
import org.apache.hudi.hive.MultiPartKeysValueExtractor
import org.apache.hudi.hive.ddl.HiveSyncMode
+import org.apache.hudi.keygen.ComplexKeyGenerator
import org.apache.hudi.sql.InsertMode
import org.apache.hudi.{DataSourceWriteOptions, HoodieSparkSqlWriter, HoodieWriterUtils}
import org.apache.spark.internal.Logging
@@ -90,7 +92,6 @@ object InsertIntoHoodieTableCommand extends Logging {
// for insert into or insert overwrite partition we use append mode.
SaveMode.Append
}
- val parameters = HoodieWriterUtils.parametersWithWriteDefaults(config)
val conf = sparkSession.sessionState.conf
val alignedQuery = alignOutputFields(query, table, insertPartitions, conf)
// If we create dataframe using the Dataset.ofRows(sparkSession, alignedQuery),
@@ -100,7 +101,7 @@ object InsertIntoHoodieTableCommand extends Logging {
val inputDF = sparkSession.createDataFrame(
Dataset.ofRows(sparkSession, alignedQuery).rdd, alignedQuery.schema)
val success =
- HoodieSparkSqlWriter.write(sparkSession.sqlContext, mode, parameters, inputDF)._1
+ HoodieSparkSqlWriter.write(sparkSession.sqlContext, mode, config, inputDF)._1
if (success) {
if (refreshTable) {
sparkSession.catalog.refreshTable(table.identifier.unquotedString)
@@ -197,19 +198,42 @@ object InsertIntoHoodieTableCommand extends Logging {
val parameters = withSparkConf(sparkSession, options)()
val tableType = parameters.getOrElse(TABLE_TYPE.key, TABLE_TYPE.defaultValue)
-
- val partitionFields = table.partitionColumnNames.mkString(",")
- val path = getTableLocation(table, sparkSession)
-
- val tableSchema = table.schema
-
val primaryColumns = HoodieOptionConfig.getPrimaryColumns(options)
+ val partitionFields = table.partitionColumnNames.mkString(",")
- val keyGenClass = if (primaryColumns.nonEmpty) {
- classOf[SqlKeyGenerator].getCanonicalName
+ val path = getTableLocation(table, sparkSession)
+ val conf = sparkSession.sessionState.newHadoopConf()
+ val isTableExists = tableExistsInPath(path, conf)
+ val tableConfig = if (isTableExists) {
+ HoodieTableMetaClient.builder()
+ .setBasePath(path)
+ .setConf(conf)
+ .build()
+ .getTableConfig
} else {
- classOf[UuidKeyGenerator].getName
+ null
}
+ val hiveStylePartitioningEnable = if (null == tableConfig || null == tableConfig.getHiveStylePartitioningEnable) {
+ "true"
+ } else {
+ tableConfig.getHiveStylePartitioningEnable
+ }
+ val urlEncodePartitioning = if (null == tableConfig || null == tableConfig.getUrlEncodePartitoning) {
+ "false"
+ } else {
+ tableConfig.getUrlEncodePartitoning
+ }
+ val keyGeneratorClassName = if (null == tableConfig || null == tableConfig.getKeyGeneratorClassName) {
+ if (primaryColumns.nonEmpty) {
+ classOf[ComplexKeyGenerator].getCanonicalName
+ } else {
+ classOf[UuidKeyGenerator].getCanonicalName
+ }
+ } else {
+ tableConfig.getKeyGeneratorClassName
+ }
+
+ val tableSchema = table.schema
val dropDuplicate = sparkSession.conf
.getOption(INSERT_DROP_DUPS.key)
@@ -267,7 +291,9 @@ object InsertIntoHoodieTableCommand extends Logging {
TBL_NAME.key -> table.identifier.table,
PRECOMBINE_FIELD.key -> tableSchema.fields.last.name,
OPERATION.key -> operation,
- KEYGENERATOR_CLASS_NAME.key -> keyGenClass,
+ HIVE_STYLE_PARTITIONING.key -> hiveStylePartitioningEnable,
+ URL_ENCODE_PARTITIONING.key -> urlEncodePartitioning,
+ KEYGENERATOR_CLASS_NAME.key -> keyGeneratorClassName,
RECORDKEY_FIELD.key -> primaryColumns.mkString(","),
PARTITIONPATH_FIELD.key -> partitionFields,
PAYLOAD_CLASS_NAME.key -> payloadClassName,
@@ -279,10 +305,8 @@ object InsertIntoHoodieTableCommand extends Logging {
HIVE_DATABASE.key -> table.identifier.database.getOrElse("default"),
HIVE_TABLE.key -> table.identifier.table,
HIVE_SUPPORT_TIMESTAMP_TYPE.key -> "true",
- HIVE_STYLE_PARTITIONING.key -> "true",
HIVE_PARTITION_FIELDS.key -> partitionFields,
HIVE_PARTITION_EXTRACTOR_CLASS.key -> classOf[MultiPartKeysValueExtractor].getCanonicalName,
- URL_ENCODE_PARTITIONING.key -> "true",
HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key -> "200",
HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> "200",
SqlKeyGenerator.PARTITION_SCHEMA -> table.partitionSchema.toDDL
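For tables created before this change the three keys may be absent from hoodie.properties, so the command falls back to the old SQL defaults (hive-style partitioning on, url-encoding off, Complex/Uuid key generator). A condensed sketch of that fallback, with `tableConfig` possibly null for a not-yet-initialized path:

val hiveStylePartitioningEnable =
  Option(tableConfig).flatMap(tc => Option(tc.getHiveStylePartitioningEnable)).getOrElse("true")
val urlEncodePartitioning =
  Option(tableConfig).flatMap(tc => Option(tc.getUrlEncodePartitoning)).getOrElse("false")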
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
index b22c607..dd1be20 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.hudi.command
import org.apache.avro.Schema
import org.apache.hudi.DataSourceWriteOptions._
+import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
import org.apache.hudi.hive.MultiPartKeysValueExtractor
@@ -34,7 +35,6 @@ import org.apache.spark.sql.hudi.command.payload.ExpressionPayload
import org.apache.spark.sql.hudi.command.payload.ExpressionPayload._
import org.apache.spark.sql.hudi.{HoodieOptionConfig, SerDeUtils}
import org.apache.spark.sql.types.{BooleanType, StructType}
-
import java.util.Base64
/**
@@ -419,7 +419,12 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Runnab
val targetTableDb = targetTableIdentify.database.getOrElse("default")
val targetTableName = targetTableIdentify.identifier
val path = getTableLocation(targetTable, sparkSession)
-
+ val conf = sparkSession.sessionState.newHadoopConf()
+ val metaClient = HoodieTableMetaClient.builder()
+ .setBasePath(path)
+ .setConf(conf)
+ .build()
+ val tableConfig = metaClient.getTableConfig
val options = targetTable.storage.properties
val definedPk = HoodieOptionConfig.getPrimaryColumns(options)
// TODO Currently the mergeEqualConditionKeys must be the same the primary key.
@@ -429,31 +434,30 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Runnab
}
// Enable the hive sync by default if spark have enable the hive metastore.
val enableHive = isEnableHive(sparkSession)
- HoodieWriterUtils.parametersWithWriteDefaults(
- withSparkConf(sparkSession, options) {
- Map(
- "path" -> path,
- RECORDKEY_FIELD.key -> targetKey2SourceExpression.keySet.mkString(","),
- KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName,
- PRECOMBINE_FIELD.key -> targetKey2SourceExpression.keySet.head, // set a default preCombine field
- TBL_NAME.key -> targetTableName,
- PARTITIONPATH_FIELD.key -> targetTable.partitionColumnNames.mkString(","),
- PAYLOAD_CLASS_NAME.key -> classOf[ExpressionPayload].getCanonicalName,
- META_SYNC_ENABLED.key -> enableHive.toString,
- HIVE_SYNC_MODE.key -> HiveSyncMode.HMS.name(),
- HIVE_USE_JDBC.key -> "false",
- HIVE_DATABASE.key -> targetTableDb,
- HIVE_TABLE.key -> targetTableName,
- HIVE_SUPPORT_TIMESTAMP_TYPE.key -> "true",
- HIVE_STYLE_PARTITIONING.key -> "true",
- HIVE_PARTITION_FIELDS.key -> targetTable.partitionColumnNames.mkString(","),
- HIVE_PARTITION_EXTRACTOR_CLASS.key -> classOf[MultiPartKeysValueExtractor].getCanonicalName,
- URL_ENCODE_PARTITIONING.key -> "true", // enable the url decode for sql.
- HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key -> "200", // set the default parallelism to 200 for sql
- HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> "200",
- HoodieWriteConfig.DELETE_PARALLELISM_VALUE.key -> "200",
- SqlKeyGenerator.PARTITION_SCHEMA -> targetTable.partitionSchema.toDDL
- )
- })
+ withSparkConf(sparkSession, options) {
+ Map(
+ "path" -> path,
+ RECORDKEY_FIELD.key -> targetKey2SourceExpression.keySet.mkString(","),
+ PRECOMBINE_FIELD.key -> targetKey2SourceExpression.keySet.head, // set a default preCombine field
+ TBL_NAME.key -> targetTableName,
+ PARTITIONPATH_FIELD.key -> targetTable.partitionColumnNames.mkString(","),
+ PAYLOAD_CLASS_NAME.key -> classOf[ExpressionPayload].getCanonicalName,
+ HIVE_STYLE_PARTITIONING.key -> tableConfig.getHiveStylePartitioningEnable,
+ URL_ENCODE_PARTITIONING.key -> tableConfig.getUrlEncodePartitoning,
+ KEYGENERATOR_CLASS_NAME.key -> tableConfig.getKeyGeneratorClassName,
+ META_SYNC_ENABLED.key -> enableHive.toString,
+ HIVE_SYNC_MODE.key -> HiveSyncMode.HMS.name(),
+ HIVE_USE_JDBC.key -> "false",
+ HIVE_DATABASE.key -> targetTableDb,
+ HIVE_TABLE.key -> targetTableName,
+ HIVE_SUPPORT_TIMESTAMP_TYPE.key -> "true",
+ HIVE_PARTITION_FIELDS.key -> targetTable.partitionColumnNames.mkString(","),
+ HIVE_PARTITION_EXTRACTOR_CLASS.key -> classOf[MultiPartKeysValueExtractor].getCanonicalName,
+ HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key -> "200", // set the default parallelism to 200 for sql
+ HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> "200",
+ HoodieWriteConfig.DELETE_PARALLELISM_VALUE.key -> "200",
+ SqlKeyGenerator.PARTITION_SCHEMA -> targetTable.partitionSchema.toDDL
+ )
+ }
}
}
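The pattern above, building a HoodieTableMetaClient only to read its HoodieTableConfig, is the heart of this change; a condensed sketch, with the literal config keys written out for illustration:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hudi.common.table.HoodieTableMetaClient

    def persistedWriteOptions(basePath: String, conf: Configuration): Map[String, String] = {
      // after the first write, hoodie.properties is the source of truth
      val tableConfig = HoodieTableMetaClient.builder()
        .setBasePath(basePath)
        .setConf(conf)
        .build()
        .getTableConfig
      Map(
        "hoodie.datasource.write.hive_style_partitioning" -> tableConfig.getHiveStylePartitioningEnable,
        "hoodie.datasource.write.partitionpath.urlencode" -> tableConfig.getUrlEncodePartitoning,
        "hoodie.datasource.write.keygenerator.class" -> tableConfig.getKeyGeneratorClassName)
    }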
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala
index b59984a..e069df9 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala
@@ -18,11 +18,13 @@
package org.apache.spark.sql.hudi.command
import java.util.concurrent.TimeUnit.{MICROSECONDS, MILLISECONDS}
+
import org.apache.avro.generic.GenericRecord
import org.apache.hudi.common.config.TypedProperties
import org.apache.hudi.common.util.PartitionPathEncodeUtils
import org.apache.hudi.config.HoodieWriteConfig
-import org.apache.hudi.keygen.{BaseKeyGenerator, ComplexKeyGenerator, KeyGenUtils, SparkKeyGeneratorInterface}
+import org.apache.hudi.keygen._
+import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, TimestampType}
import org.joda.time.format.{DateTimeFormat, DateTimeFormatter}
@@ -48,7 +50,8 @@ class SqlKeyGenerator(props: TypedProperties) extends ComplexKeyGenerator(props)
val keyGenProps = new TypedProperties()
keyGenProps.putAll(props)
keyGenProps.remove(SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME)
- keyGenProps.put(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key, beforeKeyGenClassName)
+ val convertedKeyGenClassName = SqlKeyGenerator.getRealKeyGenClassName(props)
+ keyGenProps.put(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key, convertedKeyGenClassName)
Some(KeyGenUtils.createKeyGeneratorByClassName(keyGenProps))
} else {
None
@@ -64,7 +67,7 @@ class SqlKeyGenerator(props: TypedProperties) extends ComplexKeyGenerator(props)
}
override def getRecordKey(row: Row): String = {
- if (originKeyGen.isDefined && originKeyGen.get.isInstanceOf[SparkKeyGeneratorInterface]) {
+ if (originKeyGen.isDefined) {
originKeyGen.get.asInstanceOf[SparkKeyGeneratorInterface].getRecordKey(row)
} else {
super.getRecordKey(row)
@@ -121,4 +124,13 @@ object SqlKeyGenerator {
val ORIGIN_KEYGEN_CLASS_NAME = "hoodie.sql.origin.keygen.class"
private val timestampTimeFormat = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")
private val sqlTimestampFormat = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.S")
+
+ def getRealKeyGenClassName(props: TypedProperties): String = {
+ val beforeKeyGenClassName = props.getString(SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME, null)
+ if (beforeKeyGenClassName != null) {
+ HoodieSparkKeyGeneratorFactory.convertToSparkKeyGenerator(beforeKeyGenClassName)
+ } else {
+ classOf[ComplexKeyGenerator].getCanonicalName
+ }
+ }
}
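A short usage sketch of the new helper; which concrete class comes back depends on HoodieSparkKeyGeneratorFactory's mapping, so the comments below are assumptions:

    import org.apache.hudi.common.config.TypedProperties
    import org.apache.spark.sql.hudi.command.SqlKeyGenerator

    val props = new TypedProperties()
    props.put(SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME,
      "org.apache.hudi.keygen.SimpleKeyGenerator")
    // converted to the Spark-specific generator by HoodieSparkKeyGeneratorFactory
    val real = SqlKeyGenerator.getRealKeyGenClassName(props)

    // with no origin keygen recorded, the helper falls back to ComplexKeyGenerator
    val fallback = SqlKeyGenerator.getRealKeyGenClassName(new TypedProperties())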
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/UpdateHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/UpdateHoodieTableCommand.scala
index 20a8274..b1c8a04 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/UpdateHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/UpdateHoodieTableCommand.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.hudi.command
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
import org.apache.hudi.hive.MultiPartKeysValueExtractor
@@ -85,7 +86,12 @@ case class UpdateHoodieTableCommand(updateTable: UpdateTable) extends RunnableCo
val targetTable = sparkSession.sessionState.catalog
.getTableMetadata(tableId)
val path = getTableLocation(targetTable, sparkSession)
-
+ val conf = sparkSession.sessionState.newHadoopConf()
+ val metaClient = HoodieTableMetaClient.builder()
+ .setBasePath(path)
+ .setConf(conf)
+ .build()
+ val tableConfig = metaClient.getTableConfig
val primaryColumns = HoodieOptionConfig.getPrimaryColumns(targetTable.storage.properties)
assert(primaryColumns.nonEmpty,
@@ -95,9 +101,11 @@ case class UpdateHoodieTableCommand(updateTable: UpdateTable) extends RunnableCo
Map(
"path" -> path,
RECORDKEY_FIELD.key -> primaryColumns.mkString(","),
- KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName,
PRECOMBINE_FIELD.key -> primaryColumns.head, //set the default preCombine field.
TBL_NAME.key -> tableId.table,
+ HIVE_STYLE_PARTITIONING.key -> tableConfig.getHiveStylePartitioningEnable,
+ URL_ENCODE_PARTITIONING.key -> tableConfig.getUrlEncodePartitoning,
+ KEYGENERATOR_CLASS_NAME.key -> tableConfig.getKeyGeneratorClassName,
OPERATION.key -> DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
PARTITIONPATH_FIELD.key -> targetTable.partitionColumnNames.mkString(","),
META_SYNC_ENABLED.key -> enableHive.toString,
@@ -107,9 +115,7 @@ case class UpdateHoodieTableCommand(updateTable: UpdateTable) extends RunnableCo
HIVE_TABLE.key -> tableId.table,
HIVE_PARTITION_FIELDS.key -> targetTable.partitionColumnNames.mkString(","),
HIVE_PARTITION_EXTRACTOR_CLASS.key -> classOf[MultiPartKeysValueExtractor].getCanonicalName,
- URL_ENCODE_PARTITIONING.key -> "true",
HIVE_SUPPORT_TIMESTAMP_TYPE.key -> "true",
- HIVE_STYLE_PARTITIONING.key -> "true",
HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> "200",
SqlKeyGenerator.PARTITION_SCHEMA -> targetTable.partitionSchema.toDDL
)
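For context on the urlEncodePartitioning flag that is now read from the table config instead of being hardcoded to "true": when enabled, slashes in partition values are escaped so each value maps to a single directory level. A sketch using PartitionPathEncodeUtils (already imported in SqlKeyGenerator above); the exact escaped form is an assumption:

    import org.apache.hudi.common.util.PartitionPathEncodeUtils

    val raw = "2021/03/08"
    // with hoodie.datasource.write.partitionpath.urlencode=true, the "/" is
    // escaped (e.g. to %2F), yielding one partition directory instead of three
    val encoded = PartitionPathEncodeUtils.escapePathName(raw)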
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/HoodieSparkSqlWriterSuite.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/HoodieSparkSqlWriterSuite.scala
index ff95e87..96fb18d 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/HoodieSparkSqlWriterSuite.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/HoodieSparkSqlWriterSuite.scala
@@ -30,7 +30,7 @@ import org.apache.hudi.exception.HoodieException
import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode
import org.apache.hudi.functional.TestBootstrap
import org.apache.hudi.hive.HiveSyncConfig
-import org.apache.hudi.keygen.{NonpartitionedKeyGenerator, SimpleKeyGenerator}
+import org.apache.hudi.keygen.{ComplexKeyGenerator, NonpartitionedKeyGenerator, SimpleKeyGenerator}
import org.apache.hudi.testutils.DataSourceTestUtils
import org.apache.spark.SparkContext
import org.apache.spark.api.java.JavaSparkContext
@@ -48,8 +48,10 @@ import org.scalatest.Matchers.{assertResult, be, convertToAnyShouldWrapper, inte
import java.time.Instant
import java.util
import java.util.{Collections, Date, UUID}
+
import scala.collection.JavaConversions._
import scala.collection.JavaConverters
+import scala.util.control.NonFatal
/**
* Test suite for SparkSqlWriter class.
@@ -161,7 +163,6 @@ class HoodieSparkSqlWriterSuite {
.updated(DataSourceWriteOptions.ENABLE_ROW_WRITER.key, "true")
.updated(HoodieTableConfig.POPULATE_META_FIELDS.key(), String.valueOf(populateMetaFields))
.updated(HoodieWriteConfig.BULK_INSERT_SORT_MODE.key(), sortMode.name())
- val fooTableParams = HoodieWriterUtils.parametersWithWriteDefaults(fooTableModifier)
// generate the inserts
val schema = DataSourceTestUtils.getStructTypeExampleSchema
@@ -175,7 +176,7 @@ class HoodieSparkSqlWriterSuite {
val recordsSeq = convertRowListToSeq(records)
val df = spark.createDataFrame(sc.parallelize(recordsSeq), structType)
// write to Hudi
- HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, df)
+ HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableModifier, df)
// collect all partition paths to issue read of parquet files
val partitions = Seq(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH,
@@ -242,21 +243,19 @@ class HoodieSparkSqlWriterSuite {
//create a new table
val fooTableModifier = Map("path" -> tempBasePath, HoodieWriteConfig.TBL_NAME.key -> hoodieFooTableName,
"hoodie.insert.shuffle.parallelism" -> "4", "hoodie.upsert.shuffle.parallelism" -> "4")
- val fooTableParams = HoodieWriterUtils.parametersWithWriteDefaults(fooTableModifier)
val dataFrame = spark.createDataFrame(Seq(StringLongTest(UUID.randomUUID().toString, new Date().getTime)))
- HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, dataFrame)
+ HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableModifier, dataFrame)
//on same path try append with different("hoodie_bar_tbl") table name which should throw an exception
val barTableModifier = Map("path" -> tempBasePath, HoodieWriteConfig.TBL_NAME.key -> "hoodie_bar_tbl",
"hoodie.insert.shuffle.parallelism" -> "4", "hoodie.upsert.shuffle.parallelism" -> "4")
- val barTableParams = HoodieWriterUtils.parametersWithWriteDefaults(barTableModifier)
val dataFrame2 = spark.createDataFrame(Seq(StringLongTest(UUID.randomUUID().toString, new Date().getTime)))
- val tableAlreadyExistException = intercept[HoodieException](HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, barTableParams, dataFrame2))
+ val tableAlreadyExistException = intercept[HoodieException](HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, barTableModifier, dataFrame2))
assert(tableAlreadyExistException.getMessage.contains("hoodie table with name " + hoodieFooTableName + " already exist"))
//on same path try append with delete operation and different("hoodie_bar_tbl") table name which should throw an exception
- val deleteTableParams = barTableParams ++ Map(OPERATION.key -> "delete")
- val deleteCmdException = intercept[HoodieException](HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, deleteTableParams, dataFrame2))
+ val deleteTableModifier = barTableModifier ++ Map(OPERATION.key -> "delete")
+ val deleteCmdException = intercept[HoodieException](HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, deleteTableModifier, dataFrame2))
assert(deleteCmdException.getMessage.contains("hoodie table with name " + hoodieFooTableName + " already exist"))
}
@@ -295,7 +294,6 @@ class HoodieSparkSqlWriterSuite {
.updated(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
.updated(DataSourceWriteOptions.ENABLE_ROW_WRITER.key, "true")
.updated(HoodieWriteConfig.BULK_INSERT_SORT_MODE.key(), BulkInsertSortMode.NONE.name())
- val fooTableParams = HoodieWriterUtils.parametersWithWriteDefaults(fooTableModifier)
// generate the inserts
val schema = DataSourceTestUtils.getStructTypeExampleSchema
@@ -304,7 +302,7 @@ class HoodieSparkSqlWriterSuite {
val df = spark.createDataFrame(sc.parallelize(inserts), structType)
try {
// write to Hudi
- HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, df)
+ HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableModifier, df)
Assertions.fail("Should have thrown exception")
} catch {
case e: HoodieException => assertTrue(e.getMessage.contains("hoodie.populate.meta.fields already disabled for the table. Can't be re-enabled back"))
@@ -323,7 +321,6 @@ class HoodieSparkSqlWriterSuite {
.updated(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
.updated(DataSourceWriteOptions.ENABLE_ROW_WRITER.key, "true")
.updated(INSERT_DROP_DUPS.key, "true")
- val fooTableParams = HoodieWriterUtils.parametersWithWriteDefaults(fooTableModifier)
// generate the inserts
val schema = DataSourceTestUtils.getStructTypeExampleSchema
@@ -332,7 +329,7 @@ class HoodieSparkSqlWriterSuite {
val recordsSeq = convertRowListToSeq(records)
val df = spark.createDataFrame(spark.sparkContext.parallelize(recordsSeq), structType)
// write to Hudi
- HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, df)
+ HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableModifier, df)
fail("Drop duplicates with bulk insert in row writing should have thrown exception")
} catch {
case e: HoodieException => assertTrue(e.getMessage.contains("Dropping duplicates with bulk_insert in row writer path is not supported yet"))
@@ -348,7 +345,6 @@ class HoodieSparkSqlWriterSuite {
//create a new table
val fooTableModifier = commonTableModifier.updated(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
.updated(DataSourceWriteOptions.INSERT_DROP_DUPS.key, "false")
- val fooTableParams = HoodieWriterUtils.parametersWithWriteDefaults(fooTableModifier)
// generate the inserts
val schema = DataSourceTestUtils.getStructTypeExampleSchema
@@ -357,7 +353,7 @@ class HoodieSparkSqlWriterSuite {
val recordsSeq = convertRowListToSeq(records)
val df = spark.createDataFrame(sc.parallelize(recordsSeq), structType)
// write to Hudi
- HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams - DataSourceWriteOptions.PRECOMBINE_FIELD.key, df)
+ HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableModifier - DataSourceWriteOptions.PRECOMBINE_FIELD.key, df)
// collect all partition paths to issue read of parquet files
val partitions = Seq(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH,
@@ -384,7 +380,6 @@ class HoodieSparkSqlWriterSuite {
val fooTableModifier = commonTableModifier.updated("hoodie.bulkinsert.shuffle.parallelism", "4")
.updated(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
.updated(DataSourceWriteOptions.ENABLE_ROW_WRITER.key, "true")
- val fooTableParams = HoodieWriterUtils.parametersWithWriteDefaults(fooTableModifier)
val partitions = Seq(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH,
HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH)
val fullPartitionPaths = new Array[String](3)
@@ -400,7 +395,7 @@ class HoodieSparkSqlWriterSuite {
val recordsSeq = convertRowListToSeq(records)
val df = spark.createDataFrame(sc.parallelize(recordsSeq), structType)
// write to Hudi
- HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, df)
+ HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableModifier, df)
// Fetch records from entire dataset
val actualDf = sqlContext.read.parquet(fullPartitionPaths(0), fullPartitionPaths(1), fullPartitionPaths(2))
// remove metadata columns so that expected and actual DFs can be compared as is
@@ -450,7 +445,7 @@ class HoodieSparkSqlWriterSuite {
new JavaSparkContext(sc), modifiedSchema.toString, tempBasePath, hoodieFooTableName,
mapAsJavaMap(fooTableParams)).asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]])
- HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, df, Option.empty, Option(client))
+ HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableModifier, df, Option.empty, Option(client))
// Verify that asynchronous compaction is not scheduled
verify(client, times(0)).scheduleCompaction(any())
// Verify that HoodieWriteClient is closed correctly
@@ -504,14 +499,14 @@ class HoodieSparkSqlWriterSuite {
val fooTableParams = HoodieWriterUtils.parametersWithWriteDefaults(fooTableModifier)
initializeMetaClientForBootstrap(fooTableParams, tableType, true)
- val client = spy(DataSourceUtils.createHoodieClient(
+ val client = spy(DataSourceUtils.createHoodieClient(
new JavaSparkContext(sc),
null,
tempBasePath,
hoodieFooTableName,
mapAsJavaMap(fooTableParams)).asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]])
- HoodieSparkSqlWriter.bootstrap(sqlContext, SaveMode.Append, fooTableParams, spark.emptyDataFrame, Option.empty,
+ HoodieSparkSqlWriter.bootstrap(sqlContext, SaveMode.Append, fooTableModifier, spark.emptyDataFrame, Option.empty,
Option(client))
// Verify that HoodieWriteClient is closed correctly
@@ -556,7 +551,6 @@ class HoodieSparkSqlWriterSuite {
//create a new table
val fooTableModifier = getCommonParams(tempPath, hoodieFooTableName, tableType)
.updated(DataSourceWriteOptions.RECONCILE_SCHEMA.key, "true")
- val fooTableParams = HoodieWriterUtils.parametersWithWriteDefaults(fooTableModifier)
// generate the inserts
val schema = DataSourceTestUtils.getStructTypeExampleSchema
@@ -564,7 +558,7 @@ class HoodieSparkSqlWriterSuite {
var records = DataSourceTestUtils.generateRandomRows(10)
var recordsSeq = convertRowListToSeq(records)
val df1 = spark.createDataFrame(sc.parallelize(recordsSeq), structType)
- HoodieSparkSqlWriter.write(sqlContext, SaveMode.Overwrite, fooTableParams, df1)
+ HoodieSparkSqlWriter.write(sqlContext, SaveMode.Overwrite, fooTableModifier, df1)
val snapshotDF1 = spark.read.format("org.apache.hudi")
.load(tempBasePath + "/*/*/*/*")
@@ -577,7 +571,7 @@ class HoodieSparkSqlWriterSuite {
// issue updates so that log files are created for MOR table
val updatesSeq = convertRowListToSeq(DataSourceTestUtils.generateUpdates(records, 5))
val updatesDf = spark.createDataFrame(sc.parallelize(updatesSeq), structType)
- HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, updatesDf)
+ HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableModifier, updatesDf)
val snapshotDF2 = spark.read.format("org.apache.hudi")
.load(tempBasePath + "/*/*/*/*")
@@ -595,7 +589,7 @@ class HoodieSparkSqlWriterSuite {
recordsSeq = convertRowListToSeq(records)
val df3 = spark.createDataFrame(sc.parallelize(recordsSeq), evolStructType)
// write to Hudi with new column
- HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, df3)
+ HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableModifier, df3)
val snapshotDF3 = spark.read.format("org.apache.hudi")
.load(tempBasePath + "/*/*/*/*")
@@ -610,7 +604,7 @@ class HoodieSparkSqlWriterSuite {
records = DataSourceTestUtils.generateRandomRows(10)
recordsSeq = convertRowListToSeq(records)
val df4 = spark.createDataFrame(sc.parallelize(recordsSeq), structType)
- HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, df4)
+ HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableModifier, df4)
val snapshotDF4 = spark.read.format("org.apache.hudi")
.load(tempBasePath + "/*/*/*/*")
@@ -743,14 +737,13 @@ class HoodieSparkSqlWriterSuite {
@ValueSource(booleans = Array(true, false))
def testDeletePartitionsV2(usePartitionsToDeleteConfig: Boolean): Unit = {
val fooTableModifier = getCommonParams(tempPath, hoodieFooTableName, HoodieTableType.COPY_ON_WRITE.name())
- val fooTableParams = HoodieWriterUtils.parametersWithWriteDefaults(fooTableModifier)
val schema = DataSourceTestUtils.getStructTypeExampleSchema
val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
val records = DataSourceTestUtils.generateRandomRows(10)
val recordsSeq = convertRowListToSeq(records)
val df1 = spark.createDataFrame(sc.parallelize(recordsSeq), structType)
// write to Hudi
- HoodieSparkSqlWriter.write(sqlContext, SaveMode.Overwrite, fooTableParams, df1)
+ HoodieSparkSqlWriter.write(sqlContext, SaveMode.Overwrite, fooTableModifier, df1)
val snapshotDF1 = spark.read.format("org.apache.hudi")
.load(tempBasePath + "/*/*/*/*")
assertEquals(10, snapshotDF1.count())
@@ -761,7 +754,7 @@ class HoodieSparkSqlWriterSuite {
val updatesSeq = convertRowListToSeq(DataSourceTestUtils.generateUpdates(records, 5))
val updatesDf = spark.createDataFrame(sc.parallelize(updatesSeq), structType)
// write updates to Hudi
- HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, updatesDf)
+ HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableModifier, updatesDf)
val snapshotDF2 = spark.read.format("org.apache.hudi")
.load(tempBasePath + "/*/*/*/*")
assertEquals(10, snapshotDF2.count())
@@ -770,7 +763,7 @@ class HoodieSparkSqlWriterSuite {
// ensure 2nd batch of updates matches.
assert(updatesDf.intersect(trimmedDf2).except(updatesDf).count() == 0)
if (usePartitionsToDeleteConfig) {
- fooTableParams.updated(DataSourceWriteOptions.PARTITIONS_TO_DELETE.key(), HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)
+ fooTableModifier.updated(DataSourceWriteOptions.PARTITIONS_TO_DELETE.key(), HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)
}
// delete partitions containing the primary key
val recordsToDelete = df1.filter(entry => {
@@ -778,7 +771,7 @@ class HoodieSparkSqlWriterSuite {
partitionPath.equals(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH) ||
partitionPath.equals(HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH)
})
- val updatedParams = fooTableParams.updated(DataSourceWriteOptions.OPERATION.key(), WriteOperationType.DELETE_PARTITION.name())
+ val updatedParams = fooTableModifier.updated(DataSourceWriteOptions.OPERATION.key(), WriteOperationType.DELETE_PARTITION.name())
HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, updatedParams, recordsToDelete)
val snapshotDF3 = spark.read.format("org.apache.hudi")
.load(tempBasePath + "/*/*/*/*")
@@ -819,4 +812,88 @@ class HoodieSparkSqlWriterSuite {
assert(spark.read.format("hudi").load(tempBasePath).where("age >= 2000").count() == 10)
}
}
+
+ /**
+ * Test case: hiveStylePartitioning/urlEncodePartitioning/KeyGenerator need to be specified only on the first write; afterwards they are read from HoodieTableConfig
+ */
+ @Test
+ def testToWriteWithoutParametersIncludedInHoodieTableConfig(): Unit = {
+ val _spark = spark
+ import _spark.implicits._
+ val df = Seq((1, "a1", 10, 1000, "2021-10-16")).toDF("id", "name", "value", "ts", "dt")
+ val options = Map(
+ DataSourceWriteOptions.RECORDKEY_FIELD.key -> "id",
+ DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "ts",
+ DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "dt"
+ )
+
+ // case 1: test a table created by spark sql
+ val (tableName1, tablePath1) = ("hoodie_test_params_1", s"${tempBasePath}_1")
+ spark.sql(
+ s"""
+ | create table $tableName1 (
+ | id int,
+ | name string,
+ | price double,
+ | ts long,
+ | dt string
+ | ) using hudi
+ | partitioned by (dt)
+ | options (
+ | primaryKey = 'id'
+ | )
+ | location '$tablePath1'
+ """.stripMargin)
+ val tableConfig1 = HoodieTableMetaClient.builder()
+ .setConf(spark.sparkContext.hadoopConfiguration)
+ .setBasePath(tablePath1).build().getTableConfig
+ assert(tableConfig1.getHiveStylePartitioningEnable == "true")
+ assert(tableConfig1.getUrlEncodePartitoning == "false")
+ assert(tableConfig1.getKeyGeneratorClassName == classOf[ComplexKeyGenerator].getName)
+ df.write.format("hudi")
+ .options(options)
+ .option(HoodieWriteConfig.TBL_NAME.key, tableName1)
+ .mode(SaveMode.Append).save(tablePath1)
+ assert(spark.read.format("hudi").load(tablePath1 + "/*").count() == 1)
+
+ // case 2: test a table created by the dataframe api
+ val (tableName2, tablePath2) = ("hoodie_test_params_2", s"${tempBasePath}_2")
+ // the first write needs to specify these params
+ df.write.format("hudi")
+ .options(options)
+ .option(HoodieWriteConfig.TBL_NAME.key, tableName2)
+ .option(DataSourceWriteOptions.URL_ENCODE_PARTITIONING.key, "true")
+ .option(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key, classOf[SimpleKeyGenerator].getName)
+ .mode(SaveMode.Overwrite).save(tablePath2)
+ val tableConfig2 = HoodieTableMetaClient.builder()
+ .setConf(spark.sparkContext.hadoopConfiguration)
+ .setBasePath(tablePath2).build().getTableConfig
+ assert(tableConfig2.getHiveStylePartitioningEnable == "false")
+ assert(tableConfig2.getUrlEncodePartitoning == "true")
+ assert(tableConfig2.getKeyGeneratorClassName == classOf[SimpleKeyGenerator].getName)
+
+ val df2 = Seq((2, "a2", 20, 1000, "2021-10-16")).toDF("id", "name", "value", "ts", "dt")
+ // an exception is raised when passing params that conflict with HoodieTableConfig
+ try {
+ df2.write.format("hudi")
+ .options(options)
+ .option(HoodieWriteConfig.TBL_NAME.key, tableName2)
+ .option(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key, classOf[ComplexKeyGenerator].getName)
+ .mode(SaveMode.Append).save(tablePath2)
+ fail("Should have thrown a config conflict exception")
+ } catch {
+ case NonFatal(e) =>
+ assert(e.getMessage.contains("Config conflict"))
+ assert(e.getMessage.contains(
+ s"${HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key}\t${classOf[ComplexKeyGenerator].getName}\t${classOf[SimpleKeyGenerator].getName}"))
+ }
+
+ // subsequent writes need not specify the hiveStylePartitioning/urlEncodePartitioning/KeyGenerator params
+ df2.write.format("hudi")
+ .options(options)
+ .option(HoodieWriteConfig.TBL_NAME.key, tableName2)
+ .mode(SaveMode.Append).save(tablePath2)
+ val data = spark.read.format("hudi").load(tablePath2 + "/*")
+ assert(data.count() == 2)
+ assert(data.select("_hoodie_partition_path").map(_.getString(0)).distinct.collect.head == "dt=2021-10-16")
+ }
}
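The values asserted in the new test land in <basePath>/.hoodie/hoodie.properties; a sketch for inspecting that file directly (the helper name is hypothetical):

    import java.util.Properties
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    def readHoodieProperties(basePath: String, conf: Configuration): Properties = {
      val fs = FileSystem.get(conf)
      val in = fs.open(new Path(basePath, ".hoodie/hoodie.properties"))
      val props = new Properties()
      try props.load(in) finally in.close()
      props
    }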
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
index 94e9620..7c58cc0 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
@@ -17,6 +17,8 @@
package org.apache.hudi
+import java.util.Properties
+
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.table.HoodieTableMetaClient
@@ -58,6 +60,7 @@ class TestHoodieFileIndex extends HoodieClientTestBase {
)
@BeforeEach override def setUp() {
+ setTableName("hoodie_test")
initPath()
initSparkContexts()
spark = sqlContext.sparkSession
@@ -71,6 +74,9 @@ class TestHoodieFileIndex extends HoodieClientTestBase {
@ParameterizedTest
@ValueSource(booleans = Array(true, false))
def testPartitionSchema(partitionEncode: Boolean): Unit = {
+ val props = new Properties()
+ props.setProperty(DataSourceWriteOptions.URL_ENCODE_PARTITIONING.key, String.valueOf(partitionEncode))
+ initMetaClient(props)
val records1 = dataGen.generateInsertsContainsAllPartitions("000", 100)
val inputDF1 = spark.read.json(spark.sparkContext.parallelize(recordsToStrings(records1), 2))
inputDF1.write.format("hudi")
@@ -128,6 +134,9 @@ class TestHoodieFileIndex extends HoodieClientTestBase {
@ParameterizedTest
@ValueSource(booleans = Array(true, false))
def testPartitionPruneWithPartitionEncode(partitionEncode: Boolean): Unit = {
+ val props = new Properties()
+ props.setProperty(DataSourceWriteOptions.URL_ENCODE_PARTITIONING.key, String.valueOf(partitionEncode))
+ initMetaClient(props)
val partitions = Array("2021/03/08", "2021/03/09", "2021/03/10", "2021/03/11", "2021/03/12")
val newDataGen = new HoodieTestDataGenerator(partitions)
val records1 = newDataGen.generateInsertsContainsAllPartitions("000", 100)
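The initMetaClient(props) overload used above comes from HoodieCommonTestHarness, which this commit also touches; a sketch of what such an initialization plausibly does, assuming the withPropertyBuilder/fromProperties API:

    import java.util.Properties
    import org.apache.hadoop.conf.Configuration
    import org.apache.hudi.common.model.HoodieTableType
    import org.apache.hudi.common.table.HoodieTableMetaClient

    def initMetaClientWith(props: Properties, basePath: String,
                           conf: Configuration): HoodieTableMetaClient =
      HoodieTableMetaClient.withPropertyBuilder()
        .setTableType(HoodieTableType.COPY_ON_WRITE)
        .setTableName("hoodie_test")
        .fromProperties(props) // carries e.g. the urlencode flag into hoodie.properties
        .initTable(conf, basePath)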
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
index 8fc6e7f..d6ae80d 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
@@ -154,7 +154,9 @@ class TestDataSourceForBootstrap {
// Perform bootstrap
val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit(
- DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL, Some("datestr"))
+ DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
+ Some("datestr"),
+ Map(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING.key -> "true"))
// Read bootstrapped table and verify count
val hoodieROViewDF1 = spark.read.format("hudi").load(basePath + "/*")
@@ -473,11 +475,13 @@ class TestDataSourceForBootstrap {
}
def runMetadataBootstrapAndVerifyCommit(tableType: String,
- partitionColumns: Option[String] = None): String = {
+ partitionColumns: Option[String] = None,
+ extraOpts: Map[String, String] = Map.empty): String = {
val bootstrapDF = spark.emptyDataFrame
bootstrapDF.write
.format("hudi")
.options(commonOpts)
+ .options(extraOpts)
.option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL)
.option(DataSourceWriteOptions.TABLE_TYPE.key, tableType)
.option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, partitionColumns.getOrElse(""))
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
index ee914ae..eba2a3d 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
@@ -17,6 +17,8 @@
package org.apache.hudi.functional
+import java.util.Properties
+
import org.apache.hadoop.fs.Path
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.common.config.HoodieMetadataConfig
@@ -61,6 +63,7 @@ class TestMORDataSource extends HoodieClientTestBase {
val updatedVerificationVal: String = "driver_update"
@BeforeEach override def setUp() {
+ setTableName("hoodie_test")
initPath()
initSparkContexts()
spark = sqlContext.sparkSession
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTimeTravelQuery.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTimeTravelQuery.scala
index bb102a4..9482ae3 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTimeTravelQuery.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTimeTravelQuery.scala
@@ -46,6 +46,7 @@ class TestTimeTravelQuery extends HoodieClientTestBase {
)
@BeforeEach override def setUp() {
+ setTableName("hoodie_test")
initPath()
initSparkContexts()
spark = sqlContext.sparkSession