You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by co...@apache.org on 2022/10/17 12:10:54 UTC
[hudi] branch master updated: [HUDI-4855] Add missing table configs for bootstrap in Deltastreamer (#6694)
This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new fee80b9685 [HUDI-4855] Add missing table configs for bootstrap in Deltastreamer (#6694)
fee80b9685 is described below
commit fee80b9685c4203104ba12875bff0d6250f74057
Author: Y Ethan Guo <et...@gmail.com>
AuthorDate: Mon Oct 17 05:10:43 2022 -0700
[HUDI-4855] Add missing table configs for bootstrap in Deltastreamer (#6694)
---
.../demo/sparksql-bootstrap-prep-source.commands | 2 +
.../org/apache/hudi/integ/ITTestHoodieDemo.java | 52 +++++++++++-----------
.../utilities/deltastreamer/BootstrapExecutor.java | 47 +++++++++++++++++--
.../functional/TestHoodieDeltaStreamer.java | 9 ++--
4 files changed, 78 insertions(+), 32 deletions(-)
diff --git a/docker/demo/sparksql-bootstrap-prep-source.commands b/docker/demo/sparksql-bootstrap-prep-source.commands
index 23db3e4d38..ca54b4bab3 100644
--- a/docker/demo/sparksql-bootstrap-prep-source.commands
+++ b/docker/demo/sparksql-bootstrap-prep-source.commands
@@ -18,5 +18,7 @@
import org.apache.spark.sql.functions.col
val df = spark.read.format("org.apache.hudi").load("/user/hive/warehouse/stock_ticks_cow/*/*/*").drop("_hoodie_commit_time", "_hoodie_record_key", "_hoodie_file_name", "_hoodie_commit_seqno", "_hoodie_partition_path")
+// TODO(HUDI-4944): extend this test to use a partition column whose values contain
+// slashes (`/`). Such values currently fail the tests because of slash encoding.
df.write.format("parquet").save("/user/hive/warehouse/stock_ticks_cow_bs_src/2018/08/31/")
System.exit(0)
diff --git a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieDemo.java b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieDemo.java
index 32b4122abe..f75fe175fb 100644
--- a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieDemo.java
+++ b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieDemo.java
@@ -231,33 +231,33 @@ public class ITTestHoodieDemo extends ITTestBase {
executeSparkSQLCommand(SPARKSQL_BS_PREP_COMMANDS, true);
List<String> bootstrapCmds = CollectionUtils.createImmutableList(
"spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer " + HUDI_UTILITIES_BUNDLE
- + " --table-type COPY_ON_WRITE "
- + " --run-bootstrap "
- + " --source-class org.apache.hudi.utilities.sources.JsonDFSSource --source-ordering-field ts "
- + " --target-base-path " + COW_BOOTSTRAPPED_BASE_PATH + " --target-table " + COW_BOOTSTRAPPED_TABLE_NAME
- + " --props /var/demo/config/dfs-source.properties"
- + " --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider "
- + " --initial-checkpoint-provider"
- + " org.apache.hudi.utilities.checkpointing.InitialCheckpointFromAnotherHoodieTimelineProvider"
- + " --hoodie-conf hoodie.bootstrap.base.path=" + BOOTSTRAPPED_SRC_PATH
- + " --hoodie-conf hoodie.deltastreamer.checkpoint.provider.path=" + COW_BASE_PATH
- + " --hoodie-conf hoodie.bootstrap.parallelism=2 "
- + " --hoodie-conf hoodie.bootstrap.keygen.class=" + SimpleKeyGenerator.class.getName()
- + String.format(HIVE_SYNC_CMD_FMT, "dt", COW_BOOTSTRAPPED_TABLE_NAME),
+ + " --table-type COPY_ON_WRITE "
+ + " --run-bootstrap "
+ + " --source-class org.apache.hudi.utilities.sources.JsonDFSSource --source-ordering-field ts "
+ + " --target-base-path " + COW_BOOTSTRAPPED_BASE_PATH + " --target-table " + COW_BOOTSTRAPPED_TABLE_NAME
+ + " --props /var/demo/config/dfs-source.properties"
+ + " --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider "
+ + " --initial-checkpoint-provider"
+ + " org.apache.hudi.utilities.checkpointing.InitialCheckpointFromAnotherHoodieTimelineProvider"
+ + " --hoodie-conf hoodie.bootstrap.base.path=" + BOOTSTRAPPED_SRC_PATH
+ + " --hoodie-conf hoodie.deltastreamer.checkpoint.provider.path=" + COW_BASE_PATH
+ + " --hoodie-conf hoodie.bootstrap.parallelism=2 "
+ + " --hoodie-conf hoodie.bootstrap.keygen.class=" + SimpleKeyGenerator.class.getName()
+ + String.format(HIVE_SYNC_CMD_FMT, "dt", COW_BOOTSTRAPPED_TABLE_NAME),
"spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer " + HUDI_UTILITIES_BUNDLE
- + " --table-type MERGE_ON_READ "
- + " --run-bootstrap "
- + " --source-class org.apache.hudi.utilities.sources.JsonDFSSource --source-ordering-field ts "
- + " --target-base-path " + MOR_BOOTSTRAPPED_BASE_PATH + " --target-table " + MOR_BOOTSTRAPPED_TABLE_NAME
- + " --props /var/demo/config/dfs-source.properties"
- + " --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider "
- + " --initial-checkpoint-provider"
- + " org.apache.hudi.utilities.checkpointing.InitialCheckpointFromAnotherHoodieTimelineProvider"
- + " --hoodie-conf hoodie.bootstrap.base.path=" + BOOTSTRAPPED_SRC_PATH
- + " --hoodie-conf hoodie.deltastreamer.checkpoint.provider.path=" + COW_BASE_PATH
- + " --hoodie-conf hoodie.bootstrap.parallelism=2 "
- + " --hoodie-conf hoodie.bootstrap.keygen.class=" + SimpleKeyGenerator.class.getName()
- + String.format(HIVE_SYNC_CMD_FMT, "dt", MOR_BOOTSTRAPPED_TABLE_NAME));
+ + " --table-type MERGE_ON_READ "
+ + " --run-bootstrap "
+ + " --source-class org.apache.hudi.utilities.sources.JsonDFSSource --source-ordering-field ts "
+ + " --target-base-path " + MOR_BOOTSTRAPPED_BASE_PATH + " --target-table " + MOR_BOOTSTRAPPED_TABLE_NAME
+ + " --props /var/demo/config/dfs-source.properties"
+ + " --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider "
+ + " --initial-checkpoint-provider"
+ + " org.apache.hudi.utilities.checkpointing.InitialCheckpointFromAnotherHoodieTimelineProvider"
+ + " --hoodie-conf hoodie.bootstrap.base.path=" + BOOTSTRAPPED_SRC_PATH
+ + " --hoodie-conf hoodie.deltastreamer.checkpoint.provider.path=" + COW_BASE_PATH
+ + " --hoodie-conf hoodie.bootstrap.parallelism=2 "
+ + " --hoodie-conf hoodie.bootstrap.keygen.class=" + SimpleKeyGenerator.class.getName()
+ + String.format(HIVE_SYNC_CMD_FMT, "dt", MOR_BOOTSTRAPPED_TABLE_NAME));
executeCommandStringsInDocker(ADHOC_1_CONTAINER, bootstrapCmds);
}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/BootstrapExecutor.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/BootstrapExecutor.java
index 828dd6af5f..bf16c14aab 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/BootstrapExecutor.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/BootstrapExecutor.java
@@ -22,9 +22,11 @@ import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieTimelineTimeZone;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
@@ -33,6 +35,9 @@ import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HiveSyncTool;
import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.keygen.NonpartitionedKeyGenerator;
+import org.apache.hudi.keygen.SimpleKeyGenerator;
+import org.apache.hudi.util.SparkKeyGenUtils;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.schema.SchemaProvider;
@@ -48,8 +53,16 @@ import java.io.Serializable;
import java.util.HashMap;
import static org.apache.hudi.common.table.HoodieTableConfig.ARCHIVELOG_FOLDER;
+import static org.apache.hudi.common.table.HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT;
+import static org.apache.hudi.common.table.HoodieTableConfig.POPULATE_META_FIELDS;
+import static org.apache.hudi.common.table.HoodieTableConfig.TIMELINE_TIMEZONE;
+import static org.apache.hudi.config.HoodieBootstrapConfig.KEYGEN_CLASS_NAME;
+import static org.apache.hudi.config.HoodieWriteConfig.PRECOMBINE_FIELD_NAME;
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_BUCKET_SYNC;
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_BUCKET_SYNC_SPEC;
+import static org.apache.hudi.keygen.constant.KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE;
+import static org.apache.hudi.keygen.constant.KeyGeneratorOptions.RECORDKEY_FIELD_NAME;
+import static org.apache.hudi.keygen.constant.KeyGeneratorOptions.URL_ENCODE_PARTITIONING;
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_BASE_FILE_FORMAT;
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_BASE_PATH;
@@ -187,16 +200,44 @@ public class BootstrapExecutor implements Serializable {
+ ". Cannot bootstrap data on top of an existing table");
}
}
- HoodieTableMetaClient.withPropertyBuilder()
+
+ HoodieTableMetaClient.PropertyBuilder builder = HoodieTableMetaClient.withPropertyBuilder()
.fromProperties(props)
.setTableType(cfg.tableType)
.setTableName(cfg.targetTableName)
- .setArchiveLogFolder(ARCHIVELOG_FOLDER.defaultValue())
+ .setRecordKeyFields(props.getString(RECORDKEY_FIELD_NAME.key()))
+ .setPreCombineField(props.getString(
+ PRECOMBINE_FIELD_NAME.key(), PRECOMBINE_FIELD_NAME.defaultValue()))
+ .setPopulateMetaFields(props.getBoolean(
+ POPULATE_META_FIELDS.key(), POPULATE_META_FIELDS.defaultValue()))
+ .setArchiveLogFolder(props.getString(
+ ARCHIVELOG_FOLDER.key(), ARCHIVELOG_FOLDER.defaultValue()))
.setPayloadClassName(cfg.payloadClassName)
.setBaseFileFormat(cfg.baseFileFormat)
.setBootstrapIndexClass(cfg.bootstrapIndexClass)
.setBootstrapBasePath(bootstrapBasePath)
- .initTable(new Configuration(jssc.hadoopConfiguration()), cfg.targetBasePath);
+ .setHiveStylePartitioningEnable(props.getBoolean(
+ HIVE_STYLE_PARTITIONING_ENABLE.key(),
+ Boolean.parseBoolean(HIVE_STYLE_PARTITIONING_ENABLE.defaultValue())
+ ))
+ .setUrlEncodePartitioning(props.getBoolean(
+ URL_ENCODE_PARTITIONING.key(),
+ Boolean.parseBoolean(URL_ENCODE_PARTITIONING.defaultValue())))
+ .setCommitTimezone(HoodieTimelineTimeZone.valueOf(
+ props.getString(
+ TIMELINE_TIMEZONE.key(),
+ String.valueOf(TIMELINE_TIMEZONE.defaultValue()))))
+ .setPartitionMetafileUseBaseFormat(props.getBoolean(
+ PARTITION_METAFILE_USE_BASE_FORMAT.key(),
+ PARTITION_METAFILE_USE_BASE_FORMAT.defaultValue()));
+ String partitionColumns = SparkKeyGenUtils.getPartitionColumns(props);
+ if (!StringUtils.isNullOrEmpty(partitionColumns)) {
+ builder.setPartitionFields(partitionColumns).setKeyGeneratorClassProp(props.getString(KEYGEN_CLASS_NAME.key(), SimpleKeyGenerator.class.getName()));
+ } else {
+ builder.setKeyGeneratorClassProp(props.getString(KEYGEN_CLASS_NAME.key(), NonpartitionedKeyGenerator.class.getName()));
+ }
+
+ builder.initTable(new Configuration(jssc.hadoopConfiguration()), cfg.targetBasePath);
}
public HoodieWriteConfig getBootstrapConfig() {
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
index f05a36745b..ec163aec33 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
@@ -46,9 +46,9 @@ import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
-import org.apache.hudi.config.HoodieClusteringConfig;
-import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieArchivalConfig;
+import org.apache.hudi.config.HoodieCleanConfig;
+import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieLockConfig;
import org.apache.hudi.config.HoodieWriteConfig;
@@ -630,11 +630,14 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
Dataset<Row> sourceDf = sqlContext.read()
.format("org.apache.hudi")
.load(tableBasePath);
- sourceDf.write().format("parquet").save(bootstrapSourcePath);
+ // TODO(HUDI-4944): extend this test to use a partition column whose values contain
+ // slashes (`/`). Such values currently fail the tests because of slash encoding.
+ sourceDf.write().format("parquet").partitionBy("rider").save(bootstrapSourcePath);
String newDatasetBasePath = dfsBasePath + "/test_dataset_bootstrapped";
cfg.runBootstrap = true;
cfg.configs.add(String.format("hoodie.bootstrap.base.path=%s", bootstrapSourcePath));
+ cfg.configs.add(String.format("%s=%s", DataSourceWriteOptions.PARTITIONPATH_FIELD().key(), "rider"));
cfg.configs.add(String.format("hoodie.bootstrap.keygen.class=%s", SimpleKeyGenerator.class.getName()));
cfg.configs.add("hoodie.bootstrap.parallelism=5");
cfg.targetBasePath = newDatasetBasePath;