Posted to commits@hudi.apache.org by yi...@apache.org on 2023/04/01 04:21:39 UTC
[hudi] branch master updated: [HUDI-5740] Refactor Deltastreamer and schema providers to use HoodieConfig/ConfigProperty (#8152)
This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 6fd885fb3dc [HUDI-5740] Refactor Deltastreamer and schema providers to use HoodieConfig/ConfigProperty (#8152)
6fd885fb3dc is described below
commit 6fd885fb3dc5c66caf6a1775fa87b4f7212056d8
Author: Lokesh Jain <lj...@apache.org>
AuthorDate: Sat Apr 1 09:51:28 2023 +0530
[HUDI-5740] Refactor Deltastreamer and schema providers to use HoodieConfig/ConfigProperty (#8152)
This commit moves the configs for Deltastreamer and schema providers into config classes that extend `HoodieConfig` and use `ConfigProperty`, so that these configs are surfaced when the configuration documentation for the Hudi website is generated (a short sketch of this pattern follows the commit message below). Configs in the following classes are in scope:
```
InitialCheckPointProvider
HoodieDeltaStreamer
HoodieMultiTableDeltaStreamer
FilebasedSchemaProvider
HiveSchemaProvider
JdbcbasedSchemaProvider
ProtoClassBasedSchemaProvider
SchemaPostProcessor
SchemaRegistryProvider
SparkAvroPostProcessor
DropColumnSchemaPostProcessor
BaseSchemaPostProcessorConfig
KafkaOffsetPostProcessor
SanitizationUtils
'hoodie.deltastreamer.multiwriter.source.checkpoint.id' in HoodieWriteConfig
```
Co-authored-by: Y Ethan Guo <et...@gmail.com>
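For illustration, here is a minimal sketch of the pattern this refactor applies: a config class extending `HoodieConfig` declares its keys as `ConfigProperty` constants, and call sites read them via `key()`. The class and key names below are hypothetical, chosen only to show the shape of the change; the real classes are listed above.
```
// Hypothetical example of the ConfigProperty pattern this commit adopts;
// ExampleSchemaProviderConfig and its key are illustrative, not part of the commit.
package org.apache.hudi.utilities.config;

import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.config.TypedProperties;

public class ExampleSchemaProviderConfig extends HoodieConfig {

  // Declaring the key as a ConfigProperty (instead of a raw String constant in a
  // nested Config class) lets the docs generator surface the key, its default
  // value, and its documentation.
  public static final ConfigProperty<String> EXAMPLE_SCHEMA_FILE = ConfigProperty
      .key("hoodie.deltastreamer.schemaprovider.example.schema.file")
      .noDefaultValue()
      .withDocumentation("Path to an example schema file.");

  // Call sites no longer reference raw string constants; they go through key():
  public static String readExampleSchemaFile(TypedProperties props) {
    return props.getString(EXAMPLE_SCHEMA_FILE.key(), null);
  }
}
```
The removed `static class Config` holders with raw String constants, visible in the hunks below, are exactly what these new classes replace.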
---
.../org/apache/hudi/config/HoodieWriteConfig.java | 7 --
.../apache/hudi/common/config/ConfigGroups.java | 5 ++
.../testsuite/HoodieInlineTestSuiteWriter.java | 5 +-
.../org/apache/hudi/utilities/UtilHelpers.java | 7 +-
.../checkpointing/InitialCheckPointProvider.java | 18 ++---
.../config/FilebasedSchemaProviderConfig.java | 49 ++++++++++++
.../utilities/config/HiveSchemaProviderConfig.java | 59 ++++++++++++++
.../config/HoodieDeltaStreamerConfig.java | 89 ++++++++++++++++++++++
.../config/HoodieSchemaProviderConfig.java | 82 ++++++++++++++++++++
.../config/JdbcbasedSchemaProviderConfig.java | 77 +++++++++++++++++++
.../ProtoClassBasedSchemaProviderConfig.java | 69 +++++++++++++++++
.../config/SchemaProviderPostProcessorConfig.java | 76 ++++++++++++++++++
.../hudi/utilities/deltastreamer/DeltaSync.java | 6 +-
.../HoodieMultiTableDeltaStreamer.java | 57 +++++++-------
.../deltastreamer/SourceFormatAdapter.java | 4 +-
.../utilities/schema/FilebasedSchemaProvider.java | 17 ++---
.../hudi/utilities/schema/HiveSchemaProvider.java | 23 ++----
.../utilities/schema/JdbcbasedSchemaProvider.java | 36 ++-------
.../utilities/schema/KafkaOffsetPostProcessor.java | 12 ++-
.../schema/ProtoClassBasedSchemaProvider.java | 52 ++++++-------
.../hudi/utilities/schema/SchemaPostProcessor.java | 9 ++-
.../utilities/schema/SchemaRegistryProvider.java | 25 +++---
.../utilities/schema/SparkAvroPostProcessor.java | 5 +-
.../DropColumnSchemaPostProcessor.java | 12 ++-
.../add/AddPrimitiveColumnSchemaPostProcessor.java | 15 ++--
.../add/BaseSchemaPostProcessorConfig.java | 53 -------------
.../hudi/utilities/sources/ProtoKafkaSource.java | 6 +-
.../utilities/sources/debezium/DebeziumSource.java | 3 +-
.../utilities/sources/helpers/AvroConvertor.java | 4 +-
.../sources/helpers/SanitizationUtils.java | 24 +++---
.../hudi/utilities/TestSchemaPostProcessor.java | 27 ++++---
.../deltastreamer/TestHoodieDeltaStreamer.java | 14 ++--
.../TestHoodieMultiTableDeltaStreamer.java | 8 +-
.../deltastreamer/TestSourceFormatAdapter.java | 10 +--
.../schema/TestFilebasedSchemaProvider.java | 4 +-
.../schema/TestProtoClassBasedSchemaProvider.java | 17 +++--
.../utilities/sources/TestAvroKafkaSource.java | 6 +-
.../utilities/sources/TestJsonKafkaSource.java | 4 +-
.../utilities/sources/TestProtoKafkaSource.java | 5 +-
.../utilities/testutils/SanitizationTestUtils.java | 2 +-
40 files changed, 708 insertions(+), 295 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 5887604d3ad..2ace2277dd1 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -656,13 +656,6 @@ public class HoodieWriteConfig extends HoodieConfig {
.withDocumentation("Whether to enable commit conflict checking or not during early "
+ "conflict detection.");
- public static final ConfigProperty<String> MUTLI_WRITER_SOURCE_CHECKPOINT_ID = ConfigProperty
- .key("hoodie.deltastreamer.multiwriter.source.checkpoint.id")
- .noDefaultValue()
- .withDocumentation("Unique Id to be used for multiwriter deltastreamer scenario. This is the "
- + "scenario when multiple deltastreamers are used to write to the same target table. If you are just using "
- + "a single deltastreamer for a table then you do not need to set this config.");
-
public static final ConfigProperty<String> SENSITIVE_CONFIG_KEYS_FILTER = ConfigProperty
.key("hoodie.sensitive.config.keys")
.defaultValue("ssl,tls,sasl,auth,credentials")
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/ConfigGroups.java b/hudi-common/src/main/java/org/apache/hudi/common/config/ConfigGroups.java
index bd4589b82b1..234a0a5a45a 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/config/ConfigGroups.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/config/ConfigGroups.java
@@ -61,6 +61,11 @@ public class ConfigGroups {
"Commit Callback Configs",
"Configurations controlling callback behavior into HTTP endpoints, to push "
+ "notifications on commits on hudi tables."),
+
+ SCHEMA_PROVIDER(
+ "DeltaStreamer Schema Provider Configs",
+ "Configurations that control the schema provider for DeltaStreamer."),
+
NONE(
"None",
"No subgroup. This description should be hidden.");
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieInlineTestSuiteWriter.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieInlineTestSuiteWriter.java
index 46f793ef12b..1bfa6fb98fb 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieInlineTestSuiteWriter.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieInlineTestSuiteWriter.java
@@ -35,6 +35,7 @@ import org.apache.hudi.integ.testsuite.writer.DeltaWriteStats;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.compact.CompactHelpers;
+import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.avro.Schema;
@@ -196,7 +197,7 @@ public class HoodieInlineTestSuiteWriter extends HoodieTestSuiteWriter {
Map<String, String> extraMetadata = new HashMap<>();
/** Store the checkpoint in the commit metadata just like
* {@link HoodieDeltaStreamer#commit(SparkRDDWriteClient, JavaRDD, Option)} **/
- extraMetadata.put(HoodieDeltaStreamerWrapper.CHECKPOINT_KEY, lastCheckpoint.get());
+ extraMetadata.put(HoodieDeltaStreamer.CHECKPOINT_KEY, lastCheckpoint.get());
if (generatedDataStats != null && generatedDataStats.count() > 1) {
// Just stores the path where this batch of data is generated to
extraMetadata.put(GENERATED_DATA_PATH, generatedDataStats.map(s -> s.getFilePath()).collect().get(0));
@@ -211,7 +212,7 @@ public class HoodieInlineTestSuiteWriter extends HoodieTestSuiteWriter {
Map<String, String> extraMetadata = new HashMap<>();
/** Store the checkpoint in the commit metadata just like
* {@link HoodieDeltaStreamer#commit(SparkRDDWriteClient, JavaRDD, Option)} **/
- extraMetadata.put(HoodieDeltaStreamerWrapper.CHECKPOINT_KEY, lastCheckpoint.get());
+ extraMetadata.put(HoodieDeltaStreamer.CHECKPOINT_KEY, lastCheckpoint.get());
if (generatedDataStats != null && generatedDataStats.count() > 1) {
// Just stores the path where this batch of data is generated to
extraMetadata.put(GENERATED_DATA_PATH, generatedDataStats.map(s -> s.getFilePath()).collect().get(0));
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
index 66c67d105f5..fef5c68109a 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
@@ -45,6 +45,8 @@ import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.utilities.checkpointing.InitialCheckPointProvider;
+import org.apache.hudi.utilities.config.HoodieSchemaProviderConfig;
+import org.apache.hudi.utilities.config.SchemaProviderPostProcessorConfig;
import org.apache.hudi.utilities.exception.HoodieSchemaPostProcessException;
import org.apache.hudi.utilities.exception.HoodieSourcePostProcessException;
import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
@@ -52,7 +54,6 @@ import org.apache.hudi.utilities.schema.DelegatingSchemaProvider;
import org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor;
import org.apache.hudi.utilities.schema.RowBasedSchemaProvider;
import org.apache.hudi.utilities.schema.SchemaPostProcessor;
-import org.apache.hudi.utilities.schema.SchemaPostProcessor.Config;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.schema.SchemaProviderWithPostProcessor;
import org.apache.hudi.utilities.schema.SparkAvroPostProcessor;
@@ -497,8 +498,8 @@ public class UtilHelpers {
return provider;
}
- String schemaPostProcessorClass = cfg.getString(Config.SCHEMA_POST_PROCESSOR_PROP, null);
- boolean enableSparkAvroPostProcessor = Boolean.parseBoolean(cfg.getString(SparkAvroPostProcessor.Config.SPARK_AVRO_POST_PROCESSOR_PROP_ENABLE, "true"));
+ String schemaPostProcessorClass = cfg.getString(SchemaProviderPostProcessorConfig.SCHEMA_POST_PROCESSOR.key(), null);
+ boolean enableSparkAvroPostProcessor = Boolean.parseBoolean(cfg.getString(HoodieSchemaProviderConfig.SPARK_AVRO_POST_PROCESSOR_ENABLE.key(), "true"));
if (transformerClassNames != null && !transformerClassNames.isEmpty()
&& enableSparkAvroPostProcessor && StringUtils.isNullOrEmpty(schemaPostProcessorClass)) {
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/checkpointing/InitialCheckPointProvider.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/checkpointing/InitialCheckPointProvider.java
index 4b9039e4308..bfcd7d19486 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/checkpointing/InitialCheckPointProvider.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/checkpointing/InitialCheckPointProvider.java
@@ -18,16 +18,18 @@
package org.apache.hudi.utilities.checkpointing;
-import java.io.IOException;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
import org.apache.hudi.ApiMaturityLevel;
import org.apache.hudi.PublicAPIClass;
import org.apache.hudi.PublicAPIMethod;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.utilities.config.HoodieDeltaStreamerConfig;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
/**
* Provide the initial checkpoint for delta streamer.
@@ -38,17 +40,13 @@ public abstract class InitialCheckPointProvider {
protected transient FileSystem fs;
protected transient TypedProperties props;
- static class Config {
- private static String CHECKPOINT_PROVIDER_PATH_PROP = "hoodie.deltastreamer.checkpoint.provider.path";
- }
-
/**
* Construct InitialCheckPointProvider.
* @param props All properties passed to Delta Streamer
*/
public InitialCheckPointProvider(TypedProperties props) {
this.props = props;
- this.path = new Path(props.getString(Config.CHECKPOINT_PROVIDER_PATH_PROP));
+ this.path = new Path(props.getString(HoodieDeltaStreamerConfig.CHECKPOINT_PROVIDER_PATH.key()));
}
/**
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/FilebasedSchemaProviderConfig.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/FilebasedSchemaProviderConfig.java
new file mode 100644
index 00000000000..2c2379c40e1
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/FilebasedSchemaProviderConfig.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.config;
+
+import org.apache.hudi.common.config.ConfigClassProperty;
+import org.apache.hudi.common.config.ConfigGroups;
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.HoodieConfig;
+
+import javax.annotation.concurrent.Immutable;
+
+import static org.apache.hudi.utilities.config.HoodieSchemaProviderConfig.SCHEMAPROVIDER_CONFIG_PREFIX;
+
+/**
+ * File-based Schema Provider Configs.
+ */
+@Immutable
+@ConfigClassProperty(name = "File-based Schema Provider Configs",
+ groupName = ConfigGroups.Names.DELTA_STREAMER,
+ subGroupName = ConfigGroups.SubGroupNames.SCHEMA_PROVIDER,
+ description = "Configurations for file-based schema provider.")
+public class FilebasedSchemaProviderConfig extends HoodieConfig {
+ public static final ConfigProperty<String> SOURCE_SCHEMA_FILE = ConfigProperty
+ .key(SCHEMAPROVIDER_CONFIG_PREFIX + "source.schema.file")
+ .noDefaultValue()
+ .withDocumentation("The schema of the source you are reading from");
+
+ public static final ConfigProperty<String> TARGET_SCHEMA_FILE = ConfigProperty
+ .key(SCHEMAPROVIDER_CONFIG_PREFIX + "target.schema.file")
+ .noDefaultValue()
+ .withDocumentation("The schema of the target you are writing to");
+}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/HiveSchemaProviderConfig.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/HiveSchemaProviderConfig.java
new file mode 100644
index 00000000000..5d33973b59b
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/HiveSchemaProviderConfig.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.config;
+
+import org.apache.hudi.common.config.ConfigClassProperty;
+import org.apache.hudi.common.config.ConfigGroups;
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.HoodieConfig;
+
+import javax.annotation.concurrent.Immutable;
+
+import static org.apache.hudi.utilities.config.HoodieSchemaProviderConfig.SCHEMAPROVIDER_CONFIG_PREFIX;
+
+/**
+ * Hive Schema Provider Configs.
+ */
+@Immutable
+@ConfigClassProperty(name = "Hive Schema Provider Configs",
+ groupName = ConfigGroups.Names.DELTA_STREAMER,
+ subGroupName = ConfigGroups.SubGroupNames.SCHEMA_PROVIDER,
+ description = "Configurations for Hive schema provider.")
+public class HiveSchemaProviderConfig extends HoodieConfig {
+ public static final ConfigProperty<String> SOURCE_SCHEMA_DATABASE = ConfigProperty
+ .key(SCHEMAPROVIDER_CONFIG_PREFIX + "source.schema.hive.database")
+ .noDefaultValue()
+ .withDocumentation("Hive database from where source schema can be fetched");
+
+ public static final ConfigProperty<String> SOURCE_SCHEMA_TABLE = ConfigProperty
+ .key(SCHEMAPROVIDER_CONFIG_PREFIX + "source.schema.hive.table")
+ .noDefaultValue()
+ .withDocumentation("Hive table from where source schema can be fetched");
+
+ public static final ConfigProperty<String> TARGET_SCHEMA_DATABASE = ConfigProperty
+ .key(SCHEMAPROVIDER_CONFIG_PREFIX + "target.schema.hive.database")
+ .noDefaultValue()
+ .withDocumentation("Hive database from where target schema can be fetched");
+
+ public static final ConfigProperty<String> TARGET_SCHEMA_TABLE = ConfigProperty
+ .key(SCHEMAPROVIDER_CONFIG_PREFIX + "target.schema.hive.table")
+ .noDefaultValue()
+ .withDocumentation("Hive table from where target schema can be fetched");
+}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/HoodieDeltaStreamerConfig.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/HoodieDeltaStreamerConfig.java
new file mode 100644
index 00000000000..71cd77f9a98
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/HoodieDeltaStreamerConfig.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.config;
+
+import org.apache.hudi.common.config.ConfigClassProperty;
+import org.apache.hudi.common.config.ConfigGroups;
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.HoodieConfig;
+
+import javax.annotation.concurrent.Immutable;
+
+/**
+ * Delta Streamer related config.
+ */
+@Immutable
+@ConfigClassProperty(name = "DeltaStreamer Configs",
+ groupName = ConfigGroups.Names.DELTA_STREAMER,
+ description = "Configurations that control DeltaStreamer.")
+public class HoodieDeltaStreamerConfig extends HoodieConfig {
+
+ public static final String DELTA_STREAMER_CONFIG_PREFIX = "hoodie.deltastreamer.";
+ public static final String INGESTION_PREFIX = DELTA_STREAMER_CONFIG_PREFIX + "ingestion.";
+
+ public static final ConfigProperty<String> CHECKPOINT_PROVIDER_PATH = ConfigProperty
+ .key(DELTA_STREAMER_CONFIG_PREFIX + "checkpoint.provider.path")
+ .noDefaultValue()
+ .withDocumentation("The path for providing the checkpoints.");
+
+ public static final ConfigProperty<String> KAFKA_TOPIC = ConfigProperty
+ .key(DELTA_STREAMER_CONFIG_PREFIX + "source.kafka.topic")
+ .noDefaultValue()
+ .withDocumentation("Kafka topic name.");
+
+ public static final ConfigProperty<String> KAFKA_APPEND_OFFSETS = ConfigProperty
+ .key(DELTA_STREAMER_CONFIG_PREFIX + "source.kafka.append.offsets")
+ .defaultValue("false")
+ .withDocumentation("When enabled, appends kafka offset info like source offset(_hoodie_kafka_source_offset), "
+ + "partition (_hoodie_kafka_source_partition) and timestamp (_hoodie_kafka_source_timestamp) to the records. "
+ + "By default its disabled and no kafka offsets are added");
+
+ public static final ConfigProperty<Boolean> SANITIZE_SCHEMA_FIELD_NAMES = ConfigProperty
+ .key(DELTA_STREAMER_CONFIG_PREFIX + "source.sanitize.invalid.schema.field.names")
+ .defaultValue(false)
+ .withDocumentation("Sanitizes names of invalid schema fields both in the data read from source and also in the schema "
+ + "Replaces invalid characters with hoodie.deltastreamer.source.sanitize.invalid.char.mask. Invalid characters are by "
+ + "goes by avro naming convention (https://avro.apache.org/docs/current/spec.html#names).");
+
+ public static final ConfigProperty<String> SCHEMA_FIELD_NAME_INVALID_CHAR_MASK = ConfigProperty
+ .key(DELTA_STREAMER_CONFIG_PREFIX + "source.sanitize.invalid.char.mask")
+ .defaultValue("__")
+ .withDocumentation("Defines the character sequence that replaces invalid characters in schema field names if "
+ + "hoodie.deltastreamer.source.sanitize.invalid.schema.field.names is enabled.");
+
+ public static final ConfigProperty<String> MUTLI_WRITER_SOURCE_CHECKPOINT_ID = ConfigProperty
+ .key(DELTA_STREAMER_CONFIG_PREFIX + "multiwriter.source.checkpoint.id")
+ .noDefaultValue()
+ .withDocumentation("Unique Id to be used for multiwriter deltastreamer scenario. This is the "
+ + "scenario when multiple deltastreamers are used to write to the same target table. If you are just using "
+ + "a single deltastreamer for a table then you do not need to set this config.");
+
+ public static final ConfigProperty<String> TABLES_TO_BE_INGESTED = ConfigProperty
+ .key(INGESTION_PREFIX + "tablesToBeIngested")
+ .noDefaultValue()
+ .withDocumentation("Comma separated names of tables to be ingested in the format <database>.<table>, for example db1.table1,db1.table2");
+
+ public static final ConfigProperty<String> TARGET_BASE_PATH = ConfigProperty
+ .key(INGESTION_PREFIX + "targetBasePath")
+ .noDefaultValue()
+ .withDocumentation("The path to which a particular table is ingested. The config is specific to HoodieMultiTableDeltaStreamer"
+ + " and overrides path determined using option `--base-path-prefix` for a table. This config is ignored for a single"
+ + " table deltastreamer");
+}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/HoodieSchemaProviderConfig.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/HoodieSchemaProviderConfig.java
new file mode 100644
index 00000000000..6a10905e17e
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/HoodieSchemaProviderConfig.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.config;
+
+import org.apache.hudi.common.config.ConfigClassProperty;
+import org.apache.hudi.common.config.ConfigGroups;
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.HoodieConfig;
+
+import javax.annotation.concurrent.Immutable;
+
+import static org.apache.hudi.utilities.config.HoodieDeltaStreamerConfig.DELTA_STREAMER_CONFIG_PREFIX;
+
+/**
+ * Delta Streamer Schema Provider related config.
+ */
+@Immutable
+@ConfigClassProperty(name = "DeltaStreamer Schema Provider Configs",
+ groupName = ConfigGroups.Names.DELTA_STREAMER,
+ subGroupName = ConfigGroups.SubGroupNames.SCHEMA_PROVIDER,
+ areCommonConfigs = true,
+ description = "")
+public class HoodieSchemaProviderConfig extends HoodieConfig {
+ public static final String SCHEMAPROVIDER_CONFIG_PREFIX = DELTA_STREAMER_CONFIG_PREFIX + "schemaprovider.";
+
+ public static final ConfigProperty<String> SRC_SCHEMA_REGISTRY_URL = ConfigProperty
+ .key(SCHEMAPROVIDER_CONFIG_PREFIX + "registry.url")
+ .noDefaultValue()
+ .withDocumentation("The schema of the source you are reading from e.g. https://foo:bar@schemaregistry.org");
+
+ public static final ConfigProperty<String> TARGET_SCHEMA_REGISTRY_URL = ConfigProperty
+ .key(SCHEMAPROVIDER_CONFIG_PREFIX + "registry.targetUrl")
+ .noDefaultValue()
+ .withDocumentation("The schema of the target you are writing to e.g. https://foo:bar@schemaregistry.org");
+
+ public static final ConfigProperty<String> SCHEMA_CONVERTER = ConfigProperty
+ .key(SCHEMAPROVIDER_CONFIG_PREFIX + "registry.schemaconverter")
+ .noDefaultValue()
+ .withDocumentation("The class name of the custom schema converter to use.");
+
+ public static final ConfigProperty<Boolean> SPARK_AVRO_POST_PROCESSOR_ENABLE = ConfigProperty
+ .key(SCHEMAPROVIDER_CONFIG_PREFIX + "spark_avro_post_processor.enable")
+ .defaultValue(false)
+ .withDocumentation("Whether to enable Spark Avro post processor.");
+
+ public static final ConfigProperty<String> SCHEMA_REGISTRY_BASE_URL = ConfigProperty
+ .key(SCHEMAPROVIDER_CONFIG_PREFIX + "registry.baseUrl")
+ .noDefaultValue()
+ .withDocumentation("The base URL of the schema registry.");
+
+ public static final ConfigProperty<String> SCHEMA_REGISTRY_URL_SUFFIX = ConfigProperty
+ .key(SCHEMAPROVIDER_CONFIG_PREFIX + "registry.urlSuffix")
+ .noDefaultValue()
+ .withDocumentation("The suffix of the URL for the schema registry.");
+
+ public static final ConfigProperty<String> SCHEMA_REGISTRY_SOURCE_URL_SUFFIX = ConfigProperty
+ .key(SCHEMAPROVIDER_CONFIG_PREFIX + "registry.sourceUrlSuffix")
+ .noDefaultValue()
+ .withDocumentation("The source URL suffix.");
+
+ public static final ConfigProperty<String> SCHEMA_REGISTRY_TARGET_URL_SUFFIX = ConfigProperty
+ .key(SCHEMAPROVIDER_CONFIG_PREFIX + "registry.targetUrlSuffix")
+ .noDefaultValue()
+ .withDocumentation("The target URL suffix.");
+}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/JdbcbasedSchemaProviderConfig.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/JdbcbasedSchemaProviderConfig.java
new file mode 100644
index 00000000000..dadf6757bd9
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/JdbcbasedSchemaProviderConfig.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.config;
+
+import org.apache.hudi.common.config.ConfigClassProperty;
+import org.apache.hudi.common.config.ConfigGroups;
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.HoodieConfig;
+
+import javax.annotation.concurrent.Immutable;
+
+import static org.apache.hudi.utilities.config.HoodieSchemaProviderConfig.SCHEMAPROVIDER_CONFIG_PREFIX;
+
+/**
+ * JDBC-based Schema Provider Configs.
+ */
+@Immutable
+@ConfigClassProperty(name = "JDBC-based Schema Provider Configs",
+ groupName = ConfigGroups.Names.DELTA_STREAMER,
+ subGroupName = ConfigGroups.SubGroupNames.SCHEMA_PROVIDER,
+ description = "Configurations for JDBC-based schema provider.")
+public class JdbcbasedSchemaProviderConfig extends HoodieConfig {
+ public static final ConfigProperty<String> SOURCE_SCHEMA_JDBC_CONNECTION_URL = ConfigProperty
+ .key(SCHEMAPROVIDER_CONFIG_PREFIX + "source.schema.jdbc.connection.url")
+ .noDefaultValue()
+ .withDocumentation("The JDBC URL to connect to. The source-specific connection properties may be specified in the URL."
+ + " e.g., jdbc:postgresql://localhost/test?user=fred&password=secret");
+
+ public static final ConfigProperty<String> SOURCE_SCHEMA_JDBC_DRIVER_TYPE = ConfigProperty
+ .key(SCHEMAPROVIDER_CONFIG_PREFIX + "source.schema.jdbc.driver.type")
+ .noDefaultValue()
+ .withDocumentation("The class name of the JDBC driver to use to connect to this URL. e.g. org.h2.Driver");
+
+ public static final ConfigProperty<String> SOURCE_SCHEMA_JDBC_USERNAME = ConfigProperty
+ .key(SCHEMAPROVIDER_CONFIG_PREFIX + "source.schema.jdbc.username")
+ .noDefaultValue()
+ .withDocumentation("Username for the connection e.g. fred");
+
+ public static final ConfigProperty<String> SOURCE_SCHEMA_JDBC_PASSWORD = ConfigProperty
+ .key(SCHEMAPROVIDER_CONFIG_PREFIX + "source.schema.jdbc.password")
+ .noDefaultValue()
+ .withDocumentation("Password for the connection e.g. secret");
+
+ public static final ConfigProperty<String> SOURCE_SCHEMA_JDBC_DBTABLE = ConfigProperty
+ .key(SCHEMAPROVIDER_CONFIG_PREFIX + "source.schema.jdbc.dbtable")
+ .noDefaultValue()
+ .withDocumentation("The table with the schema to reference e.g. test_database.test1_table or test1_table");
+
+ public static final ConfigProperty<String> SOURCE_SCHEMA_JDBC_TIMEOUT = ConfigProperty
+ .key(SCHEMAPROVIDER_CONFIG_PREFIX + "source.schema.jdbc.timeout")
+ .noDefaultValue()
+ .withDocumentation("The number of seconds the driver will wait for a Statement object to execute. Zero means there is no limit. "
+ + "In the write path, this option depends on how JDBC drivers implement the API setQueryTimeout, e.g., the h2 JDBC driver "
+ + "checks the timeout of each query instead of an entire JDBC batch. It defaults to 0.");
+
+ public static final ConfigProperty<String> SOURCE_SCHEMA_JDBC_NULLABLE = ConfigProperty
+ .key(SCHEMAPROVIDER_CONFIG_PREFIX + "source.schema.jdbc.nullable")
+ .noDefaultValue()
+ .withDocumentation("If true, all the columns are nullable.");
+}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/ProtoClassBasedSchemaProviderConfig.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/ProtoClassBasedSchemaProviderConfig.java
new file mode 100644
index 00000000000..8e62de8f1c0
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/ProtoClassBasedSchemaProviderConfig.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.config;
+
+import org.apache.hudi.common.config.ConfigClassProperty;
+import org.apache.hudi.common.config.ConfigGroups;
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.HoodieConfig;
+
+import javax.annotation.concurrent.Immutable;
+
+import static org.apache.hudi.utilities.config.HoodieSchemaProviderConfig.SCHEMAPROVIDER_CONFIG_PREFIX;
+
+/**
+ * Proto-based Schema Provider Configs.
+ */
+@Immutable
+@ConfigClassProperty(name = "Proto-based Schema Provider Configs",
+ groupName = ConfigGroups.Names.DELTA_STREAMER,
+ subGroupName = ConfigGroups.SubGroupNames.SCHEMA_PROVIDER,
+ description = "Configurations for Proto schema provider.")
+public class ProtoClassBasedSchemaProviderConfig extends HoodieConfig {
+ private static final String PROTO_SCHEMA_PROVIDER_PREFIX = SCHEMAPROVIDER_CONFIG_PREFIX + "proto.";
+
+ public static final ConfigProperty<String> PROTO_SCHEMA_CLASS_NAME = ConfigProperty
+ .key(PROTO_SCHEMA_PROVIDER_PREFIX + "class.name")
+ .noDefaultValue()
+ .sinceVersion("0.13.0")
+ .withDocumentation("The Protobuf Message class used as the source for the schema.");
+
+ public static final ConfigProperty<Boolean> PROTO_SCHEMA_WRAPPED_PRIMITIVES_AS_RECORDS = ConfigProperty
+ .key(PROTO_SCHEMA_PROVIDER_PREFIX + "flatten.wrappers")
+ .defaultValue(false)
+ .sinceVersion("0.13.0")
+ .withDocumentation("When set to true wrapped primitives like Int64Value are translated to a record with a single 'value' field. By default, the value is false and the wrapped primitives are "
+ + "treated as a nullable value");
+
+ public static final ConfigProperty<Boolean> PROTO_SCHEMA_TIMESTAMPS_AS_RECORDS = ConfigProperty
+ .key(PROTO_SCHEMA_PROVIDER_PREFIX + "timestamps.as.records")
+ .defaultValue(false)
+ .sinceVersion("0.13.0")
+ .withDocumentation("When set to true Timestamp fields are translated to a record with a seconds and nanos field. By default, the value is false and the timestamp is converted to a long with "
+ + "the timestamp-micros logical type");
+
+ public static final ConfigProperty<Integer> PROTO_SCHEMA_MAX_RECURSION_DEPTH = ConfigProperty
+ .key(PROTO_SCHEMA_PROVIDER_PREFIX + "max.recursion.depth")
+ .defaultValue(5)
+ .sinceVersion("0.13.0")
+ .withDocumentation("The max depth to unravel the Proto schema when translating into an Avro schema. Setting this depth allows the user to convert a schema that is recursive in proto into "
+ + "something that can be represented in their lake format like Parquet. After a given class has been seen N times within a single branch, the schema provider will create a record with a "
+ + "byte array to hold the remaining proto data and a string to hold the message descriptor's name for context.");
+}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/SchemaProviderPostProcessorConfig.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/SchemaProviderPostProcessorConfig.java
new file mode 100644
index 00000000000..c2ce82c1735
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/SchemaProviderPostProcessorConfig.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.config;
+
+import org.apache.hudi.common.config.ConfigClassProperty;
+import org.apache.hudi.common.config.ConfigGroups;
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.HoodieConfig;
+
+import javax.annotation.concurrent.Immutable;
+
+/**
+ * Configurations for Schema Post Processor.
+ */
+@Immutable
+@ConfigClassProperty(name = "Schema Post Processor Config Configs",
+ groupName = ConfigGroups.Names.DELTA_STREAMER,
+ subGroupName = ConfigGroups.SubGroupNames.SCHEMA_PROVIDER,
+ description = "Configurations for Schema Post Processor")
+public class SchemaProviderPostProcessorConfig extends HoodieConfig {
+
+ private static final String PREFIX = HoodieSchemaProviderConfig.SCHEMAPROVIDER_CONFIG_PREFIX + "schema_post_processor.";
+
+ public static final ConfigProperty<String> SCHEMA_POST_PROCESSOR = ConfigProperty
+ .key(HoodieSchemaProviderConfig.SCHEMAPROVIDER_CONFIG_PREFIX + "schema_post_processor")
+ .noDefaultValue()
+ .withDocumentation("The class name of the schema post processor.");
+
+ public static final ConfigProperty<String> DELETE_COLUMN_POST_PROCESSOR_COLUMN = ConfigProperty
+ .key(PREFIX + "delete.columns")
+ .noDefaultValue()
+ .withDocumentation("Columns to delete in the schema post processor.");
+
+ public static final ConfigProperty<String> SCHEMA_POST_PROCESSOR_ADD_COLUMN_NAME_PROP = ConfigProperty
+ .key(PREFIX + "add.column.name")
+ .noDefaultValue()
+ .withDocumentation("New column's name");
+
+ public static final ConfigProperty<String> SCHEMA_POST_PROCESSOR_ADD_COLUMN_TYPE_PROP = ConfigProperty
+ .key(PREFIX + "add.column.type")
+ .noDefaultValue()
+ .withDocumentation("New column's type");
+
+ public static final ConfigProperty<Boolean> SCHEMA_POST_PROCESSOR_ADD_COLUMN_NULLABLE_PROP = ConfigProperty
+ .key(PREFIX + "add.column.nullable")
+ .defaultValue(true)
+ .withDocumentation("New column's nullable");
+
+ public static final ConfigProperty<String> SCHEMA_POST_PROCESSOR_ADD_COLUMN_DEFAULT_PROP = ConfigProperty
+ .key(PREFIX + "add.column.default")
+ .noDefaultValue()
+ .withDocumentation("New column's default value");
+
+ public static final ConfigProperty<String> SCHEMA_POST_PROCESSOR_ADD_COLUMN_DOC_PROP = ConfigProperty
+ .key(PREFIX + "add.column.doc")
+ .noDefaultValue()
+ .withDocumentation("Docs about new column");
+
+}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index 1f79564a7f4..610c215d227 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -144,14 +144,14 @@ import static org.apache.hudi.config.HoodieErrorTableConfig.ERROR_TABLE_ENABLED;
import static org.apache.hudi.config.HoodieWriteConfig.AUTO_COMMIT_ENABLE;
import static org.apache.hudi.config.HoodieWriteConfig.COMBINE_BEFORE_INSERT;
import static org.apache.hudi.config.HoodieWriteConfig.COMBINE_BEFORE_UPSERT;
-import static org.apache.hudi.config.HoodieWriteConfig.MUTLI_WRITER_SOURCE_CHECKPOINT_ID;
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_BUCKET_SYNC;
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_BUCKET_SYNC_SPEC;
import static org.apache.hudi.utilities.UtilHelpers.createRecordMerger;
-import static org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.CHECKPOINT_FORCE_SKIP_PROP;
+import static org.apache.hudi.utilities.config.HoodieDeltaStreamerConfig.MUTLI_WRITER_SOURCE_CHECKPOINT_ID;
import static org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.CHECKPOINT_KEY;
-import static org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.CHECKPOINT_RESET_KEY;
import static org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.DEFAULT_CHECKPOINT_FORCE_SKIP_PROP;
+import static org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.CHECKPOINT_RESET_KEY;
+import static org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.CHECKPOINT_FORCE_SKIP_PROP;
import static org.apache.hudi.utilities.schema.RowBasedSchemaProvider.HOODIE_RECORD_NAMESPACE;
import static org.apache.hudi.utilities.schema.RowBasedSchemaProvider.HOODIE_RECORD_STRUCT_NAME;
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
index 3532c347050..71018feb18f 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
@@ -18,27 +18,29 @@
package org.apache.hudi.utilities.deltastreamer;
-import com.beust.jcommander.Parameter;
import org.apache.hudi.client.utils.OperationConverter;
+import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
-import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.hive.HiveSyncTool;
import org.apache.hudi.sync.common.HoodieSyncConfig;
import org.apache.hudi.utilities.IdentitySplitter;
-import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.utilities.UtilHelpers;
+import org.apache.hudi.utilities.config.HoodieDeltaStreamerConfig;
+import org.apache.hudi.utilities.config.HoodieSchemaProviderConfig;
import org.apache.hudi.utilities.schema.SchemaRegistryProvider;
+import org.apache.hudi.utilities.sources.JsonDFSSource;
import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
-import org.apache.hudi.utilities.sources.JsonDFSSource;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
@@ -52,9 +54,6 @@ import java.util.List;
import java.util.Objects;
import java.util.Set;
-import static org.apache.hudi.utilities.schema.SchemaRegistryProvider.Config.SRC_SCHEMA_REGISTRY_URL_PROP;
-import static org.apache.hudi.utilities.schema.SchemaRegistryProvider.Config.TARGET_SCHEMA_REGISTRY_URL_PROP;
-
/**
* Wrapper over HoodieDeltaStreamer.java class.
* Helps with ingesting incremental data into hoodie datasets for multiple tables.
@@ -117,7 +116,7 @@ public class HoodieMultiTableDeltaStreamer {
String[] tableWithDatabase = table.split("\\.");
String database = tableWithDatabase.length > 1 ? tableWithDatabase[0] : "default";
String currentTable = tableWithDatabase.length > 1 ? tableWithDatabase[1] : table;
- String configProp = Constants.INGESTION_PREFIX + database + Constants.DELIMITER + currentTable + Constants.INGESTION_CONFIG_SUFFIX;
+ String configProp = HoodieDeltaStreamerConfig.INGESTION_PREFIX + database + Constants.DELIMITER + currentTable + Constants.INGESTION_CONFIG_SUFFIX;
String configFilePath = properties.getString(configProp, Helpers.getDefaultConfigFilePath(configFolder, database, currentTable));
checkIfTableConfigFileExists(configFolder, fs, configFilePath);
TypedProperties tableProperties = UtilHelpers.readConfig(fs.getConf(), new Path(configFilePath), new ArrayList<String>()).getProps();
@@ -130,7 +129,7 @@ public class HoodieMultiTableDeltaStreamer {
//copy all the values from config to cfg
String targetBasePath = resetTarget(config, database, currentTable);
Helpers.deepCopyConfigs(config, cfg);
- String overriddenTargetBasePath = tableProperties.getString(Constants.TARGET_BASE_PATH_PROP, "");
+ String overriddenTargetBasePath = tableProperties.getString(HoodieDeltaStreamerConfig.TARGET_BASE_PATH.key(), "");
cfg.targetBasePath = StringUtils.isNullOrEmpty(overriddenTargetBasePath) ? targetBasePath : overriddenTargetBasePath;
if (cfg.enableMetaSync && StringUtils.isNullOrEmpty(tableProperties.getString(HoodieSyncConfig.META_SYNC_TABLE_NAME.key(), ""))) {
throw new HoodieException("Meta sync table field not provided!");
@@ -146,7 +145,7 @@ public class HoodieMultiTableDeltaStreamer {
}
private List<String> getTablesToBeIngested(TypedProperties properties) {
- String combinedTablesString = properties.getString(Constants.TABLES_TO_BE_INGESTED_PROP);
+ String combinedTablesString = properties.getString(HoodieDeltaStreamerConfig.TABLES_TO_BE_INGESTED.key());
if (combinedTablesString == null) {
return new ArrayList<>();
}
@@ -162,32 +161,34 @@ public class HoodieMultiTableDeltaStreamer {
}
private void populateTargetRegistryProp(TypedProperties typedProperties) {
- String schemaRegistryTargetUrl = typedProperties.getString(TARGET_SCHEMA_REGISTRY_URL_PROP, null);
+ String schemaRegistryTargetUrl = typedProperties.getString(HoodieSchemaProviderConfig.TARGET_SCHEMA_REGISTRY_URL.key(), null);
if (StringUtils.isNullOrEmpty(schemaRegistryTargetUrl)) {
- String schemaRegistryBaseUrl = typedProperties.getString(Constants.SCHEMA_REGISTRY_BASE_URL_PROP);
- String schemaRegistrySuffix = typedProperties.getString(Constants.SCHEMA_REGISTRY_URL_SUFFIX_PROP, null);
+ String schemaRegistryBaseUrl = typedProperties.getString(HoodieSchemaProviderConfig.SCHEMA_REGISTRY_BASE_URL.key());
+ String schemaRegistrySuffix = typedProperties.getString(HoodieSchemaProviderConfig.SCHEMA_REGISTRY_URL_SUFFIX.key(), null);
String targetSchemaRegistrySuffix;
if (StringUtils.isNullOrEmpty(schemaRegistrySuffix)) {
- targetSchemaRegistrySuffix = typedProperties.getString(Constants.SCHEMA_REGISTRY_TARGET_URL_SUFFIX);
+ targetSchemaRegistrySuffix = typedProperties.getString(HoodieSchemaProviderConfig.SCHEMA_REGISTRY_TARGET_URL_SUFFIX.key());
} else {
targetSchemaRegistrySuffix = schemaRegistrySuffix;
}
- typedProperties.setProperty(TARGET_SCHEMA_REGISTRY_URL_PROP, schemaRegistryBaseUrl + typedProperties.getString(Constants.KAFKA_TOPIC_PROP) + targetSchemaRegistrySuffix);
+ typedProperties.setProperty(HoodieSchemaProviderConfig.TARGET_SCHEMA_REGISTRY_URL.key(),
+ schemaRegistryBaseUrl + typedProperties.getString(HoodieDeltaStreamerConfig.KAFKA_TOPIC.key()) + targetSchemaRegistrySuffix);
}
}
private void populateSourceRegistryProp(TypedProperties typedProperties) {
- String schemaRegistrySourceUrl = typedProperties.getString(SRC_SCHEMA_REGISTRY_URL_PROP, null);
+ String schemaRegistrySourceUrl = typedProperties.getString(HoodieSchemaProviderConfig.SRC_SCHEMA_REGISTRY_URL.key(), null);
if (StringUtils.isNullOrEmpty(schemaRegistrySourceUrl)) {
- String schemaRegistryBaseUrl = typedProperties.getString(Constants.SCHEMA_REGISTRY_BASE_URL_PROP);
- String schemaRegistrySuffix = typedProperties.getString(Constants.SCHEMA_REGISTRY_URL_SUFFIX_PROP, null);
+ String schemaRegistryBaseUrl = typedProperties.getString(HoodieSchemaProviderConfig.SCHEMA_REGISTRY_BASE_URL.key());
+ String schemaRegistrySuffix = typedProperties.getString(HoodieSchemaProviderConfig.SCHEMA_REGISTRY_URL_SUFFIX.key(), null);
String sourceSchemaRegistrySuffix;
if (StringUtils.isNullOrEmpty(schemaRegistrySuffix)) {
- sourceSchemaRegistrySuffix = typedProperties.getString(Constants.SCHEMA_REGISTRY_SOURCE_URL_SUFFIX);
+ sourceSchemaRegistrySuffix = typedProperties.getString(HoodieSchemaProviderConfig.SCHEMA_REGISTRY_SOURCE_URL_SUFFIX.key());
} else {
sourceSchemaRegistrySuffix = schemaRegistrySuffix;
}
- typedProperties.setProperty(SRC_SCHEMA_REGISTRY_URL_PROP, schemaRegistryBaseUrl + typedProperties.getString(Constants.KAFKA_TOPIC_PROP) + sourceSchemaRegistrySuffix);
+ typedProperties.setProperty(HoodieSchemaProviderConfig.SRC_SCHEMA_REGISTRY_URL.key(),
+ schemaRegistryBaseUrl + typedProperties.getString(HoodieDeltaStreamerConfig.KAFKA_TOPIC.key()) + sourceSchemaRegistrySuffix);
}
}
@@ -247,7 +248,7 @@ public class HoodieMultiTableDeltaStreamer {
if (config.targetTableName != null) {
logger.warn(String.format("--target-table is deprecated and will be removed in a future release due to it's useless;"
- + " please use %s to configure multiple target tables", Constants.TABLES_TO_BE_INGESTED_PROP));
+ + " please use %s to configure multiple target tables", HoodieDeltaStreamerConfig.TABLES_TO_BE_INGESTED.key()));
}
JavaSparkContext jssc = UtilHelpers.buildSparkContext("multi-table-delta-streamer", Constants.LOCAL_SPARK_MASTER);
@@ -441,18 +442,12 @@ public class HoodieMultiTableDeltaStreamer {
}
}
- public static class Constants {
- public static final String KAFKA_TOPIC_PROP = "hoodie.deltastreamer.source.kafka.topic";
- public static final String HIVE_SYNC_TABLE_PROP = "hoodie.datasource.hive_sync.table";
- private static final String SCHEMA_REGISTRY_BASE_URL_PROP = "hoodie.deltastreamer.schemaprovider.registry.baseUrl";
- private static final String SCHEMA_REGISTRY_URL_SUFFIX_PROP = "hoodie.deltastreamer.schemaprovider.registry.urlSuffix";
- private static final String SCHEMA_REGISTRY_SOURCE_URL_SUFFIX = "hoodie.deltastreamer.schemaprovider.registry.sourceUrlSuffix";
- private static final String SCHEMA_REGISTRY_TARGET_URL_SUFFIX = "hoodie.deltastreamer.schemaprovider.registry.targetUrlSuffix";
- private static final String TABLES_TO_BE_INGESTED_PROP = "hoodie.deltastreamer.ingestion.tablesToBeIngested";
- private static final String INGESTION_PREFIX = "hoodie.deltastreamer.ingestion.";
+ static class Constants {
+ @Deprecated
+ private static final String KAFKA_TOPIC_PROP = HoodieDeltaStreamerConfig.KAFKA_TOPIC.key();
+ static final String HIVE_SYNC_TABLE_PROP = "hoodie.datasource.hive_sync.table";
private static final String INGESTION_CONFIG_SUFFIX = ".configFile";
private static final String DEFAULT_CONFIG_FILE_NAME_SUFFIX = "_config.properties";
- private static final String TARGET_BASE_PATH_PROP = "hoodie.deltastreamer.ingestion.targetBasePath";
private static final String LOCAL_SPARK_MASTER = "local[2]";
private static final String FILE_DELIMITER = "/";
private static final String DELIMITER = ".";
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java
index afc9c3416fc..26512a405a5 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java
@@ -57,8 +57,8 @@ import scala.util.Either;
import static org.apache.hudi.utilities.deltastreamer.BaseErrorTableWriter.ERROR_TABLE_CURRUPT_RECORD_COL_NAME;
import static org.apache.hudi.utilities.schema.RowBasedSchemaProvider.HOODIE_RECORD_NAMESPACE;
import static org.apache.hudi.utilities.schema.RowBasedSchemaProvider.HOODIE_RECORD_STRUCT_NAME;
-import static org.apache.hudi.utilities.sources.helpers.SanitizationUtils.Config.SANITIZE_SCHEMA_FIELD_NAMES;
-import static org.apache.hudi.utilities.sources.helpers.SanitizationUtils.Config.SCHEMA_FIELD_NAME_INVALID_CHAR_MASK;
+import static org.apache.hudi.utilities.config.HoodieDeltaStreamerConfig.SANITIZE_SCHEMA_FIELD_NAMES;
+import static org.apache.hudi.utilities.config.HoodieDeltaStreamerConfig.SCHEMA_FIELD_NAME_INVALID_CHAR_MASK;
/**
* Adapts data-format provided by the source to the data-format required by the client (DeltaStreamer).
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/FilebasedSchemaProvider.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/FilebasedSchemaProvider.java
index fe8ed3432d7..9956e1528a4 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/FilebasedSchemaProvider.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/FilebasedSchemaProvider.java
@@ -23,6 +23,7 @@ import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.utilities.config.FilebasedSchemaProviderConfig;
import org.apache.hudi.utilities.sources.helpers.SanitizationUtils;
import org.apache.avro.Schema;
@@ -39,14 +40,6 @@ import java.util.Collections;
*/
public class FilebasedSchemaProvider extends SchemaProvider {
- /**
- * Configs supported.
- */
- public static class Config {
- private static final String SOURCE_SCHEMA_FILE_PROP = "hoodie.deltastreamer.schemaprovider.source.schema.file";
- private static final String TARGET_SCHEMA_FILE_PROP = "hoodie.deltastreamer.schemaprovider.target.schema.file";
- }
-
private final FileSystem fs;
protected Schema sourceSchema;
@@ -55,14 +48,14 @@ public class FilebasedSchemaProvider extends SchemaProvider {
public FilebasedSchemaProvider(TypedProperties props, JavaSparkContext jssc) {
super(props, jssc);
- DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(Config.SOURCE_SCHEMA_FILE_PROP));
- String sourceFile = props.getString(Config.SOURCE_SCHEMA_FILE_PROP);
+ DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(FilebasedSchemaProviderConfig.SOURCE_SCHEMA_FILE.key()));
+ String sourceFile = props.getString(FilebasedSchemaProviderConfig.SOURCE_SCHEMA_FILE.key());
boolean shouldSanitize = SanitizationUtils.getShouldSanitize(props);
String invalidCharMask = SanitizationUtils.getInvalidCharMask(props);
this.fs = FSUtils.getFs(sourceFile, jssc.hadoopConfiguration(), true);
this.sourceSchema = readAvroSchemaFromFile(sourceFile, this.fs, shouldSanitize, invalidCharMask);
- if (props.containsKey(Config.TARGET_SCHEMA_FILE_PROP)) {
- this.targetSchema = readAvroSchemaFromFile(props.getString(Config.TARGET_SCHEMA_FILE_PROP), this.fs, shouldSanitize, invalidCharMask);
+ if (props.containsKey(FilebasedSchemaProviderConfig.TARGET_SCHEMA_FILE.key())) {
+ this.targetSchema = readAvroSchemaFromFile(props.getString(FilebasedSchemaProviderConfig.TARGET_SCHEMA_FILE.key()), this.fs, shouldSanitize, invalidCharMask);
}
}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/HiveSchemaProvider.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/HiveSchemaProvider.java
index 9fca2a241a6..e6b113b2782 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/HiveSchemaProvider.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/HiveSchemaProvider.java
@@ -22,6 +22,7 @@ package org.apache.hudi.utilities.schema;
import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.DataSourceUtils;
import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.utilities.config.HiveSchemaProviderConfig;
import org.apache.hudi.utilities.exception.HoodieSchemaProviderException;
import org.apache.avro.Schema;
@@ -39,24 +40,14 @@ import java.util.Collections;
*/
public class HiveSchemaProvider extends SchemaProvider {
- /**
- * Configs supported.
- */
- public static class Config {
- private static final String SOURCE_SCHEMA_DATABASE_PROP = "hoodie.deltastreamer.schemaprovider.source.schema.hive.database";
- private static final String SOURCE_SCHEMA_TABLE_PROP = "hoodie.deltastreamer.schemaprovider.source.schema.hive.table";
- private static final String TARGET_SCHEMA_DATABASE_PROP = "hoodie.deltastreamer.schemaprovider.target.schema.hive.database";
- private static final String TARGET_SCHEMA_TABLE_PROP = "hoodie.deltastreamer.schemaprovider.target.schema.hive.table";
- }
-
private final Schema sourceSchema;
private Schema targetSchema;
public HiveSchemaProvider(TypedProperties props, JavaSparkContext jssc) {
super(props, jssc);
- DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(Config.SOURCE_SCHEMA_TABLE_PROP));
- String sourceSchemaDatabaseName = props.getString(Config.SOURCE_SCHEMA_DATABASE_PROP, "default");
- String sourceSchemaTableName = props.getString(Config.SOURCE_SCHEMA_TABLE_PROP);
+ DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(HiveSchemaProviderConfig.SOURCE_SCHEMA_TABLE.key()));
+ String sourceSchemaDatabaseName = props.getString(HiveSchemaProviderConfig.SOURCE_SCHEMA_DATABASE.key(), "default");
+ String sourceSchemaTableName = props.getString(HiveSchemaProviderConfig.SOURCE_SCHEMA_TABLE.key());
SparkSession spark = SparkSession.builder().config(jssc.getConf()).enableHiveSupport().getOrCreate();
// source schema
@@ -72,9 +63,9 @@ public class HiveSchemaProvider extends SchemaProvider {
}
// target schema
- if (props.containsKey(Config.TARGET_SCHEMA_TABLE_PROP)) {
- String targetSchemaDatabaseName = props.getString(Config.TARGET_SCHEMA_DATABASE_PROP, "default");
- String targetSchemaTableName = props.getString(Config.TARGET_SCHEMA_TABLE_PROP);
+ if (props.containsKey(HiveSchemaProviderConfig.TARGET_SCHEMA_TABLE.key())) {
+ String targetSchemaDatabaseName = props.getString(HiveSchemaProviderConfig.TARGET_SCHEMA_DATABASE.key(), "default");
+ String targetSchemaTableName = props.getString(HiveSchemaProviderConfig.TARGET_SCHEMA_TABLE.key());
try {
TableIdentifier targetSchemaTable = new TableIdentifier(targetSchemaTableName, scala.Option.apply(targetSchemaDatabaseName));
StructType targetSchema = spark.sessionState().catalog().getTableMetadata(targetSchemaTable).schema();
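The source table is resolved against the "default" database unless overridden; a usage sketch (keys copied from the inner Config class removed above, assumed to carry over unchanged into HiveSchemaProviderConfig):
```
import org.apache.hudi.common.config.TypedProperties;

public class HiveSchemaProviderUsageSketch {
  public static TypedProperties hiveSchemaProps() {
    TypedProperties props = new TypedProperties();
    // required: Hive table whose schema becomes the source schema
    props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.hive.table", "trips");
    // optional: source database; falls back to "default" when unset
    props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.hive.database", "raw_db");
    // optional: a second table supplying the target schema
    props.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.hive.table", "trips_clean");
    return props;
  }
}
```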
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/JdbcbasedSchemaProvider.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/JdbcbasedSchemaProvider.java
index 2a3c2a8b487..c548db3650f 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/JdbcbasedSchemaProvider.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/JdbcbasedSchemaProvider.java
@@ -21,6 +21,7 @@ package org.apache.hudi.utilities.schema;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.utilities.UtilHelpers;
+import org.apache.hudi.utilities.config.JdbcbasedSchemaProviderConfig;
import org.apache.avro.Schema;
import org.apache.spark.api.java.JavaSparkContext;
@@ -35,38 +36,17 @@ public class JdbcbasedSchemaProvider extends SchemaProvider {
private Schema sourceSchema;
private Map<String, String> options = new HashMap<>();
- /**
- * Configs supported.
- */
- public static class Config {
- // The JDBC URL to connect to. The source-specific connection properties may be specified in the URL.
- // e.g., jdbc:postgresql://localhost/test?user=fred&password=secret
- private static final String SOURCE_SCHEMA_JDBC_CONNECTION_URL = "hoodie.deltastreamer.schemaprovider.source.schema.jdbc.connection.url";
- // The class name of the JDBC driver to use to connect to this URL. such as org.h2.Driver
- private static final String SOURCE_SCHEMA_JDBC_DRIVER_TYPE = "hoodie.deltastreamer.schemaprovider.source.schema.jdbc.driver.type";
- private static final String SOURCE_SCHEMA_JDBC_USERNAME = "hoodie.deltastreamer.schemaprovider.source.schema.jdbc.username";
- private static final String SOURCE_SCHEMA_JDBC_PASSWORD = "hoodie.deltastreamer.schemaprovider.source.schema.jdbc.password";
- // example : test_database.test1_table or test1_table
- private static final String SOURCE_SCHEMA_JDBC_DBTABLE = "hoodie.deltastreamer.schemaprovider.source.schema.jdbc.dbtable";
- // The number of seconds the driver will wait for a Statement object to execute.
- // Zero means there is no limit. In the write path, this option depends on how JDBC drivers implement the API setQueryTimeout,
- // e.g., the h2 JDBC driver checks the timeout of each query instead of an entire JDBC batch. It defaults to 0.
- private static final String SOURCE_SCHEMA_JDBC_TIMEOUT = "hoodie.deltastreamer.schemaprovider.source.schema.jdbc.timeout";
- // If true, all the columns are nullable.
- private static final String SOURCE_SCHEMA_JDBC_NULLABLE = "hoodie.deltastreamer.schemaprovider.source.schema.jdbc.nullable";
- }
-
public JdbcbasedSchemaProvider(TypedProperties props, JavaSparkContext jssc) {
super(props, jssc);
- options.put("url", props.getString(Config.SOURCE_SCHEMA_JDBC_CONNECTION_URL));
- options.put("driver", props.getString(Config.SOURCE_SCHEMA_JDBC_DRIVER_TYPE));
- options.put("user", props.getString(Config.SOURCE_SCHEMA_JDBC_USERNAME));
- options.put("password", props.getString(Config.SOURCE_SCHEMA_JDBC_PASSWORD));
- options.put("dbtable", props.getString(Config.SOURCE_SCHEMA_JDBC_DBTABLE));
+ options.put("url", props.getString(JdbcbasedSchemaProviderConfig.SOURCE_SCHEMA_JDBC_CONNECTION_URL.key()));
+ options.put("driver", props.getString(JdbcbasedSchemaProviderConfig.SOURCE_SCHEMA_JDBC_DRIVER_TYPE.key()));
+ options.put("user", props.getString(JdbcbasedSchemaProviderConfig.SOURCE_SCHEMA_JDBC_USERNAME.key()));
+ options.put("password", props.getString(JdbcbasedSchemaProviderConfig.SOURCE_SCHEMA_JDBC_PASSWORD.key()));
+ options.put("dbtable", props.getString(JdbcbasedSchemaProviderConfig.SOURCE_SCHEMA_JDBC_DBTABLE.key()));
// the number of seconds the driver will wait for a Statement object to execute.
// Zero means there is no limit.
- options.put("queryTimeout", props.getString(Config.SOURCE_SCHEMA_JDBC_TIMEOUT, "0"));
- options.put("nullable", props.getString(Config.SOURCE_SCHEMA_JDBC_NULLABLE, "true"));
+ options.put("queryTimeout", props.getString(JdbcbasedSchemaProviderConfig.SOURCE_SCHEMA_JDBC_TIMEOUT.key(), "0"));
+ options.put("nullable", props.getString(JdbcbasedSchemaProviderConfig.SOURCE_SCHEMA_JDBC_NULLABLE.key(), "true"));
}
@Override
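The constructor maps the Hudi-prefixed keys onto Spark's JDBC reader options ("url", "driver", "dbtable", ...). A sketch of a full property set (keys from the removed inner Config class, assumed unchanged in JdbcbasedSchemaProviderConfig):
```
import org.apache.hudi.common.config.TypedProperties;

public class JdbcSchemaProviderUsageSketch {
  public static TypedProperties jdbcSchemaProps() {
    TypedProperties props = new TypedProperties();
    props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.jdbc.connection.url",
        "jdbc:postgresql://localhost/test");
    props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.jdbc.driver.type", "org.postgresql.Driver");
    props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.jdbc.username", "fred");
    props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.jdbc.password", "secret");
    props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.jdbc.dbtable", "test_database.test1_table");
    // optional: timeout defaults to "0" (no limit), nullable defaults to "true"
    props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.jdbc.timeout", "30");
    return props;
  }
}
```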
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/KafkaOffsetPostProcessor.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/KafkaOffsetPostProcessor.java
index cc0ce2cf35a..575926db0bc 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/KafkaOffsetPostProcessor.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/KafkaOffsetPostProcessor.java
@@ -20,6 +20,7 @@ package org.apache.hudi.utilities.schema;
import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.utilities.config.HoodieDeltaStreamerConfig;
import org.apache.avro.Schema;
import org.apache.log4j.LogManager;
@@ -36,15 +37,12 @@ import java.util.stream.Collectors;
public class KafkaOffsetPostProcessor extends SchemaPostProcessor {
public static class Config {
- public static final ConfigProperty<String> KAFKA_APPEND_OFFSETS = ConfigProperty
- .key("hoodie.deltastreamer.source.kafka.append.offsets")
- .defaultValue("false")
- .withDocumentation("When enabled, appends kafka offset info like source offset(_hoodie_kafka_source_offset), "
- + "partition (_hoodie_kafka_source_partition) and timestamp (_hoodie_kafka_source_timestamp) to the records. "
- + "By default its disabled and no kafka offsets are added");
+ @Deprecated
+ public static final ConfigProperty<String> KAFKA_APPEND_OFFSETS =
+ HoodieDeltaStreamerConfig.KAFKA_APPEND_OFFSETS;
public static boolean shouldAddOffsets(TypedProperties props) {
- return props.getBoolean(KafkaOffsetPostProcessor.Config.KAFKA_APPEND_OFFSETS.key(), Boolean.parseBoolean(KafkaOffsetPostProcessor.Config.KAFKA_APPEND_OFFSETS.defaultValue()));
+ return props.getBoolean(HoodieDeltaStreamerConfig.KAFKA_APPEND_OFFSETS.key(), Boolean.parseBoolean(HoodieDeltaStreamerConfig.KAFKA_APPEND_OFFSETS.defaultValue()));
}
}
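This hunk is the migration pattern the whole commit follows: the old inner Config member survives as a @Deprecated alias of the centralized ConfigProperty, so existing references keep compiling while the key, default, and documentation live in one place. In isolation (hypothetical class and key names, for illustration only):
```
import org.apache.hudi.common.config.ConfigProperty;

// Hypothetical pair of classes showing the aliasing pattern.
class CentralConfig {
  public static final ConfigProperty<String> SOME_FLAG = ConfigProperty
      .key("hoodie.example.some.flag")
      .defaultValue("false")
      .withDocumentation("Example flag.");
}

class LegacyCallSite {
  @Deprecated
  public static class Config {
    // old call sites still resolve, but point at the single source of truth
    @Deprecated
    public static final ConfigProperty<String> SOME_FLAG = CentralConfig.SOME_FLAG;
  }
}
```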
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/ProtoClassBasedSchemaProvider.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/ProtoClassBasedSchemaProvider.java
index f156c15929e..92eca1f6a89 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/ProtoClassBasedSchemaProvider.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/ProtoClassBasedSchemaProvider.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.internal.schema.HoodieSchemaException;
+import org.apache.hudi.utilities.config.ProtoClassBasedSchemaProviderConfig;
import org.apache.hudi.utilities.sources.helpers.ProtoConversionUtil;
import org.apache.avro.Schema;
@@ -38,31 +39,21 @@ public class ProtoClassBasedSchemaProvider extends SchemaProvider {
/**
* Configs supported.
*/
+ @Deprecated
public static class Config {
- private static final String PROTO_SCHEMA_PROVIDER_PREFIX = "hoodie.deltastreamer.schemaprovider.proto";
- public static final ConfigProperty<String> PROTO_SCHEMA_CLASS_NAME = ConfigProperty.key(PROTO_SCHEMA_PROVIDER_PREFIX + ".class.name")
- .noDefaultValue()
- .sinceVersion("0.13.0")
- .withDocumentation("The Protobuf Message class used as the source for the schema.");
-
- public static final ConfigProperty<Boolean> PROTO_SCHEMA_WRAPPED_PRIMITIVES_AS_RECORDS = ConfigProperty.key(PROTO_SCHEMA_PROVIDER_PREFIX + ".flatten.wrappers")
- .defaultValue(false)
- .sinceVersion("0.13.0")
- .withDocumentation("When set to true wrapped primitives like Int64Value are translated to a record with a single 'value' field. By default, the value is false and the wrapped primitives are "
- + "treated as a nullable value");
-
- public static final ConfigProperty<Boolean> PROTO_SCHEMA_TIMESTAMPS_AS_RECORDS = ConfigProperty.key(PROTO_SCHEMA_PROVIDER_PREFIX + ".timestamps.as.records")
- .defaultValue(false)
- .sinceVersion("0.13.0")
- .withDocumentation("When set to true Timestamp fields are translated to a record with a seconds and nanos field. By default, the value is false and the timestamp is converted to a long with "
- + "the timestamp-micros logical type");
-
- public static final ConfigProperty<Integer> PROTO_SCHEMA_MAX_RECURSION_DEPTH = ConfigProperty.key(PROTO_SCHEMA_PROVIDER_PREFIX + ".max.recursion.depth")
- .defaultValue(5)
- .sinceVersion("0.13.0")
- .withDocumentation("The max depth to unravel the Proto schema when translating into an Avro schema. Setting this depth allows the user to convert a schema that is recursive in proto into "
- + "something that can be represented in their lake format like Parquet. After a given class has been seen N times within a single branch, the schema provider will create a record with a "
- + "byte array to hold the remaining proto data and a string to hold the message descriptor's name for context.");
+ // Use {@link ProtoClassBasedSchemaProviderConfig} instead
+ @Deprecated
+ public static final ConfigProperty<String> PROTO_SCHEMA_CLASS_NAME =
+ ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_CLASS_NAME;
+ @Deprecated
+ public static final ConfigProperty<Boolean> PROTO_SCHEMA_WRAPPED_PRIMITIVES_AS_RECORDS =
+ ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_WRAPPED_PRIMITIVES_AS_RECORDS;
+ @Deprecated
+ public static final ConfigProperty<Boolean> PROTO_SCHEMA_TIMESTAMPS_AS_RECORDS =
+ ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_TIMESTAMPS_AS_RECORDS;
+ @Deprecated
+ public static final ConfigProperty<Integer> PROTO_SCHEMA_MAX_RECURSION_DEPTH =
+ ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_MAX_RECURSION_DEPTH;
}
private final String schemaString;
@@ -75,12 +66,13 @@ public class ProtoClassBasedSchemaProvider extends SchemaProvider {
public ProtoClassBasedSchemaProvider(TypedProperties props, JavaSparkContext jssc) {
super(props, jssc);
DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(
- Config.PROTO_SCHEMA_CLASS_NAME.key()));
- String className = config.getString(Config.PROTO_SCHEMA_CLASS_NAME.key());
- boolean wrappedPrimitivesAsRecords = props.getBoolean(Config.PROTO_SCHEMA_WRAPPED_PRIMITIVES_AS_RECORDS.key(),
- Config.PROTO_SCHEMA_WRAPPED_PRIMITIVES_AS_RECORDS.defaultValue());
- int maxRecursionDepth = props.getInteger(Config.PROTO_SCHEMA_MAX_RECURSION_DEPTH.key(), Config.PROTO_SCHEMA_MAX_RECURSION_DEPTH.defaultValue());
- boolean timestampsAsRecords = props.getBoolean(Config.PROTO_SCHEMA_TIMESTAMPS_AS_RECORDS.key(), false);
+ ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_CLASS_NAME.key()));
+ String className = config.getString(ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_CLASS_NAME.key());
+ boolean wrappedPrimitivesAsRecords = props.getBoolean(ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_WRAPPED_PRIMITIVES_AS_RECORDS.key(),
+ ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_WRAPPED_PRIMITIVES_AS_RECORDS.defaultValue());
+ int maxRecursionDepth = props.getInteger(
+ ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_MAX_RECURSION_DEPTH.key(), ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_MAX_RECURSION_DEPTH.defaultValue());
+ boolean timestampsAsRecords = props.getBoolean(ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_TIMESTAMPS_AS_RECORDS.key(), false);
ProtoConversionUtil.SchemaConfig schemaConfig = new ProtoConversionUtil.SchemaConfig(wrappedPrimitivesAsRecords, maxRecursionDepth, timestampsAsRecords);
try {
schemaString = ProtoConversionUtil.getAvroSchemaForMessageClass(ReflectionUtils.getClass(className), schemaConfig).toString();
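A usage sketch mirroring the test properties near the end of this diff (key strings taken from the removed inner Config class; assumed unchanged in ProtoClassBasedSchemaProviderConfig; the message class name is a placeholder):
```
import org.apache.hudi.common.config.TypedProperties;

public class ProtoSchemaProviderUsageSketch {
  public static TypedProperties protoSchemaProps() {
    TypedProperties props = new TypedProperties();
    // required: the Protobuf Message class the schema is derived from (placeholder name)
    props.setProperty("hoodie.deltastreamer.schemaprovider.proto.class.name", "com.example.proto.Sample");
    // optional knobs, shown here with non-default values
    props.setProperty("hoodie.deltastreamer.schemaprovider.proto.flatten.wrappers", "true");
    props.setProperty("hoodie.deltastreamer.schemaprovider.proto.timestamps.as.records", "true");
    props.setProperty("hoodie.deltastreamer.schemaprovider.proto.max.recursion.depth", "2");
    return props;
  }
}
```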
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaPostProcessor.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaPostProcessor.java
index f0879e055c5..addabe18cbf 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaPostProcessor.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaPostProcessor.java
@@ -19,6 +19,7 @@
package org.apache.hudi.utilities.schema;
import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.utilities.config.SchemaProviderPostProcessorConfig;
import org.apache.avro.Schema;
import org.apache.spark.api.java.JavaSparkContext;
@@ -31,10 +32,14 @@ import java.io.Serializable;
*/
public abstract class SchemaPostProcessor implements Serializable {
- /** Configs supported. */
+ /**
+ * Configs supported.
+ */
+ @Deprecated
public static class Config {
+ @Deprecated
public static final String SCHEMA_POST_PROCESSOR_PROP =
- "hoodie.deltastreamer.schemaprovider.schema_post_processor";
+ SchemaProviderPostProcessorConfig.SCHEMA_POST_PROCESSOR.key();
}
private static final long serialVersionUID = 1L;
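The relocated key accepts a comma-separated chain of post-processor class names, applied left to right (exercised by testChainedSchemaPostProcessor below). A sketch:
```
import org.apache.hudi.common.config.TypedProperties;

public class SchemaPostProcessorChainSketch {
  public static TypedProperties chainedProcessors() {
    TypedProperties props = new TypedProperties();
    // processors run in list order over the provider's schema
    props.setProperty("hoodie.deltastreamer.schemaprovider.schema_post_processor",
        "org.apache.hudi.utilities.schema.postprocessor.DeleteSupportSchemaPostProcessor,"
            + "org.apache.hudi.utilities.schema.SparkAvroPostProcessor");
    return props;
  }
}
```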
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java
index 9ade80ecce4..b26be1b421b 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java
@@ -22,6 +22,7 @@ import org.apache.hudi.DataSourceUtils;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.utilities.config.HoodieSchemaProviderConfig;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -61,11 +62,15 @@ public class SchemaRegistryProvider extends SchemaProvider {
* Configs supported.
*/
public static class Config {
-
- public static final String SRC_SCHEMA_REGISTRY_URL_PROP = "hoodie.deltastreamer.schemaprovider.registry.url";
+ @Deprecated
+ public static final String SRC_SCHEMA_REGISTRY_URL_PROP =
+ HoodieSchemaProviderConfig.SRC_SCHEMA_REGISTRY_URL.key();
+ @Deprecated
public static final String TARGET_SCHEMA_REGISTRY_URL_PROP =
- "hoodie.deltastreamer.schemaprovider.registry.targetUrl";
- public static final String SCHEMA_CONVERTER_PROP = "hoodie.deltastreamer.schemaprovider.registry.schemaconverter";
+ HoodieSchemaProviderConfig.TARGET_SCHEMA_REGISTRY_URL.key();
+ @Deprecated
+ public static final String SCHEMA_CONVERTER_PROP =
+ HoodieSchemaProviderConfig.SCHEMA_CONVERTER.key();
public static final String SSL_KEYSTORE_LOCATION_PROP = "schema.registry.ssl.keystore.location";
public static final String SSL_TRUSTSTORE_LOCATION_PROP = "schema.registry.ssl.truststore.location";
public static final String SSL_KEYSTORE_PASSWORD_PROP = "schema.registry.ssl.keystore.password";
@@ -86,8 +91,8 @@ public class SchemaRegistryProvider extends SchemaProvider {
public Schema parseSchemaFromRegistry(String registryUrl) throws IOException {
String schema = fetchSchemaFromRegistry(registryUrl);
- SchemaConverter converter = config.containsKey(Config.SCHEMA_CONVERTER_PROP)
- ? ReflectionUtils.loadClass(config.getString(Config.SCHEMA_CONVERTER_PROP))
+ SchemaConverter converter = config.containsKey(HoodieSchemaProviderConfig.SCHEMA_CONVERTER.key())
+ ? ReflectionUtils.loadClass(config.getString(HoodieSchemaProviderConfig.SCHEMA_CONVERTER.key()))
: s -> s;
return new Schema.Parser().parse(converter.convert(schema));
}
@@ -142,7 +147,7 @@ public class SchemaRegistryProvider extends SchemaProvider {
public SchemaRegistryProvider(TypedProperties props, JavaSparkContext jssc) {
super(props, jssc);
- DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(Config.SRC_SCHEMA_REGISTRY_URL_PROP));
+ DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(HoodieSchemaProviderConfig.SRC_SCHEMA_REGISTRY_URL.key()));
if (config.containsKey(Config.SSL_KEYSTORE_LOCATION_PROP)
|| config.containsKey(Config.SSL_TRUSTSTORE_LOCATION_PROP)) {
setUpSSLStores();
@@ -174,7 +179,7 @@ public class SchemaRegistryProvider extends SchemaProvider {
@Override
public Schema getSourceSchema() {
- String registryUrl = config.getString(Config.SRC_SCHEMA_REGISTRY_URL_PROP);
+ String registryUrl = config.getString(HoodieSchemaProviderConfig.SRC_SCHEMA_REGISTRY_URL.key());
try {
return parseSchemaFromRegistry(registryUrl);
} catch (IOException ioe) {
@@ -184,8 +189,8 @@ public class SchemaRegistryProvider extends SchemaProvider {
@Override
public Schema getTargetSchema() {
- String registryUrl = config.getString(Config.SRC_SCHEMA_REGISTRY_URL_PROP);
- String targetRegistryUrl = config.getString(Config.TARGET_SCHEMA_REGISTRY_URL_PROP, registryUrl);
+ String registryUrl = config.getString(HoodieSchemaProviderConfig.SRC_SCHEMA_REGISTRY_URL.key());
+ String targetRegistryUrl = config.getString(HoodieSchemaProviderConfig.TARGET_SCHEMA_REGISTRY_URL.key(), registryUrl);
try {
return parseSchemaFromRegistry(targetRegistryUrl);
} catch (IOException ioe) {
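As getTargetSchema() shows, the target registry URL falls back to the source URL when unset, and an optional SchemaConverter can rewrite the fetched schema string. A sketch of the registry properties (keys per the Config aliases above):
```
import org.apache.hudi.common.config.TypedProperties;

public class SchemaRegistryProviderUsageSketch {
  public static TypedProperties registryProps() {
    TypedProperties props = new TypedProperties();
    // required: source schema subject
    props.setProperty("hoodie.deltastreamer.schemaprovider.registry.url",
        "http://localhost:8081/subjects/topic1-value/versions/latest");
    // optional: target subject; getTargetSchema() reuses the source URL when this is absent
    props.setProperty("hoodie.deltastreamer.schemaprovider.registry.targetUrl",
        "http://localhost:8081/subjects/topic1-target/versions/latest");
    // optional: "hoodie.deltastreamer.schemaprovider.registry.schemaconverter" may name a
    // SchemaConverter implementation; identity conversion is used when unset
    return props;
  }
}
```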
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SparkAvroPostProcessor.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SparkAvroPostProcessor.java
index ddaf94c93a0..406f71872d5 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SparkAvroPostProcessor.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SparkAvroPostProcessor.java
@@ -20,6 +20,7 @@ package org.apache.hudi.utilities.schema;
import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.utilities.config.HoodieSchemaProviderConfig;
import org.apache.avro.Schema;
import org.apache.spark.api.java.JavaSparkContext;
@@ -30,9 +31,11 @@ import org.apache.spark.sql.types.StructType;
*/
public class SparkAvroPostProcessor extends SchemaPostProcessor {
+ @Deprecated
public static class Config {
+ @Deprecated
public static final String SPARK_AVRO_POST_PROCESSOR_PROP_ENABLE =
- "hoodie.deltastreamer.schemaprovider.spark_avro_post_processor.enable";
+ HoodieSchemaProviderConfig.SPARK_AVRO_POST_PROCESSOR_ENABLE.key();
}
public SparkAvroPostProcessor(TypedProperties props, JavaSparkContext jssc) {
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/postprocessor/DropColumnSchemaPostProcessor.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/postprocessor/DropColumnSchemaPostProcessor.java
index 05b446f8451..dc601367fb7 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/postprocessor/DropColumnSchemaPostProcessor.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/postprocessor/DropColumnSchemaPostProcessor.java
@@ -20,10 +20,11 @@ package org.apache.hudi.utilities.schema.postprocessor;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.utilities.config.SchemaProviderPostProcessorConfig;
import org.apache.hudi.utilities.exception.HoodieSchemaPostProcessException;
+import org.apache.hudi.utilities.schema.SchemaPostProcessor;
import org.apache.avro.Schema;
-import org.apache.hudi.utilities.schema.SchemaPostProcessor;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
@@ -51,18 +52,21 @@ public class DropColumnSchemaPostProcessor extends SchemaPostProcessor {
super(props, jssc);
}
+ @Deprecated
public static class Config {
+ @Deprecated
public static final String DELETE_COLUMN_POST_PROCESSOR_COLUMN_PROP =
- "hoodie.deltastreamer.schemaprovider.schema_post_processor.delete.columns";
+ SchemaProviderPostProcessorConfig.DELETE_COLUMN_POST_PROCESSOR_COLUMN.key();
}
@Override
public Schema processSchema(Schema schema) {
- String columnToDeleteStr = this.config.getString(Config.DELETE_COLUMN_POST_PROCESSOR_COLUMN_PROP);
+ String columnToDeleteStr = this.config.getString(SchemaProviderPostProcessorConfig.DELETE_COLUMN_POST_PROCESSOR_COLUMN.key());
if (StringUtils.isNullOrEmpty(columnToDeleteStr)) {
- LOG.warn(String.format("Param %s is null or empty, return original schema", Config.DELETE_COLUMN_POST_PROCESSOR_COLUMN_PROP));
+ LOG.warn(String.format("Param %s is null or empty, return original schema",
+ SchemaProviderPostProcessorConfig.DELETE_COLUMN_POST_PROCESSOR_COLUMN.key()));
}
// convert field to lowerCase for compare purpose
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/postprocessor/add/AddPrimitiveColumnSchemaPostProcessor.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/postprocessor/add/AddPrimitiveColumnSchemaPostProcessor.java
index f7d31b5cf47..465d227dedb 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/postprocessor/add/AddPrimitiveColumnSchemaPostProcessor.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/postprocessor/add/AddPrimitiveColumnSchemaPostProcessor.java
@@ -21,6 +21,7 @@ package org.apache.hudi.utilities.schema.postprocessor.add;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.utilities.config.SchemaProviderPostProcessorConfig;
import org.apache.hudi.utilities.exception.HoodieSchemaPostProcessException;
import org.apache.hudi.utilities.schema.SchemaPostProcessor;
@@ -47,7 +48,7 @@ public class AddPrimitiveColumnSchemaPostProcessor extends SchemaPostProcessor {
@Override
public Schema processSchema(Schema schema) {
- String newColumnName = this.config.getString(BaseSchemaPostProcessorConfig.SCHEMA_POST_PROCESSOR_ADD_COLUMN_NAME_PROP.key());
+ String newColumnName = this.config.getString(SchemaProviderPostProcessorConfig.SCHEMA_POST_PROCESSOR_ADD_COLUMN_NAME_PROP.key());
if (schema.getField(newColumnName) != null) {
throw new HoodieSchemaPostProcessException(String.format("Column %s already exists!", newColumnName));
@@ -69,13 +70,13 @@ public class AddPrimitiveColumnSchemaPostProcessor extends SchemaPostProcessor {
private Schema.Field buildNewColumn() {
- String columnName = this.config.getString(BaseSchemaPostProcessorConfig.SCHEMA_POST_PROCESSOR_ADD_COLUMN_NAME_PROP.key());
- String type = this.config.getString(BaseSchemaPostProcessorConfig.SCHEMA_POST_PROCESSOR_ADD_COLUMN_TYPE_PROP.key()).toUpperCase(Locale.ROOT);
- String doc = this.config.getString(BaseSchemaPostProcessorConfig.SCHEMA_POST_PROCESSOR_ADD_COLUMN_DOC_PROP.key(), null);
- Object defaultValue = this.config.getOrDefault(BaseSchemaPostProcessorConfig.SCHEMA_POST_PROCESSOR_ADD_COLUMN_DEFAULT_PROP.key(),
+ String columnName = this.config.getString(SchemaProviderPostProcessorConfig.SCHEMA_POST_PROCESSOR_ADD_COLUMN_NAME_PROP.key());
+ String type = this.config.getString(SchemaProviderPostProcessorConfig.SCHEMA_POST_PROCESSOR_ADD_COLUMN_TYPE_PROP.key()).toUpperCase(Locale.ROOT);
+ String doc = this.config.getString(SchemaProviderPostProcessorConfig.SCHEMA_POST_PROCESSOR_ADD_COLUMN_DOC_PROP.key(), null);
+ Object defaultValue = this.config.getOrDefault(SchemaProviderPostProcessorConfig.SCHEMA_POST_PROCESSOR_ADD_COLUMN_DEFAULT_PROP.key(),
null);
- boolean nullable = this.config.getBoolean(BaseSchemaPostProcessorConfig.SCHEMA_POST_PROCESSOR_ADD_COLUMN_NULLABLE_PROP.key(),
- BaseSchemaPostProcessorConfig.SCHEMA_POST_PROCESSOR_ADD_COLUMN_NULLABLE_PROP.defaultValue());
+ boolean nullable = this.config.getBoolean(SchemaProviderPostProcessorConfig.SCHEMA_POST_PROCESSOR_ADD_COLUMN_NULLABLE_PROP.key(),
+ SchemaProviderPostProcessorConfig.SCHEMA_POST_PROCESSOR_ADD_COLUMN_NULLABLE_PROP.defaultValue());
ValidationUtils.checkArgument(!StringUtils.isNullOrEmpty(columnName));
ValidationUtils.checkArgument(!StringUtils.isNullOrEmpty(type));
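A usage sketch for the add-column processor (key strings copied from the BaseSchemaPostProcessorConfig file deleted just below; assumed to keep the same keys under SchemaProviderPostProcessorConfig):
```
import org.apache.hudi.common.config.TypedProperties;

public class AddColumnUsageSketch {
  public static TypedProperties addColumnProps() {
    TypedProperties props = new TypedProperties();
    // name and type are required; doc, default, and nullable are optional
    props.setProperty("hoodie.deltastreamer.schemaprovider.schema_post_processor.add.column.name", "ingest_region");
    props.setProperty("hoodie.deltastreamer.schemaprovider.schema_post_processor.add.column.type", "string");
    props.setProperty("hoodie.deltastreamer.schemaprovider.schema_post_processor.add.column.doc", "region tag added at ingest");
    // nullable defaults to true when unset
    return props;
  }
}
```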
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/postprocessor/add/BaseSchemaPostProcessorConfig.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/postprocessor/add/BaseSchemaPostProcessorConfig.java
deleted file mode 100644
index d1528c362c8..00000000000
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/postprocessor/add/BaseSchemaPostProcessorConfig.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.utilities.schema.postprocessor.add;
-
-import org.apache.hudi.common.config.ConfigProperty;
-
-/**
- * Base configs to describe a primitive type column.
- */
-public class BaseSchemaPostProcessorConfig {
-
- public static final ConfigProperty<String> SCHEMA_POST_PROCESSOR_ADD_COLUMN_NAME_PROP = ConfigProperty
- .key("hoodie.deltastreamer.schemaprovider.schema_post_processor.add.column.name")
- .noDefaultValue()
- .withDocumentation("New column's name");
-
- public static final ConfigProperty<String> SCHEMA_POST_PROCESSOR_ADD_COLUMN_TYPE_PROP = ConfigProperty
- .key("hoodie.deltastreamer.schemaprovider.schema_post_processor.add.column.type")
- .noDefaultValue()
- .withDocumentation("New column's type");
-
- public static final ConfigProperty<Boolean> SCHEMA_POST_PROCESSOR_ADD_COLUMN_NULLABLE_PROP = ConfigProperty
- .key("hoodie.deltastreamer.schemaprovider.schema_post_processor.add.column.nullable")
- .defaultValue(true)
- .withDocumentation("New column's nullable");
-
- public static final ConfigProperty<String> SCHEMA_POST_PROCESSOR_ADD_COLUMN_DEFAULT_PROP = ConfigProperty
- .key("hoodie.deltastreamer.schemaprovider.schema_post_processor.add.column.default")
- .noDefaultValue()
- .withDocumentation("New column's default value");
-
- public static final ConfigProperty<String> SCHEMA_POST_PROCESSOR_ADD_COLUMN_DOC_PROP = ConfigProperty
- .key("hoodie.deltastreamer.schemaprovider.schema_post_processor.add.column.doc")
- .noDefaultValue()
- .withDocumentation("Docs about new column");
-
-}
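The five definitions above are not dropped: judging by the updated call sites, they are re-homed in SchemaProviderPostProcessorConfig with keys intact, presumably along these lines (assumed shape, copied from the deleted file; not the literal new source):
```
import org.apache.hudi.common.config.ConfigProperty;

// Assumed shape of one relocated definition; key, default, and doc copied from
// the deleted file above.
public class RelocatedDefinitionSketch {
  public static final ConfigProperty<Boolean> SCHEMA_POST_PROCESSOR_ADD_COLUMN_NULLABLE_PROP = ConfigProperty
      .key("hoodie.deltastreamer.schemaprovider.schema_post_processor.add.column.nullable")
      .defaultValue(true)
      .withDocumentation("New column's nullable");
}
```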
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ProtoKafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ProtoKafkaSource.java
index 9241582b720..1f6653e9655 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ProtoKafkaSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ProtoKafkaSource.java
@@ -22,8 +22,8 @@ import org.apache.hudi.DataSourceUtils;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.utilities.config.ProtoClassBasedSchemaProviderConfig;
import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
-import org.apache.hudi.utilities.schema.ProtoClassBasedSchemaProvider;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
@@ -53,10 +53,10 @@ public class ProtoKafkaSource extends KafkaSource<Message> {
SparkSession sparkSession, SchemaProvider schemaProvider, HoodieIngestionMetrics metrics) {
super(props, sparkContext, sparkSession, schemaProvider, SourceType.PROTO, metrics);
DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(
- ProtoClassBasedSchemaProvider.Config.PROTO_SCHEMA_CLASS_NAME.key()));
+ ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_CLASS_NAME.key()));
props.put(NATIVE_KAFKA_KEY_DESERIALIZER_PROP, StringDeserializer.class);
props.put(NATIVE_KAFKA_VALUE_DESERIALIZER_PROP, ByteArrayDeserializer.class);
- className = props.getString(ProtoClassBasedSchemaProvider.Config.PROTO_SCHEMA_CLASS_NAME.key());
+ className = props.getString(ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_CLASS_NAME.key());
this.offsetGen = new KafkaOffsetGen(props);
if (this.shouldAddOffsets) {
throw new HoodieException("Appending kafka offsets to ProtoKafkaSource is not supported");
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/debezium/DebeziumSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/debezium/DebeziumSource.java
index 7dbb26890e7..5aec478875c 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/debezium/DebeziumSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/debezium/DebeziumSource.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.utilities.config.HoodieSchemaProviderConfig;
import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.schema.SchemaRegistryProvider;
@@ -118,7 +119,7 @@ public abstract class DebeziumSource extends RowSource {
return Pair.of(Option.of(sparkSession.emptyDataFrame()), overrideCheckpointStr.isEmpty() ? CheckpointUtils.offsetsToStr(offsetRanges) : overrideCheckpointStr);
} else {
try {
- String schemaStr = schemaRegistryProvider.fetchSchemaFromRegistry(props.getString(SchemaRegistryProvider.Config.SRC_SCHEMA_REGISTRY_URL_PROP));
+ String schemaStr = schemaRegistryProvider.fetchSchemaFromRegistry(props.getString(HoodieSchemaProviderConfig.SRC_SCHEMA_REGISTRY_URL.key()));
Dataset<Row> dataset = toDataset(offsetRanges, offsetGen, schemaStr);
LOG.info(String.format("Spark schema of Kafka Payload for topic %s:\n%s", offsetGen.getTopicName(), dataset.schema().treeString()));
LOG.info(String.format("New checkpoint string: %s", CheckpointUtils.offsetsToStr(offsetRanges)));
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/AvroConvertor.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/AvroConvertor.java
index eeb08a7fd75..a965dbb0ebb 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/AvroConvertor.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/AvroConvertor.java
@@ -38,8 +38,8 @@ import scala.util.Either;
import scala.util.Left;
import scala.util.Right;
-import static org.apache.hudi.utilities.sources.helpers.SanitizationUtils.Config.SANITIZE_SCHEMA_FIELD_NAMES;
-import static org.apache.hudi.utilities.sources.helpers.SanitizationUtils.Config.SCHEMA_FIELD_NAME_INVALID_CHAR_MASK;
+import static org.apache.hudi.utilities.config.HoodieDeltaStreamerConfig.SANITIZE_SCHEMA_FIELD_NAMES;
+import static org.apache.hudi.utilities.config.HoodieDeltaStreamerConfig.SCHEMA_FIELD_NAME_INVALID_CHAR_MASK;
/**
* Convert a variety of datum into Avro GenericRecords. Has a bunch of lazy fields to circumvent issues around
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/SanitizationUtils.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/SanitizationUtils.java
index 9eeee8f5c92..ffcc3949ade 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/SanitizationUtils.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/SanitizationUtils.java
@@ -23,6 +23,7 @@ import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.utilities.config.HoodieDeltaStreamerConfig;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -48,30 +49,25 @@ public class SanitizationUtils {
private static final ObjectMapper OM = new ObjectMapper();
+ @Deprecated
public static class Config {
+ @Deprecated
+ public static final ConfigProperty<Boolean> SANITIZE_SCHEMA_FIELD_NAMES =
+ HoodieDeltaStreamerConfig.SANITIZE_SCHEMA_FIELD_NAMES;
- public static final ConfigProperty<Boolean> SANITIZE_SCHEMA_FIELD_NAMES = ConfigProperty
- .key("hoodie.deltastreamer.source.sanitize.invalid.schema.field.names")
- .defaultValue(false)
- .withDocumentation("Sanitizes names of invalid schema fields both in the data read from source and also in the schema "
- + "Replaces invalid characters with hoodie.deltastreamer.source.sanitize.invalid.char.mask. Invalid characters are by "
- + "goes by avro naming convention (https://avro.apache.org/docs/current/spec.html#names).");
-
- public static final ConfigProperty<String> SCHEMA_FIELD_NAME_INVALID_CHAR_MASK = ConfigProperty
- .key("hoodie.deltastreamer.source.sanitize.invalid.char.mask")
- .defaultValue("__")
- .withDocumentation("Defines the character sequence that replaces invalid characters in schema field names if "
- + "hoodie.deltastreamer.source.sanitize.invalid.schema.field.names is enabled.");
+ @Deprecated
+ public static final ConfigProperty<String> SCHEMA_FIELD_NAME_INVALID_CHAR_MASK =
+ HoodieDeltaStreamerConfig.SCHEMA_FIELD_NAME_INVALID_CHAR_MASK;
}
private static final String AVRO_FIELD_NAME_KEY = "name";
public static boolean getShouldSanitize(TypedProperties props) {
- return props.getBoolean(Config.SANITIZE_SCHEMA_FIELD_NAMES.key(),Config.SANITIZE_SCHEMA_FIELD_NAMES.defaultValue());
+ return props.getBoolean(HoodieDeltaStreamerConfig.SANITIZE_SCHEMA_FIELD_NAMES.key(), HoodieDeltaStreamerConfig.SANITIZE_SCHEMA_FIELD_NAMES.defaultValue());
}
public static String getInvalidCharMask(TypedProperties props) {
- return props.getString(Config.SCHEMA_FIELD_NAME_INVALID_CHAR_MASK.key(),Config.SCHEMA_FIELD_NAME_INVALID_CHAR_MASK.defaultValue());
+ return props.getString(HoodieDeltaStreamerConfig.SCHEMA_FIELD_NAME_INVALID_CHAR_MASK.key(), HoodieDeltaStreamerConfig.SCHEMA_FIELD_NAME_INVALID_CHAR_MASK.defaultValue());
}
private static DataType sanitizeDataTypeForAvro(DataType dataType, String invalidCharMask) {
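Taken together, the two configs drive a simple rename: when sanitization is enabled, every character that is invalid under the Avro naming rules is replaced with the mask (default "__"). A sketch of the expected effect (illustrative field name):
```
import org.apache.hudi.common.config.TypedProperties;

public class SanitizationUsageSketch {
  public static TypedProperties sanitizeProps() {
    TypedProperties props = new TypedProperties();
    // off by default; turns on renaming of Avro-invalid field names
    props.setProperty("hoodie.deltastreamer.source.sanitize.invalid.schema.field.names", "true");
    // replacement sequence for each invalid character; defaults to "__"
    props.setProperty("hoodie.deltastreamer.source.sanitize.invalid.char.mask", "_");
    // with these settings a source field named "user-id" would be read as "user_id"
    return props;
  }
}
```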
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestSchemaPostProcessor.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestSchemaPostProcessor.java
index a0dc49aa670..0818c43cc02 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestSchemaPostProcessor.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestSchemaPostProcessor.java
@@ -19,15 +19,14 @@
package org.apache.hudi.utilities;
import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.utilities.config.SchemaProviderPostProcessorConfig;
import org.apache.hudi.utilities.exception.HoodieSchemaPostProcessException;
import org.apache.hudi.utilities.schema.SchemaPostProcessor;
-import org.apache.hudi.utilities.schema.SchemaPostProcessor.Config;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.schema.SparkAvroPostProcessor;
import org.apache.hudi.utilities.schema.postprocessor.DeleteSupportSchemaPostProcessor;
import org.apache.hudi.utilities.schema.postprocessor.DropColumnSchemaPostProcessor;
import org.apache.hudi.utilities.schema.postprocessor.add.AddPrimitiveColumnSchemaPostProcessor;
-import org.apache.hudi.utilities.schema.postprocessor.add.BaseSchemaPostProcessorConfig;
import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
import org.apache.hudi.utilities.transform.FlatteningTransformer;
@@ -75,7 +74,7 @@ public class TestSchemaPostProcessor extends UtilitiesTestBase {
@Test
public void testPostProcessor() throws IOException {
- properties.put(Config.SCHEMA_POST_PROCESSOR_PROP, DummySchemaPostProcessor.class.getName());
+ properties.put(SchemaProviderPostProcessorConfig.SCHEMA_POST_PROCESSOR.key(), DummySchemaPostProcessor.class.getName());
SchemaProvider provider =
UtilHelpers.wrapSchemaProviderWithPostProcessor(
UtilHelpers.createSchemaProvider(DummySchemaProvider.class.getName(), properties, jsc),
@@ -89,7 +88,7 @@ public class TestSchemaPostProcessor extends UtilitiesTestBase {
@Test
public void testSparkAvro() throws IOException {
- properties.put(Config.SCHEMA_POST_PROCESSOR_PROP, SparkAvroPostProcessor.class.getName());
+ properties.put(SchemaProviderPostProcessorConfig.SCHEMA_POST_PROCESSOR.key(), SparkAvroPostProcessor.class.getName());
List<String> transformerClassNames = new ArrayList<>();
transformerClassNames.add(FlatteningTransformer.class.getName());
@@ -115,10 +114,10 @@ public class TestSchemaPostProcessor extends UtilitiesTestBase {
@Test
public void testChainedSchemaPostProcessor() {
// DeleteSupportSchemaPostProcessor first, DummySchemaPostProcessor second
- properties.put(Config.SCHEMA_POST_PROCESSOR_PROP,
+ properties.put(SchemaProviderPostProcessorConfig.SCHEMA_POST_PROCESSOR.key(),
"org.apache.hudi.utilities.schema.postprocessor.DeleteSupportSchemaPostProcessor,org.apache.hudi.utilities.DummySchemaPostProcessor");
- SchemaPostProcessor processor = UtilHelpers.createSchemaPostProcessor(properties.getString(Config.SCHEMA_POST_PROCESSOR_PROP), properties, jsc);
+ SchemaPostProcessor processor = UtilHelpers.createSchemaPostProcessor(properties.getString(SchemaProviderPostProcessorConfig.SCHEMA_POST_PROCESSOR.key()), properties, jsc);
Schema schema = new Schema.Parser().parse(ORIGINAL_SCHEMA);
Schema targetSchema = processor.processSchema(schema);
@@ -127,10 +126,10 @@ public class TestSchemaPostProcessor extends UtilitiesTestBase {
assertNotNull(targetSchema.getField("testString"));
// DummySchemaPostProcessor first, DeleteSupportSchemaPostProcessor second
- properties.put(Config.SCHEMA_POST_PROCESSOR_PROP,
+ properties.put(SchemaProviderPostProcessorConfig.SCHEMA_POST_PROCESSOR.key(),
"org.apache.hudi.utilities.DummySchemaPostProcessor,org.apache.hudi.utilities.schema.postprocessor.DeleteSupportSchemaPostProcessor");
- processor = UtilHelpers.createSchemaPostProcessor(properties.getString(Config.SCHEMA_POST_PROCESSOR_PROP), properties, jsc);
+ processor = UtilHelpers.createSchemaPostProcessor(properties.getString(SchemaProviderPostProcessorConfig.SCHEMA_POST_PROCESSOR.key()), properties, jsc);
schema = new Schema.Parser().parse(ORIGINAL_SCHEMA);
targetSchema = processor.processSchema(schema);
@@ -142,7 +141,7 @@ public class TestSchemaPostProcessor extends UtilitiesTestBase {
@Test
public void testDeleteColumn() {
// remove column rider from source schema
- properties.put(DropColumnSchemaPostProcessor.Config.DELETE_COLUMN_POST_PROCESSOR_COLUMN_PROP, "rider");
+ properties.put(SchemaProviderPostProcessorConfig.DELETE_COLUMN_POST_PROCESSOR_COLUMN.key(), "rider");
DropColumnSchemaPostProcessor processor = new DropColumnSchemaPostProcessor(properties, null);
Schema schema = new Schema.Parser().parse(ORIGINAL_SCHEMA);
Schema targetSchema = processor.processSchema(schema);
@@ -154,7 +153,7 @@ public class TestSchemaPostProcessor extends UtilitiesTestBase {
@Test
public void testDeleteColumnThrows() {
// remove all columns from source schema
- properties.put(DropColumnSchemaPostProcessor.Config.DELETE_COLUMN_POST_PROCESSOR_COLUMN_PROP, "timestamp,_row_key,rider,driver,fare");
+ properties.put(SchemaProviderPostProcessorConfig.DELETE_COLUMN_POST_PROCESSOR_COLUMN.key(), "timestamp,_row_key,rider,driver,fare");
DropColumnSchemaPostProcessor processor = new DropColumnSchemaPostProcessor(properties, null);
Schema schema = new Schema.Parser().parse(ORIGINAL_SCHEMA);
@@ -164,9 +163,9 @@ public class TestSchemaPostProcessor extends UtilitiesTestBase {
@ParameterizedTest
@MethodSource("configParams")
public void testAddPrimitiveTypeColumn(String type) {
- properties.put(BaseSchemaPostProcessorConfig.SCHEMA_POST_PROCESSOR_ADD_COLUMN_NAME_PROP.key(), "primitive_column");
- properties.put(BaseSchemaPostProcessorConfig.SCHEMA_POST_PROCESSOR_ADD_COLUMN_TYPE_PROP.key(), type);
- properties.put(BaseSchemaPostProcessorConfig.SCHEMA_POST_PROCESSOR_ADD_COLUMN_DOC_PROP.key(), "primitive column test");
+ properties.put(SchemaProviderPostProcessorConfig.SCHEMA_POST_PROCESSOR_ADD_COLUMN_NAME_PROP.key(), "primitive_column");
+ properties.put(SchemaProviderPostProcessorConfig.SCHEMA_POST_PROCESSOR_ADD_COLUMN_TYPE_PROP.key(), type);
+ properties.put(SchemaProviderPostProcessorConfig.SCHEMA_POST_PROCESSOR_ADD_COLUMN_DOC_PROP.key(), "primitive column test");
AddPrimitiveColumnSchemaPostProcessor processor = new AddPrimitiveColumnSchemaPostProcessor(properties, null);
Schema schema = new Schema.Parser().parse(ORIGINAL_SCHEMA);
@@ -180,7 +179,7 @@ public class TestSchemaPostProcessor extends UtilitiesTestBase {
assertNotEquals(type, newColumn.schema().getType().getName());
// test not nullable
- properties.put(BaseSchemaPostProcessorConfig.SCHEMA_POST_PROCESSOR_ADD_COLUMN_NULLABLE_PROP.key(), false);
+ properties.put(SchemaProviderPostProcessorConfig.SCHEMA_POST_PROCESSOR_ADD_COLUMN_NULLABLE_PROP.key(), false);
targetSchema = processor.processSchema(schema);
newColumn = targetSchema.getField("primitive_column");
assertEquals(type, newColumn.schema().getType().getName());
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index 1a3e79e150a..dbc87cbbda2 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -68,10 +68,10 @@ import org.apache.hudi.metrics.Metrics;
import org.apache.hudi.utilities.DummySchemaProvider;
import org.apache.hudi.utilities.HoodieClusteringJob;
import org.apache.hudi.utilities.HoodieIndexer;
+import org.apache.hudi.utilities.config.HoodieDeltaStreamerConfig;
+import org.apache.hudi.utilities.config.HoodieSchemaProviderConfig;
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
-import org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor;
import org.apache.hudi.utilities.schema.SchemaProvider;
-import org.apache.hudi.utilities.schema.SparkAvroPostProcessor;
import org.apache.hudi.utilities.sources.CsvDFSSource;
import org.apache.hudi.utilities.sources.HoodieIncrSource;
import org.apache.hudi.utilities.sources.InputBatch;
@@ -143,7 +143,6 @@ import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
-import static org.apache.hudi.config.HoodieWriteConfig.MUTLI_WRITER_SOURCE_CHECKPOINT_ID;
import static org.apache.hudi.config.metrics.HoodieMetricsConfig.METRICS_REPORTER_TYPE_VALUE;
import static org.apache.hudi.config.metrics.HoodieMetricsConfig.TURN_METRICS_ON;
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_PARTITION_FIELDS;
@@ -151,6 +150,7 @@ import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_TABLE_NAME;
import static org.apache.hudi.utilities.UtilHelpers.EXECUTE;
import static org.apache.hudi.utilities.UtilHelpers.SCHEDULE;
import static org.apache.hudi.utilities.UtilHelpers.SCHEDULE_AND_EXECUTE;
+import static org.apache.hudi.utilities.config.HoodieDeltaStreamerConfig.MUTLI_WRITER_SOURCE_CHECKPOINT_ID;
import static org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.CHECKPOINT_KEY;
import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_OFFSET_COLUMN;
import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_PARTITION_COLUMN;
@@ -766,7 +766,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
cfg.configs.add("hoodie.deltastreamer.schemaprovider.target.schema.file=" + basePath + "/source.avsc");
cfg.configs.add(DataSourceWriteOptions.RECONCILE_SCHEMA().key() + "=true");
if (!useSchemaPostProcessor) {
- cfg.configs.add(SparkAvroPostProcessor.Config.SPARK_AVRO_POST_PROCESSOR_PROP_ENABLE + "=false");
+ cfg.configs.add(HoodieSchemaProviderConfig.SPARK_AVRO_POST_PROCESSOR_ENABLE.key() + "=false");
}
new HoodieDeltaStreamer(cfg, jsc).sync();
TestHelpers.assertRecordCount(1000, tableBasePath, sqlContext);
@@ -780,7 +780,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
cfg.configs.add("hoodie.deltastreamer.schemaprovider.target.schema.file=" + basePath + "/source_evolved.avsc");
cfg.configs.add(DataSourceWriteOptions.RECONCILE_SCHEMA().key() + "=true");
if (!useSchemaPostProcessor) {
- cfg.configs.add(SparkAvroPostProcessor.Config.SPARK_AVRO_POST_PROCESSOR_PROP_ENABLE + "=false");
+ cfg.configs.add(HoodieSchemaProviderConfig.SPARK_AVRO_POST_PROCESSOR_ENABLE.key() + "=false");
}
new HoodieDeltaStreamer(cfg, jsc).sync();
// out of 1000 new records, 500 are inserts, 450 are updates and 50 are deletes.
@@ -806,7 +806,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
cfg.configs.add("hoodie.deltastreamer.schemaprovider.target.schema.file=" + basePath + "/source_evolved.avsc");
}
if (!useSchemaPostProcessor) {
- cfg.configs.add(SparkAvroPostProcessor.Config.SPARK_AVRO_POST_PROCESSOR_PROP_ENABLE + "=false");
+ cfg.configs.add(HoodieSchemaProviderConfig.SPARK_AVRO_POST_PROCESSOR_ENABLE.key() + "=false");
}
cfg.configs.add(DataSourceWriteOptions.RECONCILE_SCHEMA().key() + "=true");
new HoodieDeltaStreamer(cfg, jsc).sync();
@@ -1865,7 +1865,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
if (extraProps != null && !extraProps.isEmpty()) {
extraProps.forEach(props::setProperty);
}
- props.setProperty(KafkaOffsetPostProcessor.Config.KAFKA_APPEND_OFFSETS.key(), Boolean.toString(shouldAddOffsets));
+ props.setProperty(HoodieDeltaStreamerConfig.KAFKA_APPEND_OFFSETS.key(), Boolean.toString(shouldAddOffsets));
UtilitiesTestBase.Helpers.savePropsToDFS(props, fs, basePath + "/" + propsFileName);
}
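For reference, the flag toggled here appends three metadata columns, whose names are statically imported at the top of this test (per the documentation removed from KafkaOffsetPostProcessor earlier in the diff). Sketch:
```
import org.apache.hudi.common.config.TypedProperties;

public class KafkaOffsetColumnsSketch {
  public static TypedProperties appendOffsets() {
    TypedProperties props = new TypedProperties();
    // disabled by default; when true each record gains _hoodie_kafka_source_offset,
    // _hoodie_kafka_source_partition and _hoodie_kafka_source_timestamp
    props.setProperty("hoodie.deltastreamer.source.kafka.append.offsets", "true");
    return props;
  }
}
```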
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieMultiTableDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieMultiTableDeltaStreamer.java
index a2b480ac1ba..0e561eee5fb 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieMultiTableDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieMultiTableDeltaStreamer.java
@@ -23,6 +23,8 @@ import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.utilities.config.HoodieDeltaStreamerConfig;
+import org.apache.hudi.utilities.config.HoodieSchemaProviderConfig;
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
import org.apache.hudi.utilities.schema.SchemaRegistryProvider;
import org.apache.hudi.utilities.sources.JsonKafkaSource;
@@ -120,13 +122,13 @@ public class TestHoodieMultiTableDeltaStreamer extends HoodieDeltaStreamerTestBa
assertEquals(2, streamer.getTableExecutionContexts().size());
assertEquals(basePath + "/multi_table_dataset/uber_db/dummy_table_uber", executionContext.getConfig().targetBasePath);
assertEquals("uber_db.dummy_table_uber", executionContext.getConfig().targetTableName);
- assertEquals("topic1", executionContext.getProperties().getString(HoodieMultiTableDeltaStreamer.Constants.KAFKA_TOPIC_PROP));
+ assertEquals("topic1", executionContext.getProperties().getString(HoodieDeltaStreamerConfig.KAFKA_TOPIC.key()));
assertEquals("_row_key", executionContext.getProperties().getString(DataSourceWriteOptions.RECORDKEY_FIELD().key()));
assertEquals(TestHoodieDeltaStreamer.TestGenerator.class.getName(), executionContext.getProperties().getString(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME().key()));
assertEquals("uber_hive_dummy_table", executionContext.getProperties().getString(HoodieMultiTableDeltaStreamer.Constants.HIVE_SYNC_TABLE_PROP));
- assertEquals("http://localhost:8081/subjects/random-value/versions/latest", executionContext.getProperties().getString(SchemaRegistryProvider.Config.SRC_SCHEMA_REGISTRY_URL_PROP));
+ assertEquals("http://localhost:8081/subjects/random-value/versions/latest", executionContext.getProperties().getString(HoodieSchemaProviderConfig.SRC_SCHEMA_REGISTRY_URL.key()));
assertEquals("http://localhost:8081/subjects/topic2-value/versions/latest",
- streamer.getTableExecutionContexts().get(0).getProperties().getString(SchemaRegistryProvider.Config.SRC_SCHEMA_REGISTRY_URL_PROP));
+ streamer.getTableExecutionContexts().get(0).getProperties().getString(HoodieSchemaProviderConfig.SRC_SCHEMA_REGISTRY_URL.key()));
}
@Test
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestSourceFormatAdapter.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestSourceFormatAdapter.java
index caa54c01ebf..24b4839735c 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestSourceFormatAdapter.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestSourceFormatAdapter.java
@@ -21,10 +21,10 @@ package org.apache.hudi.utilities.deltastreamer;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.utilities.config.HoodieDeltaStreamerConfig;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.InputBatch;
import org.apache.hudi.utilities.sources.Source;
-import org.apache.hudi.utilities.sources.helpers.SanitizationUtils;
import org.apache.hudi.utilities.testutils.SanitizationTestUtils;
import org.apache.avro.Schema;
@@ -93,8 +93,8 @@ public class TestSourceFormatAdapter {
private InputBatch<Dataset<Row>> fetchRowData(JavaRDD<String> rdd, StructType unsanitizedSchema) {
TypedProperties typedProperties = new TypedProperties();
- typedProperties.put(SanitizationUtils.Config.SANITIZE_SCHEMA_FIELD_NAMES.key(), true);
- typedProperties.put(SanitizationUtils.Config.SCHEMA_FIELD_NAME_INVALID_CHAR_MASK.key(), "__");
+ typedProperties.put(HoodieDeltaStreamerConfig.SANITIZE_SCHEMA_FIELD_NAMES.key(), true);
+ typedProperties.put(HoodieDeltaStreamerConfig.SCHEMA_FIELD_NAME_INVALID_CHAR_MASK.key(), "__");
setupRowSource(spark.read().schema(unsanitizedSchema).json(rdd));
SourceFormatAdapter sourceFormatAdapter = new SourceFormatAdapter(testRowDataSource, Option.empty(), Option.of(typedProperties));
return sourceFormatAdapter.fetchNewDataInRowFormat(Option.of(DUMMY_CHECKPOINT), 10L);
@@ -102,8 +102,8 @@ public class TestSourceFormatAdapter {
private InputBatch<Dataset<Row>> fetchJsonData(JavaRDD<String> rdd, StructType sanitizedSchema) {
TypedProperties typedProperties = new TypedProperties();
- typedProperties.put(SanitizationUtils.Config.SANITIZE_SCHEMA_FIELD_NAMES.key(), true);
- typedProperties.put(SanitizationUtils.Config.SCHEMA_FIELD_NAME_INVALID_CHAR_MASK.key(), "__");
+ typedProperties.put(HoodieDeltaStreamerConfig.SANITIZE_SCHEMA_FIELD_NAMES.key(), true);
+ typedProperties.put(HoodieDeltaStreamerConfig.SCHEMA_FIELD_NAME_INVALID_CHAR_MASK.key(), "__");
setupJsonSource(rdd, SchemaConverters.toAvroType(sanitizedSchema, false, "record", ""));
SourceFormatAdapter sourceFormatAdapter = new SourceFormatAdapter(testJsonDataSource, Option.empty(), Option.of(typedProperties));
return sourceFormatAdapter.fetchNewDataInRowFormat(Option.of(DUMMY_CHECKPOINT), 10L);
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/TestFilebasedSchemaProvider.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/TestFilebasedSchemaProvider.java
index fe6f0ff9f2f..59e82c27edf 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/TestFilebasedSchemaProvider.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/TestFilebasedSchemaProvider.java
@@ -30,8 +30,8 @@ import org.junit.jupiter.api.Test;
import java.io.IOException;
-import static org.apache.hudi.utilities.sources.helpers.SanitizationUtils.Config.SANITIZE_SCHEMA_FIELD_NAMES;
-import static org.apache.hudi.utilities.sources.helpers.SanitizationUtils.Config.SCHEMA_FIELD_NAME_INVALID_CHAR_MASK;
+import static org.apache.hudi.utilities.config.HoodieDeltaStreamerConfig.SANITIZE_SCHEMA_FIELD_NAMES;
+import static org.apache.hudi.utilities.config.HoodieDeltaStreamerConfig.SCHEMA_FIELD_NAME_INVALID_CHAR_MASK;
import static org.apache.hudi.utilities.testutils.SanitizationTestUtils.generateProperFormattedSchema;
import static org.apache.hudi.utilities.testutils.SanitizationTestUtils.generateRenamedSchemaWithConfiguredReplacement;
import static org.apache.hudi.utilities.testutils.SanitizationTestUtils.generateRenamedSchemaWithDefaultReplacement;
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/TestProtoClassBasedSchemaProvider.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/TestProtoClassBasedSchemaProvider.java
index 5a9efbba915..2bb40807028 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/TestProtoClassBasedSchemaProvider.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/TestProtoClassBasedSchemaProvider.java
@@ -20,6 +20,7 @@
package org.apache.hudi.utilities.schema;
import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.utilities.config.ProtoClassBasedSchemaProviderConfig;
import org.apache.hudi.utilities.test.proto.Parent;
import org.apache.hudi.utilities.test.proto.Sample;
import org.apache.hudi.utilities.test.proto.WithOneOf;
@@ -35,7 +36,7 @@ public class TestProtoClassBasedSchemaProvider {
@Test
public void validateDefaultSchemaGeneration() throws IOException {
TypedProperties properties = new TypedProperties();
- properties.setProperty(ProtoClassBasedSchemaProvider.Config.PROTO_SCHEMA_CLASS_NAME.key(), Sample.class.getName());
+ properties.setProperty(ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_CLASS_NAME.key(), Sample.class.getName());
ProtoClassBasedSchemaProvider protoToAvroSchemaProvider = new ProtoClassBasedSchemaProvider(properties, null);
Schema convertedSchema = protoToAvroSchemaProvider.getSourceSchema();
Schema.Parser parser = new Schema.Parser();
@@ -46,9 +47,9 @@ public class TestProtoClassBasedSchemaProvider {
@Test
public void validateWrappedPrimitiveAndTimestampsAsRecordSchemaGeneration() throws IOException {
TypedProperties properties = new TypedProperties();
- properties.setProperty(ProtoClassBasedSchemaProvider.Config.PROTO_SCHEMA_CLASS_NAME.key(), Sample.class.getName());
- properties.setProperty(ProtoClassBasedSchemaProvider.Config.PROTO_SCHEMA_WRAPPED_PRIMITIVES_AS_RECORDS.key(), "true");
- properties.setProperty(ProtoClassBasedSchemaProvider.Config.PROTO_SCHEMA_TIMESTAMPS_AS_RECORDS.key(), "true");
+ properties.setProperty(ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_CLASS_NAME.key(), Sample.class.getName());
+ properties.setProperty(ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_WRAPPED_PRIMITIVES_AS_RECORDS.key(), "true");
+ properties.setProperty(ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_TIMESTAMPS_AS_RECORDS.key(), "true");
ProtoClassBasedSchemaProvider protoToAvroSchemaProvider = new ProtoClassBasedSchemaProvider(properties, null);
Schema convertedSchema = protoToAvroSchemaProvider.getSourceSchema();
Schema.Parser parser = new Schema.Parser();
@@ -59,8 +60,8 @@ public class TestProtoClassBasedSchemaProvider {
@Test
public void validateRecursiveSchemaGeneration_depth2() throws IOException {
TypedProperties properties = new TypedProperties();
- properties.setProperty(ProtoClassBasedSchemaProvider.Config.PROTO_SCHEMA_CLASS_NAME.key(), Parent.class.getName());
- properties.setProperty(ProtoClassBasedSchemaProvider.Config.PROTO_SCHEMA_MAX_RECURSION_DEPTH.key(), String.valueOf(2));
+ properties.setProperty(ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_CLASS_NAME.key(), Parent.class.getName());
+ properties.setProperty(ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_MAX_RECURSION_DEPTH.key(), String.valueOf(2));
ProtoClassBasedSchemaProvider protoToAvroSchemaProvider = new ProtoClassBasedSchemaProvider(properties, null);
Schema convertedSchema = protoToAvroSchemaProvider.getSourceSchema();
Schema.Parser parser = new Schema.Parser();
@@ -71,7 +72,7 @@ public class TestProtoClassBasedSchemaProvider {
@Test
public void validateRecursiveSchemaGeneration_defaultDepth() throws IOException {
TypedProperties properties = new TypedProperties();
- properties.setProperty(ProtoClassBasedSchemaProvider.Config.PROTO_SCHEMA_CLASS_NAME.key(), Parent.class.getName());
+ properties.setProperty(ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_CLASS_NAME.key(), Parent.class.getName());
ProtoClassBasedSchemaProvider protoToAvroSchemaProvider = new ProtoClassBasedSchemaProvider(properties, null);
Schema convertedSchema = protoToAvroSchemaProvider.getSourceSchema();
Schema.Parser parser = new Schema.Parser();
@@ -82,7 +83,7 @@ public class TestProtoClassBasedSchemaProvider {
@Test
public void validateOneOfSchemaGeneration() throws IOException {
TypedProperties properties = new TypedProperties();
- properties.setProperty(ProtoClassBasedSchemaProvider.Config.PROTO_SCHEMA_CLASS_NAME.key(), WithOneOf.class.getName());
+ properties.setProperty(ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_CLASS_NAME.key(), WithOneOf.class.getName());
ProtoClassBasedSchemaProvider protoToAvroSchemaProvider = new ProtoClassBasedSchemaProvider(properties, null);
Schema protoSchema = protoToAvroSchemaProvider.getSourceSchema();
Schema.Parser parser = new Schema.Parser();
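
The proto schema provider settings migrated in this file follow the same pattern. A sketch of what `ProtoClassBasedSchemaProviderConfig` plausibly contains; the key strings for the latter two properties and the recursion-depth default are assumptions based on the old inner `Config` class:

```java
import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.config.HoodieConfig;

public class ProtoClassBasedSchemaProviderConfig extends HoodieConfig {

  public static final ConfigProperty<String> PROTO_SCHEMA_CLASS_NAME = ConfigProperty
      .key("hoodie.deltastreamer.schemaprovider.proto.class.name")
      .noDefaultValue()
      .withDocumentation("The Protobuf Message class used as the source for the schema.");

  // Key string assumed from the constant name.
  public static final ConfigProperty<Boolean> PROTO_SCHEMA_WRAPPED_PRIMITIVES_AS_RECORDS = ConfigProperty
      .key("hoodie.deltastreamer.schemaprovider.proto.wrapped.primitives.as.records")
      .defaultValue(false)
      .withDocumentation("Whether to model protobuf wrapper types (e.g. Int64Value) as "
          + "nullable records rather than unwrapped nullable fields.");

  // Key string and default of 5 are assumptions; check the new class for the real values.
  public static final ConfigProperty<Integer> PROTO_SCHEMA_MAX_RECURSION_DEPTH = ConfigProperty
      .key("hoodie.deltastreamer.schemaprovider.proto.max.recursion.depth")
      .defaultValue(5)
      .withDocumentation("Maximum depth for recursive proto messages before the remainder "
          + "is truncated.");

  // PROTO_SCHEMA_TIMESTAMPS_AS_RECORDS would be defined analogously.
}
```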
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroKafkaSource.java
index 020526b159d..3448f11eec5 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroKafkaSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroKafkaSource.java
@@ -24,10 +24,10 @@ import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
import org.apache.hudi.utilities.UtilHelpers;
+import org.apache.hudi.utilities.config.HoodieDeltaStreamerConfig;
import org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter;
import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
-import org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
@@ -140,7 +140,7 @@ public class TestAvroKafkaSource extends SparkClientFunctionalTestHarness {
AvroKafkaSource avroKafkaSource = new AvroKafkaSource(props, jsc(), spark(), schemaProvider, null);
GenericRecord withoutKafkaOffsets = avroKafkaSource.maybeAppendKafkaOffsets(rdd).collect().get(0);
- props.put(KafkaOffsetPostProcessor.Config.KAFKA_APPEND_OFFSETS.key(), "true");
+ props.put(HoodieDeltaStreamerConfig.KAFKA_APPEND_OFFSETS.key(), "true");
schemaProvider = UtilHelpers.wrapSchemaProviderWithPostProcessor(
UtilHelpers.createSchemaProvider(FilebasedSchemaProvider.class.getName(), props, jsc()), props, jsc(), new ArrayList<>());
avroKafkaSource = new AvroKafkaSource(props, jsc(), spark(), schemaProvider, null);
@@ -168,7 +168,7 @@ public class TestAvroKafkaSource extends SparkClientFunctionalTestHarness {
Dataset<Row> c = kafkaSource.fetchNewDataInRowFormat(Option.empty(),Long.MAX_VALUE)
.getBatch().get();
List<String> columns = Arrays.stream(c.columns()).collect(Collectors.toList());
- props.put(KafkaOffsetPostProcessor.Config.KAFKA_APPEND_OFFSETS.key(), "true");
+ props.put(HoodieDeltaStreamerConfig.KAFKA_APPEND_OFFSETS.key(), "true");
schemaProvider = UtilHelpers.wrapSchemaProviderWithPostProcessor(
UtilHelpers.createSchemaProvider(FilebasedSchemaProvider.class.getName(), props, jsc()), props, jsc(), new ArrayList<>());
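
`KAFKA_APPEND_OFFSETS` likewise moved from `KafkaOffsetPostProcessor.Config` onto `HoodieDeltaStreamerConfig`, which is why this test drops the post-processor import. A minimal sketch of the relocated property, assuming the key and default carry over unchanged:

```java
import org.apache.hudi.common.config.ConfigProperty;

// Sketch only: the real definition lives on HoodieDeltaStreamerConfig.
// Key and default are assumptions carried over from KafkaOffsetPostProcessor.Config.
class KafkaAppendOffsetsSketch {
  static final ConfigProperty<Boolean> KAFKA_APPEND_OFFSETS = ConfigProperty
      .key("hoodie.deltastreamer.source.kafka.append.offsets")
      .defaultValue(false)
      .withDocumentation("When enabled, appends Kafka offset metadata (partition, offset, "
          + "timestamp) to each ingested record via KafkaOffsetPostProcessor.");
}
```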
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
index f05107f2c25..3609cec419d 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
@@ -25,12 +25,12 @@ import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.utilities.config.HoodieDeltaStreamerConfig;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
import org.apache.hudi.utilities.deltastreamer.BaseErrorTableWriter;
import org.apache.hudi.utilities.deltastreamer.ErrorEvent;
import org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter;
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
-import org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.Config;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -320,7 +320,7 @@ public class TestJsonKafkaSource extends BaseTestKafkaSource {
assertEquals(numMessages, c.count());
List<String> columns = Arrays.stream(c.columns()).collect(Collectors.toList());
- props.put(KafkaOffsetPostProcessor.Config.KAFKA_APPEND_OFFSETS.key(), "true");
+ props.put(HoodieDeltaStreamerConfig.KAFKA_APPEND_OFFSETS.key(), "true");
jsonSource = new JsonKafkaSource(props, jsc(), spark(), schemaProvider, metrics);
kafkaSource = new SourceFormatAdapter(jsonSource);
Dataset<Row> d = kafkaSource.fetchNewDataInRowFormat(Option.empty(), Long.MAX_VALUE).getBatch().get();
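
For the row-format assertions in this hunk, the observable effect of the flag is extra Kafka metadata columns on the returned batch. A hypothetical continuation of the test body; the column names are assumptions based on `KafkaOffsetPostProcessor`'s conventions and should be verified against its constants:

```java
// Fragment continuing the test above: `d` is the Dataset<Row> fetched with
// KAFKA_APPEND_OFFSETS enabled. Column names are assumed, not authoritative.
List<String> columnsWithOffsets = Arrays.stream(d.columns()).collect(Collectors.toList());
assertTrue(columnsWithOffsets.contains("_hoodie_kafka_source_offset"));
assertTrue(columnsWithOffsets.contains("_hoodie_kafka_source_partition"));
assertTrue(columnsWithOffsets.contains("_hoodie_kafka_source_timestamp"));
```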
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestProtoKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestProtoKafkaSource.java
index bf761509abf..ddf1e1b784d 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestProtoKafkaSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestProtoKafkaSource.java
@@ -21,6 +21,7 @@ package org.apache.hudi.utilities.sources;
import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.utilities.config.ProtoClassBasedSchemaProviderConfig;
import org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter;
import org.apache.hudi.utilities.schema.ProtoClassBasedSchemaProvider;
import org.apache.hudi.utilities.schema.SchemaProvider;
@@ -80,7 +81,7 @@ public class TestProtoKafkaSource extends BaseTestKafkaSource {
maxEventsToReadFromKafkaSource != null ? String.valueOf(maxEventsToReadFromKafkaSource) :
String.valueOf(Config.MAX_EVENTS_FROM_KAFKA_SOURCE_PROP.defaultValue()));
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
- props.setProperty(ProtoClassBasedSchemaProvider.Config.PROTO_SCHEMA_CLASS_NAME.key(), Sample.class.getName());
+ props.setProperty(ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_CLASS_NAME.key(), Sample.class.getName());
return props;
}
@@ -98,7 +99,7 @@ public class TestProtoKafkaSource extends BaseTestKafkaSource {
final String topic = TEST_TOPIC_PREFIX + "testProtoKafkaSourceFlatten";
testUtils.createTopic(topic, 2);
TypedProperties props = createPropsForKafkaSource(topic, null, "earliest");
- props.setProperty(ProtoClassBasedSchemaProvider.Config.PROTO_SCHEMA_WRAPPED_PRIMITIVES_AS_RECORDS.key(), "true");
+ props.setProperty(ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_WRAPPED_PRIMITIVES_AS_RECORDS.key(), "true");
SchemaProvider schemaProvider = new ProtoClassBasedSchemaProvider(props, jsc());
Source protoKafkaSource = new ProtoKafkaSource(props, jsc(), spark(), schemaProvider, metrics);
SourceFormatAdapter kafkaSource = new SourceFormatAdapter(protoKafkaSource);
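
The migration in this file is mechanical: only the config class reference changes, while the provider flow is untouched. A condensed fragment of the post-refactor setup, where helpers like `createPropsForKafkaSource` and `jsc()` come from the surrounding test class and its harness:

```java
// Condensed from the test above. Only the config class changed in this commit;
// provider construction and schema retrieval are as before.
TypedProperties props = createPropsForKafkaSource(topic, null, "earliest");
props.setProperty(ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_WRAPPED_PRIMITIVES_AS_RECORDS.key(), "true");
SchemaProvider schemaProvider = new ProtoClassBasedSchemaProvider(props, jsc());
Schema sourceSchema = schemaProvider.getSourceSchema();
```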
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/SanitizationTestUtils.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/SanitizationTestUtils.java
index d54b0b54e67..2349fad1d76 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/SanitizationTestUtils.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/SanitizationTestUtils.java
@@ -33,7 +33,7 @@ import org.junit.jupiter.params.provider.Arguments;
import java.util.stream.Stream;
-import static org.apache.hudi.utilities.sources.helpers.SanitizationUtils.Config.SCHEMA_FIELD_NAME_INVALID_CHAR_MASK;
+import static org.apache.hudi.utilities.config.HoodieDeltaStreamerConfig.SCHEMA_FIELD_NAME_INVALID_CHAR_MASK;
public class SanitizationTestUtils {
public static String invalidCharMask = SCHEMA_FIELD_NAME_INVALID_CHAR_MASK.defaultValue();
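
To round off, a minimal usage sketch of the relocated sanitization options, reusing the `invalidCharMask` default captured above; the example rename assumes the default mask is "__":

```java
// Fragment using the static imports added in this file. With the flag on,
// invalid Avro field-name characters are replaced by the mask, e.g.
// "bad-name" -> "bad__name" assuming the default mask is "__".
TypedProperties props = new TypedProperties();
props.setProperty(SANITIZE_SCHEMA_FIELD_NAMES.key(), "true");
props.setProperty(SCHEMA_FIELD_NAME_INVALID_CHAR_MASK.key(), invalidCharMask);
```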