Posted to commits@hudi.apache.org by co...@apache.org on 2022/10/17 12:10:54 UTC

[hudi] branch master updated: [HUDI-4855] Add missing table configs for bootstrap in Deltastreamer (#6694)

This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new fee80b9685 [HUDI-4855] Add missing table configs for bootstrap in Deltastreamer (#6694)
fee80b9685 is described below

commit fee80b9685c4203104ba12875bff0d6250f74057
Author: Y Ethan Guo <et...@gmail.com>
AuthorDate: Mon Oct 17 05:10:43 2022 -0700

    [HUDI-4855] Add missing table configs for bootstrap in Deltastreamer (#6694)
---
 .../demo/sparksql-bootstrap-prep-source.commands   |  2 +
 .../org/apache/hudi/integ/ITTestHoodieDemo.java    | 52 +++++++++++-----------
 .../utilities/deltastreamer/BootstrapExecutor.java | 47 +++++++++++++++++--
 .../functional/TestHoodieDeltaStreamer.java        |  9 ++--
 4 files changed, 78 insertions(+), 32 deletions(-)
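
In short: BootstrapExecutor previously initialized the target table with only the table type, name, payload class, base file format, bootstrap index class, and bootstrap base path, so record-key, precombine, partition, key-generator, and timeline-timezone settings passed to Deltastreamer never reached the table's hoodie.properties. The BootstrapExecutor hunk below threads them through the PropertyBuilder. A hedged sketch of the extra --hoodie-conf entries a bootstrap run can now persist (the keys come from the configs referenced in this diff; the field values are illustrative only):

    --hoodie-conf hoodie.datasource.write.recordkey.field=key
    --hoodie-conf hoodie.datasource.write.precombine.field=ts
    --hoodie-conf hoodie.datasource.write.partitionpath.field=rider
    --hoodie-conf hoodie.datasource.write.hive_style_partitioning=false
    --hoodie-conf hoodie.datasource.write.partitionpath.urlencode=false
    --hoodie-conf hoodie.bootstrap.keygen.class=org.apache.hudi.keygen.SimpleKeyGenerator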

diff --git a/docker/demo/sparksql-bootstrap-prep-source.commands b/docker/demo/sparksql-bootstrap-prep-source.commands
index 23db3e4d38..ca54b4bab3 100644
--- a/docker/demo/sparksql-bootstrap-prep-source.commands
+++ b/docker/demo/sparksql-bootstrap-prep-source.commands
@@ -18,5 +18,7 @@
 import org.apache.spark.sql.functions.col
 
 val df = spark.read.format("org.apache.hudi").load("/user/hive/warehouse/stock_ticks_cow/*/*/*").drop("_hoodie_commit_time", "_hoodie_record_key", "_hoodie_file_name", "_hoodie_commit_seqno", "_hoodie_partition_path")
+// TODO(HUDI-4944): fix the test to use a partition column whose values contain
+//  slashes (`/`); currently the tests fail due to slash encoding.
 df.write.format("parquet").save("/user/hive/warehouse/stock_ticks_cow_bs_src/2018/08/31/")
 System.exit(0)
diff --git a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieDemo.java b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieDemo.java
index 32b4122abe..f75fe175fb 100644
--- a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieDemo.java
+++ b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieDemo.java
@@ -231,33 +231,33 @@ public class ITTestHoodieDemo extends ITTestBase {
     executeSparkSQLCommand(SPARKSQL_BS_PREP_COMMANDS, true);
     List<String> bootstrapCmds = CollectionUtils.createImmutableList(
         "spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer " + HUDI_UTILITIES_BUNDLE
-        + " --table-type COPY_ON_WRITE "
-        + " --run-bootstrap "
-        + " --source-class org.apache.hudi.utilities.sources.JsonDFSSource --source-ordering-field ts "
-        + " --target-base-path " + COW_BOOTSTRAPPED_BASE_PATH + " --target-table " + COW_BOOTSTRAPPED_TABLE_NAME
-        + " --props /var/demo/config/dfs-source.properties"
-        + " --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider "
-        + " --initial-checkpoint-provider"
-        + " org.apache.hudi.utilities.checkpointing.InitialCheckpointFromAnotherHoodieTimelineProvider"
-        + " --hoodie-conf hoodie.bootstrap.base.path=" + BOOTSTRAPPED_SRC_PATH
-        + " --hoodie-conf hoodie.deltastreamer.checkpoint.provider.path=" + COW_BASE_PATH
-        + " --hoodie-conf hoodie.bootstrap.parallelism=2 "
-        + " --hoodie-conf hoodie.bootstrap.keygen.class=" + SimpleKeyGenerator.class.getName()
-        + String.format(HIVE_SYNC_CMD_FMT, "dt", COW_BOOTSTRAPPED_TABLE_NAME),
+            + " --table-type COPY_ON_WRITE "
+            + " --run-bootstrap "
+            + " --source-class org.apache.hudi.utilities.sources.JsonDFSSource --source-ordering-field ts "
+            + " --target-base-path " + COW_BOOTSTRAPPED_BASE_PATH + " --target-table " + COW_BOOTSTRAPPED_TABLE_NAME
+            + " --props /var/demo/config/dfs-source.properties"
+            + " --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider "
+            + " --initial-checkpoint-provider"
+            + " org.apache.hudi.utilities.checkpointing.InitialCheckpointFromAnotherHoodieTimelineProvider"
+            + " --hoodie-conf hoodie.bootstrap.base.path=" + BOOTSTRAPPED_SRC_PATH
+            + " --hoodie-conf hoodie.deltastreamer.checkpoint.provider.path=" + COW_BASE_PATH
+            + " --hoodie-conf hoodie.bootstrap.parallelism=2 "
+            + " --hoodie-conf hoodie.bootstrap.keygen.class=" + SimpleKeyGenerator.class.getName()
+            + String.format(HIVE_SYNC_CMD_FMT, "dt", COW_BOOTSTRAPPED_TABLE_NAME),
         "spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer " + HUDI_UTILITIES_BUNDLE
-        + " --table-type MERGE_ON_READ "
-        + " --run-bootstrap "
-        + " --source-class org.apache.hudi.utilities.sources.JsonDFSSource --source-ordering-field ts "
-        + " --target-base-path " + MOR_BOOTSTRAPPED_BASE_PATH + " --target-table " + MOR_BOOTSTRAPPED_TABLE_NAME
-        + " --props /var/demo/config/dfs-source.properties"
-        + " --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider "
-        + " --initial-checkpoint-provider"
-        + " org.apache.hudi.utilities.checkpointing.InitialCheckpointFromAnotherHoodieTimelineProvider"
-        + " --hoodie-conf hoodie.bootstrap.base.path=" + BOOTSTRAPPED_SRC_PATH
-        + " --hoodie-conf hoodie.deltastreamer.checkpoint.provider.path=" + COW_BASE_PATH
-        + " --hoodie-conf hoodie.bootstrap.parallelism=2 "
-        + " --hoodie-conf hoodie.bootstrap.keygen.class=" + SimpleKeyGenerator.class.getName()
-        + String.format(HIVE_SYNC_CMD_FMT, "dt", MOR_BOOTSTRAPPED_TABLE_NAME));
+            + " --table-type MERGE_ON_READ "
+            + " --run-bootstrap "
+            + " --source-class org.apache.hudi.utilities.sources.JsonDFSSource --source-ordering-field ts "
+            + " --target-base-path " + MOR_BOOTSTRAPPED_BASE_PATH + " --target-table " + MOR_BOOTSTRAPPED_TABLE_NAME
+            + " --props /var/demo/config/dfs-source.properties"
+            + " --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider "
+            + " --initial-checkpoint-provider"
+            + " org.apache.hudi.utilities.checkpointing.InitialCheckpointFromAnotherHoodieTimelineProvider"
+            + " --hoodie-conf hoodie.bootstrap.base.path=" + BOOTSTRAPPED_SRC_PATH
+            + " --hoodie-conf hoodie.deltastreamer.checkpoint.provider.path=" + COW_BASE_PATH
+            + " --hoodie-conf hoodie.bootstrap.parallelism=2 "
+            + " --hoodie-conf hoodie.bootstrap.keygen.class=" + SimpleKeyGenerator.class.getName()
+            + String.format(HIVE_SYNC_CMD_FMT, "dt", MOR_BOOTSTRAPPED_TABLE_NAME));
     executeCommandStringsInDocker(ADHOC_1_CONTAINER, bootstrapCmds);
   }
 
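For context, the demo commands above exercise the full bootstrap flow: InitialCheckpointFromAnotherHoodieTimelineProvider reads the latest commit of the table at hoodie.deltastreamer.checkpoint.provider.path so that ingestion resumes where the source table left off. A minimal sketch of such an invocation, using only flags that appear in the commands above (the jar and path values are placeholders):

    spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer <hudi-utilities-bundle.jar> \
      --run-bootstrap \
      --table-type COPY_ON_WRITE \
      --source-class org.apache.hudi.utilities.sources.JsonDFSSource --source-ordering-field ts \
      --target-base-path <target-base-path> --target-table <target-table> \
      --hoodie-conf hoodie.bootstrap.base.path=<existing-parquet-path> \
      --hoodie-conf hoodie.bootstrap.keygen.class=org.apache.hudi.keygen.SimpleKeyGenerator
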
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/BootstrapExecutor.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/BootstrapExecutor.java
index 828dd6af5f..bf16c14aab 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/BootstrapExecutor.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/BootstrapExecutor.java
@@ -22,9 +22,11 @@ import org.apache.hudi.DataSourceWriteOptions;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieTimelineTimeZone;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieIndexConfig;
@@ -33,6 +35,9 @@ import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.hive.HiveSyncConfig;
 import org.apache.hudi.hive.HiveSyncTool;
 import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.keygen.NonpartitionedKeyGenerator;
+import org.apache.hudi.keygen.SimpleKeyGenerator;
+import org.apache.hudi.util.SparkKeyGenUtils;
 import org.apache.hudi.utilities.UtilHelpers;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 
@@ -48,8 +53,16 @@ import java.io.Serializable;
 import java.util.HashMap;
 
 import static org.apache.hudi.common.table.HoodieTableConfig.ARCHIVELOG_FOLDER;
+import static org.apache.hudi.common.table.HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT;
+import static org.apache.hudi.common.table.HoodieTableConfig.POPULATE_META_FIELDS;
+import static org.apache.hudi.common.table.HoodieTableConfig.TIMELINE_TIMEZONE;
+import static org.apache.hudi.config.HoodieBootstrapConfig.KEYGEN_CLASS_NAME;
+import static org.apache.hudi.config.HoodieWriteConfig.PRECOMBINE_FIELD_NAME;
 import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_BUCKET_SYNC;
 import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_BUCKET_SYNC_SPEC;
+import static org.apache.hudi.keygen.constant.KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE;
+import static org.apache.hudi.keygen.constant.KeyGeneratorOptions.RECORDKEY_FIELD_NAME;
+import static org.apache.hudi.keygen.constant.KeyGeneratorOptions.URL_ENCODE_PARTITIONING;
 import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_BASE_FILE_FORMAT;
 import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_BASE_PATH;
 
@@ -187,16 +200,44 @@ public class BootstrapExecutor implements Serializable {
             + ". Cannot bootstrap data on top of an existing table");
       }
     }
-    HoodieTableMetaClient.withPropertyBuilder()
+
+    HoodieTableMetaClient.PropertyBuilder builder = HoodieTableMetaClient.withPropertyBuilder()
         .fromProperties(props)
         .setTableType(cfg.tableType)
         .setTableName(cfg.targetTableName)
-        .setArchiveLogFolder(ARCHIVELOG_FOLDER.defaultValue())
+        .setRecordKeyFields(props.getString(RECORDKEY_FIELD_NAME.key()))
+        .setPreCombineField(props.getString(
+            PRECOMBINE_FIELD_NAME.key(), PRECOMBINE_FIELD_NAME.defaultValue()))
+        .setPopulateMetaFields(props.getBoolean(
+            POPULATE_META_FIELDS.key(), POPULATE_META_FIELDS.defaultValue()))
+        .setArchiveLogFolder(props.getString(
+            ARCHIVELOG_FOLDER.key(), ARCHIVELOG_FOLDER.defaultValue()))
         .setPayloadClassName(cfg.payloadClassName)
         .setBaseFileFormat(cfg.baseFileFormat)
         .setBootstrapIndexClass(cfg.bootstrapIndexClass)
         .setBootstrapBasePath(bootstrapBasePath)
-        .initTable(new Configuration(jssc.hadoopConfiguration()), cfg.targetBasePath);
+        .setHiveStylePartitioningEnable(props.getBoolean(
+            HIVE_STYLE_PARTITIONING_ENABLE.key(),
+            Boolean.parseBoolean(HIVE_STYLE_PARTITIONING_ENABLE.defaultValue())
+        ))
+        .setUrlEncodePartitioning(props.getBoolean(
+            URL_ENCODE_PARTITIONING.key(),
+            Boolean.parseBoolean(URL_ENCODE_PARTITIONING.defaultValue())))
+        .setCommitTimezone(HoodieTimelineTimeZone.valueOf(
+            props.getString(
+                TIMELINE_TIMEZONE.key(),
+                String.valueOf(TIMELINE_TIMEZONE.defaultValue()))))
+        .setPartitionMetafileUseBaseFormat(props.getBoolean(
+            PARTITION_METAFILE_USE_BASE_FORMAT.key(),
+            PARTITION_METAFILE_USE_BASE_FORMAT.defaultValue()));
+    String partitionColumns = SparkKeyGenUtils.getPartitionColumns(props);
+    if (!StringUtils.isNullOrEmpty(partitionColumns)) {
+      builder.setPartitionFields(partitionColumns).setKeyGeneratorClassProp(props.getString(KEYGEN_CLASS_NAME.key(), SimpleKeyGenerator.class.getName()));
+    } else {
+      builder.setKeyGeneratorClassProp(props.getString(KEYGEN_CLASS_NAME.key(), NonpartitionedKeyGenerator.class.getName()));
+    }
+
+    builder.initTable(new Configuration(jssc.hadoopConfiguration()), cfg.targetBasePath);
   }
 
   public HoodieWriteConfig getBootstrapConfig() {
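
The net effect of the hunk above is that a bootstrapped table's hoodie.properties now records the same key and partition metadata as a table written by a regular Deltastreamer run, with the default key generator chosen from the resolved partition columns. A rough restatement of the new fallback rule (a paraphrase of the branch above, not new behavior; keyGenClass is a hypothetical local name):

    String partitionColumns = SparkKeyGenUtils.getPartitionColumns(props);
    // Partitioned sources default to SimpleKeyGenerator; sources without a
    // resolvable partition path default to NonpartitionedKeyGenerator.
    // An explicit hoodie.bootstrap.keygen.class setting always wins.
    String keyGenClass = props.getString(
        KEYGEN_CLASS_NAME.key(),
        StringUtils.isNullOrEmpty(partitionColumns)
            ? NonpartitionedKeyGenerator.class.getName()
            : SimpleKeyGenerator.class.getName());
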
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
index f05a36745b..ec163aec33 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
@@ -46,9 +46,9 @@ import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
-import org.apache.hudi.config.HoodieClusteringConfig;
-import org.apache.hudi.config.HoodieCleanConfig;
 import org.apache.hudi.config.HoodieArchivalConfig;
+import org.apache.hudi.config.HoodieCleanConfig;
+import org.apache.hudi.config.HoodieClusteringConfig;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieLockConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -630,11 +630,14 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     Dataset<Row> sourceDf = sqlContext.read()
         .format("org.apache.hudi")
         .load(tableBasePath);
-    sourceDf.write().format("parquet").save(bootstrapSourcePath);
+    // TODO(HUDI-4944): fix the test to use a partition column whose values contain
+    //  slashes (`/`); currently the tests fail due to slash encoding.
+    sourceDf.write().format("parquet").partitionBy("rider").save(bootstrapSourcePath);
 
     String newDatasetBasePath = dfsBasePath + "/test_dataset_bootstrapped";
     cfg.runBootstrap = true;
     cfg.configs.add(String.format("hoodie.bootstrap.base.path=%s", bootstrapSourcePath));
+    cfg.configs.add(String.format("%s=%s", DataSourceWriteOptions.PARTITIONPATH_FIELD().key(), "rider"));
     cfg.configs.add(String.format("hoodie.bootstrap.keygen.class=%s", SimpleKeyGenerator.class.getName()));
     cfg.configs.add("hoodie.bootstrap.parallelism=5");
     cfg.targetBasePath = newDatasetBasePath;
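
Usage note: as the updated test shows, bootstrapping a partitioned parquet source now requires telling Deltastreamer which column holds the partition path; without it, the executor falls back to NonpartitionedKeyGenerator. A hedged minimal property set, mirroring the test above (the base path is a placeholder):

    hoodie.bootstrap.base.path=<bootstrap-source-path>
    hoodie.datasource.write.partitionpath.field=rider
    hoodie.bootstrap.keygen.class=org.apache.hudi.keygen.SimpleKeyGenerator
    hoodie.bootstrap.parallelism=5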