You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by yi...@apache.org on 2022/09/16 01:06:50 UTC
[hudi] branch master updated: [HUDI-4785] Fix partition discovery in bootstrap operation (#6673)
This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 488f58d770 [HUDI-4785] Fix partition discovery in bootstrap operation (#6673)
488f58d770 is described below
commit 488f58d770137057532196065f2f69eea1a15db8
Author: Sagar Sumit <sa...@gmail.com>
AuthorDate: Fri Sep 16 06:36:44 2022 +0530
[HUDI-4785] Fix partition discovery in bootstrap operation (#6673)
Co-authored-by: Y Ethan Guo <et...@gmail.com>
---
.../apache/hudi/config/HoodieBootstrapConfig.java | 20 +++++----
.../SparkBootstrapCommitActionExecutor.java | 47 ++++++++++++++++------
.../SparkBootstrapDeltaCommitActionExecutor.java | 12 ++++--
.../hudi/common/table/TableSchemaResolver.java | 34 ++++++++--------
.../hudi/common/table/TestTableSchemaResolver.java | 12 +++---
.../org/apache/hudi/HoodieBootstrapRelation.scala | 7 ++--
.../SparkFullBootstrapDataProviderBase.java | 4 +-
.../functional/TestDataSourceForBootstrap.scala | 18 +++++++--
8 files changed, 99 insertions(+), 55 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieBootstrapConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieBootstrapConfig.java
index 94bb7830cc..0b9116b01c 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieBootstrapConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieBootstrapConfig.java
@@ -34,6 +34,9 @@ import java.io.FileReader;
import java.io.IOException;
import java.util.Properties;
+import static org.apache.hudi.client.bootstrap.BootstrapMode.FULL_RECORD;
+import static org.apache.hudi.client.bootstrap.BootstrapMode.METADATA_ONLY;
+
/**
* Bootstrap specific configs.
*/
@@ -50,6 +53,15 @@ public class HoodieBootstrapConfig extends HoodieConfig {
.sinceVersion("0.6.0")
.withDocumentation("Base path of the dataset that needs to be bootstrapped as a Hudi table");
+ public static final ConfigProperty<String> PARTITION_SELECTOR_REGEX_MODE = ConfigProperty
+ .key("hoodie.bootstrap.mode.selector.regex.mode")
+ .defaultValue(METADATA_ONLY.name())
+ .sinceVersion("0.6.0")
+ .withValidValues(METADATA_ONLY.name(), FULL_RECORD.name())
+ .withDocumentation("Bootstrap mode to apply for partition paths, that match regex above. "
+ + "METADATA_ONLY will generate just skeleton base files with keys/footers, avoiding full cost of rewriting the dataset. "
+ + "FULL_RECORD will perform a full copy/rewrite of the data as a Hudi table.");
+
public static final ConfigProperty<String> MODE_SELECTOR_CLASS_NAME = ConfigProperty
.key("hoodie.bootstrap.mode.selector")
.defaultValue(MetadataOnlyBootstrapModeSelector.class.getCanonicalName())
@@ -92,14 +104,6 @@ public class HoodieBootstrapConfig extends HoodieConfig {
.sinceVersion("0.6.0")
.withDocumentation("Matches each bootstrap dataset partition against this regex and applies the mode below to it.");
- public static final ConfigProperty<String> PARTITION_SELECTOR_REGEX_MODE = ConfigProperty
- .key("hoodie.bootstrap.mode.selector.regex.mode")
- .defaultValue(BootstrapMode.METADATA_ONLY.name())
- .sinceVersion("0.6.0")
- .withDocumentation("Bootstrap mode to apply for partition paths, that match regex above. "
- + "METADATA_ONLY will generate just skeleton base files with keys/footers, avoiding full cost of rewriting the dataset. "
- + "FULL_RECORD will perform a full copy/rewrite of the data as a Hudi table.");
-
public static final ConfigProperty<String> INDEX_CLASS_NAME = ConfigProperty
.key("hoodie.bootstrap.index.class")
.defaultValue(HFileBootstrapIndex.class.getName())
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java
index a2ee384940..88f6a54e0d 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java
@@ -26,6 +26,7 @@ import org.apache.hudi.client.bootstrap.FullRecordBootstrapDataProvider;
import org.apache.hudi.client.bootstrap.HoodieBootstrapSchemaProvider;
import org.apache.hudi.client.bootstrap.HoodieSparkBootstrapSchemaProvider;
import org.apache.hudi.client.bootstrap.selector.BootstrapModeSelector;
+import org.apache.hudi.client.bootstrap.selector.FullRecordBootstrapModeSelector;
import org.apache.hudi.client.bootstrap.translator.BootstrapPartitionPathTranslator;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.client.utils.SparkValidatorUtils;
@@ -47,7 +48,6 @@ import org.apache.hudi.common.table.timeline.HoodieInstant.State;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
-import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.data.HoodieJavaRDD;
@@ -74,11 +74,15 @@ import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.util.Collection;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
+import static org.apache.hudi.client.bootstrap.BootstrapMode.FULL_RECORD;
+import static org.apache.hudi.client.bootstrap.BootstrapMode.METADATA_ONLY;
+import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
import static org.apache.hudi.config.HoodieWriteConfig.WRITE_STATUS_STORAGE_LEVEL_VALUE;
import static org.apache.hudi.table.action.bootstrap.MetadataBootstrapHandlerFactory.getMetadataHandler;
@@ -93,19 +97,29 @@ public class SparkBootstrapCommitActionExecutor<T extends HoodieRecordPayload<T>
HoodieWriteConfig config,
HoodieTable table,
Option<Map<String, String>> extraMetadata) {
- super(context, new HoodieWriteConfig.Builder().withProps(config.getProps())
- .withAutoCommit(true).withWriteStatusClass(BootstrapWriteStatus.class)
- .withBulkInsertParallelism(config.getBootstrapParallelism())
- .build(), table, HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS, WriteOperationType.BOOTSTRAP,
+ super(
+ context,
+ new HoodieWriteConfig.Builder()
+ .withProps(config.getProps())
+ .withAutoCommit(true)
+ .withWriteStatusClass(BootstrapWriteStatus.class)
+ .withBulkInsertParallelism(config.getBootstrapParallelism()).build(),
+ table,
+ HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS,
+ WriteOperationType.BOOTSTRAP,
extraMetadata);
bootstrapSourceFileSystem = FSUtils.getFs(config.getBootstrapSourceBasePath(), hadoopConf);
}
private void validate() {
- ValidationUtils.checkArgument(config.getBootstrapSourceBasePath() != null,
+ checkArgument(config.getBootstrapSourceBasePath() != null,
"Ensure Bootstrap Source Path is set");
- ValidationUtils.checkArgument(config.getBootstrapModeSelectorClass() != null,
+ checkArgument(config.getBootstrapModeSelectorClass() != null,
"Ensure Bootstrap Partition Selector is set");
+ if (METADATA_ONLY.name().equals(config.getBootstrapModeSelectorRegex())) {
+ checkArgument(!config.getBootstrapModeSelectorClass().equals(FullRecordBootstrapModeSelector.class.getCanonicalName()),
+ "FullRecordBootstrapModeSelector cannot be used with METADATA_ONLY bootstrap mode");
+ }
}
@Override
@@ -115,15 +129,15 @@ public class SparkBootstrapCommitActionExecutor<T extends HoodieRecordPayload<T>
HoodieTableMetaClient metaClient = table.getMetaClient();
Option<HoodieInstant> completedInstant =
metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant();
- ValidationUtils.checkArgument(!completedInstant.isPresent(),
+ checkArgument(!completedInstant.isPresent(),
"Active Timeline is expected to be empty for bootstrap to be performed. "
+ "If you want to re-bootstrap, please rollback bootstrap first !!");
Map<BootstrapMode, List<Pair<String, List<HoodieFileStatus>>>> partitionSelections = listAndProcessSourcePartitions();
// First run metadata bootstrap which will auto commit
- Option<HoodieWriteMetadata<HoodieData<WriteStatus>>> metadataResult = metadataBootstrap(partitionSelections.get(BootstrapMode.METADATA_ONLY));
+ Option<HoodieWriteMetadata<HoodieData<WriteStatus>>> metadataResult = metadataBootstrap(partitionSelections.get(METADATA_ONLY));
// if there are full bootstrap to be performed, perform that too
- Option<HoodieWriteMetadata<HoodieData<WriteStatus>>> fullBootstrapResult = fullBootstrap(partitionSelections.get(BootstrapMode.FULL_RECORD));
+ Option<HoodieWriteMetadata<HoodieData<WriteStatus>>> fullBootstrapResult = fullBootstrap(partitionSelections.get(FULL_RECORD));
return new HoodieBootstrapWriteMetadata(metadataResult, fullBootstrapResult);
} catch (IOException ioe) {
@@ -307,12 +321,21 @@ public class SparkBootstrapCommitActionExecutor<T extends HoodieRecordPayload<T>
BootstrapModeSelector selector =
(BootstrapModeSelector) ReflectionUtils.loadClass(config.getBootstrapModeSelectorClass(), config);
- Map<BootstrapMode, List<String>> result = selector.select(folders);
+ Map<BootstrapMode, List<String>> result = new HashMap<>();
+ // for FULL_RECORD mode, original record along with metadata fields are needed
+ if (FULL_RECORD.equals(config.getBootstrapModeForRegexMatch())) {
+ if (!(selector instanceof FullRecordBootstrapModeSelector)) {
+ FullRecordBootstrapModeSelector fullRecordBootstrapModeSelector = new FullRecordBootstrapModeSelector(config);
+ result.putAll(fullRecordBootstrapModeSelector.select(folders));
+ }
+ } else {
+ result = selector.select(folders);
+ }
Map<String, List<HoodieFileStatus>> partitionToFiles = folders.stream().collect(
Collectors.toMap(Pair::getKey, Pair::getValue));
// Ensure all partitions are accounted for
- ValidationUtils.checkArgument(partitionToFiles.keySet().equals(
+ checkArgument(partitionToFiles.keySet().equals(
result.values().stream().flatMap(Collection::stream).collect(Collectors.toSet())));
return result.entrySet().stream().map(e -> Pair.of(e.getKey(), e.getValue().stream()
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapDeltaCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapDeltaCommitActionExecutor.java
index d712ca430b..0d2ac6ceef 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapDeltaCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapDeltaCommitActionExecutor.java
@@ -42,8 +42,14 @@ public class SparkBootstrapDeltaCommitActionExecutor<T extends HoodieRecordPaylo
@Override
protected BaseSparkCommitActionExecutor<T> getBulkInsertActionExecutor(HoodieData<HoodieRecord> inputRecordsRDD) {
- return new SparkBulkInsertDeltaCommitActionExecutor((HoodieSparkEngineContext) context, new HoodieWriteConfig.Builder().withProps(config.getProps())
- .withSchema(bootstrapSchema).build(), table, HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS,
- inputRecordsRDD, extraMetadata);
+ return new SparkBulkInsertDeltaCommitActionExecutor(
+ (HoodieSparkEngineContext) context,
+ new HoodieWriteConfig.Builder()
+ .withProps(config.getProps())
+ .withSchema(bootstrapSchema).build(),
+ table,
+ HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS,
+ inputRecordsRDD,
+ extraMetadata);
}
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
index f825fd6b99..657ac57c63 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
@@ -18,17 +18,6 @@
package org.apache.hudi.common.table;
-import org.apache.avro.JsonProperties;
-import org.apache.avro.Schema;
-import org.apache.avro.Schema.Field;
-import org.apache.avro.SchemaCompatibility;
-import org.apache.avro.generic.IndexedRecord;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.io.hfile.CacheConfig;
-
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileFormat;
@@ -56,9 +45,17 @@ import org.apache.hudi.io.storage.HoodieHFileReader;
import org.apache.hudi.io.storage.HoodieOrcReader;
import org.apache.hudi.util.Lazy;
+import org.apache.avro.JsonProperties;
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.SchemaCompatibility;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
-
import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.hadoop.ParquetFileReader;
@@ -66,6 +63,7 @@ import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.schema.MessageType;
import javax.annotation.concurrent.ThreadSafe;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -208,7 +206,7 @@ public class TableSchemaResolver {
// TODO partition columns have to be appended in all read-paths
if (metaClient.getTableConfig().shouldDropPartitionColumns()) {
return metaClient.getTableConfig().getPartitionFields()
- .map(partitionFields -> appendPartitionColumns(schema, partitionFields))
+ .map(partitionFields -> appendPartitionColumns(schema, Option.ofNullable(partitionFields)))
.orElse(schema);
}
@@ -650,18 +648,18 @@ public class TableSchemaResolver {
}
}
- static Schema appendPartitionColumns(Schema dataSchema, String[] partitionFields) {
+ public static Schema appendPartitionColumns(Schema dataSchema, Option<String[]> partitionFields) {
// In cases when {@link DROP_PARTITION_COLUMNS} config is set true, partition columns
// won't be persisted w/in the data files, and therefore we need to append such columns
// when schema is parsed from data files
//
// Here we append partition columns with {@code StringType} as the data type
- if (partitionFields.length == 0) {
+ if (!partitionFields.isPresent() || partitionFields.get().length == 0) {
return dataSchema;
}
- boolean hasPartitionColNotInSchema = Arrays.stream(partitionFields).anyMatch(pf -> !containsFieldInSchema(dataSchema, pf));
- boolean hasPartitionColInSchema = Arrays.stream(partitionFields).anyMatch(pf -> containsFieldInSchema(dataSchema, pf));
+ boolean hasPartitionColNotInSchema = Arrays.stream(partitionFields.get()).anyMatch(pf -> !containsFieldInSchema(dataSchema, pf));
+ boolean hasPartitionColInSchema = Arrays.stream(partitionFields.get()).anyMatch(pf -> containsFieldInSchema(dataSchema, pf));
if (hasPartitionColNotInSchema && hasPartitionColInSchema) {
throw new HoodieIncompatibleSchemaException("Partition columns could not be partially contained w/in the data schema");
}
@@ -670,7 +668,7 @@ public class TableSchemaResolver {
// when hasPartitionColNotInSchema is true and hasPartitionColInSchema is false, all partition columns
// are not in originSchema. So we create and add them.
List<Field> newFields = new ArrayList<>();
- for (String partitionField: partitionFields) {
+ for (String partitionField: partitionFields.get()) {
newFields.add(new Schema.Field(
partitionField, createNullableSchema(Schema.Type.STRING), "", JsonProperties.NULL_VALUE));
}
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java
index 51d5c5212f..5d949431e4 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java
@@ -18,10 +18,12 @@
package org.apache.hudi.common.table;
-import org.apache.avro.Schema;
import org.apache.hudi.avro.AvroSchemaUtils;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieIncompatibleSchemaException;
+
+import org.apache.avro.Schema;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -36,17 +38,17 @@ public class TestTableSchemaResolver {
// case2
String[] pts1 = new String[0];
- Schema s2 = TableSchemaResolver.appendPartitionColumns(originSchema, pts1);
+ Schema s2 = TableSchemaResolver.appendPartitionColumns(originSchema, Option.of(pts1));
assertEquals(originSchema, s2);
// case3: partition_path is in originSchema
String[] pts2 = {"partition_path"};
- Schema s3 = TableSchemaResolver.appendPartitionColumns(originSchema, pts2);
+ Schema s3 = TableSchemaResolver.appendPartitionColumns(originSchema, Option.of(pts2));
assertEquals(originSchema, s3);
// case4: user_partition is not in originSchema
String[] pts3 = {"user_partition"};
- Schema s4 = TableSchemaResolver.appendPartitionColumns(originSchema, pts3);
+ Schema s4 = TableSchemaResolver.appendPartitionColumns(originSchema, Option.of(pts3));
assertNotEquals(originSchema, s4);
assertTrue(s4.getFields().stream().anyMatch(f -> f.name().equals("user_partition")));
Schema.Field f = s4.getField("user_partition");
@@ -55,7 +57,7 @@ public class TestTableSchemaResolver {
// case5: user_partition is not in originSchema, but partition_path is in originSchema
String[] pts4 = {"user_partition", "partition_path"};
try {
- TableSchemaResolver.appendPartitionColumns(originSchema, pts3);
+ TableSchemaResolver.appendPartitionColumns(originSchema, Option.of(pts3));
} catch (HoodieIncompatibleSchemaException e) {
assertTrue(e.getMessage().contains("Partial partition fields are still in the schema"));
}
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala
index 00de2f756e..4ec7f65913 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala
@@ -20,18 +20,17 @@ package org.apache.hudi
import org.apache.hadoop.fs.Path
import org.apache.hudi.common.model.HoodieBaseFile
-import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.table.view.HoodieTableFileSystemView
+import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.exception.HoodieException
import org.apache.spark.execution.datasources.HoodieInMemoryFileIndex
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.{FileStatusCache, PartitionedFile}
-import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
-import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan}
import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.{Row, SQLContext}
import scala.collection.JavaConverters._
@@ -147,7 +146,7 @@ class HoodieBootstrapRelation(@transient val _sqlContext: SQLContext,
if (fullSchema == null) {
logInfo("Inferring schema..")
val schemaResolver = new TableSchemaResolver(metaClient)
- val tableSchema = schemaResolver.getTableAvroSchemaWithoutMetadataFields
+ val tableSchema = TableSchemaResolver.appendPartitionColumns(schemaResolver.getTableAvroSchemaWithoutMetadataFields, metaClient.getTableConfig.getPartitionFields)
dataSchema = AvroConversionUtils.convertAvroSchemaToStructType(tableSchema)
fullSchema = StructType(skeletonSchema.fields ++ dataSchema.fields)
}
diff --git a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkFullBootstrapDataProviderBase.java b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkFullBootstrapDataProviderBase.java
index a0204b256e..bc732a1401 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkFullBootstrapDataProviderBase.java
+++ b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkFullBootstrapDataProviderBase.java
@@ -60,7 +60,9 @@ public abstract class SparkFullBootstrapDataProviderBase extends FullRecordBoots
.flatMap(f -> f.stream().map(fs -> FileStatusUtils.toPath(fs.getPath()).toString()))
.toArray(String[]::new);
- Dataset inputDataset = sparkSession.read().format(getFormat()).load(filePaths);
+ // NOTE: "basePath" option is required for spark to discover the partition column
+ // More details at https://spark.apache.org/docs/latest/sql-data-sources-parquet.html#partition-discovery
+ Dataset inputDataset = sparkSession.read().format(getFormat()).option("basePath", sourceBasePath).load(filePaths);
try {
KeyGenerator keyGenerator = HoodieSparkKeyGeneratorFactory.createKeyGenerator(props);
String structName = tableName + "_record";
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
index 23b21b315f..a2b9d29009 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
@@ -31,6 +31,8 @@ import org.apache.spark.sql.{SaveMode, SparkSession}
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.io.TempDir
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
import java.time.Instant
import java.util.Collections
@@ -148,8 +150,9 @@ class TestDataSourceForBootstrap {
verifyIncrementalViewResult(commitInstantTime1, commitInstantTime2, isPartitioned = false, isHiveStylePartitioned = false)
}
- @Test
- def testMetadataBootstrapCOWHiveStylePartitioned(): Unit = {
+ @ParameterizedTest
+ @ValueSource(strings = Array("METADATA_ONLY", "FULL_RECORD"))
+ def testMetadataBootstrapCOWHiveStylePartitioned(bootstrapMode: String): Unit = {
val timestamp = Instant.now.toEpochMilli
val jsc = JavaSparkContext.fromSparkContext(spark.sparkContext)
@@ -166,7 +169,10 @@ class TestDataSourceForBootstrap {
// Perform bootstrap
val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit(
DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
- commonOpts.updated(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "datestr") ++ Map(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING.key -> "true"),
+ commonOpts.updated(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "datestr") ++
+ Map(
+ DataSourceWriteOptions.HIVE_STYLE_PARTITIONING.key -> "true",
+ HoodieBootstrapConfig.PARTITION_SELECTOR_REGEX_MODE.key -> bootstrapMode),
classOf[SimpleKeyGenerator].getName)
// check marked directory clean up
@@ -520,7 +526,11 @@ class TestDataSourceForBootstrap {
.save(basePath)
val commitInstantTime1: String = HoodieDataSourceHelpers.latestCommit(fs, basePath)
- assertEquals(HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS, commitInstantTime1)
+ val expectedBootstrapInstant =
+ if ("FULL_RECORD".equals(extraOpts.getOrElse(HoodieBootstrapConfig.PARTITION_SELECTOR_REGEX_MODE.key, HoodieBootstrapConfig.PARTITION_SELECTOR_REGEX_MODE.defaultValue)))
+ HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS
+ else HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS
+ assertEquals(expectedBootstrapInstant, commitInstantTime1)
commitInstantTime1
}