Posted to commits@hudi.apache.org by xu...@apache.org on 2022/04/21 10:30:53 UTC
[hudi] 02/13: [HUDI-3935] Adding config to fallback to enabled Partition Values extraction from Partition path (#5377)
This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch rc3-patched-for-test
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 6fccca6c04724a304c117b77a8cb42b0e433a719
Author: Alexey Kudinkin <al...@infinilake.com>
AuthorDate: Thu Apr 21 01:36:19 2022 -0700
[HUDI-3935] Adding config to fallback to enabled Partition Values extraction from Partition path (#5377)
---
.../apache/hudi/client/BaseHoodieWriteClient.java | 2 +-
.../hudi/common/table/HoodieTableConfig.java | 2 +-
.../hudi/common/table/HoodieTableMetaClient.java | 26 +++++--------
.../internal/DataSourceInternalWriterHelper.java | 2 +-
.../org/apache/hudi/BaseFileOnlyRelation.scala | 22 +++++------
.../scala/org/apache/hudi/DataSourceOptions.scala | 40 +++++++++++++++++---
.../scala/org/apache/hudi/HoodieBaseRelation.scala | 44 +++++++++++++++++-----
.../org/apache/hudi/HoodieDataSourceHelper.scala | 1 -
.../org/apache/hudi/HoodieSparkSqlWriter.scala | 2 +-
.../parquet/HoodieParquetFileFormat.scala | 10 +++--
.../apache/hudi/functional/TestCOWDataSource.scala | 39 +++++++++++++++----
.../hudi/utilities/deltastreamer/DeltaSync.java | 4 +-
12 files changed, 133 insertions(+), 61 deletions(-)
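For context, the fallback introduced by this commit is driven by a new read
option, "hoodie.datasource.read.extract.partition.values.from.path" (defined
in DataSourceOptions.scala below). A minimal sketch of a reader opting into
it, assuming an existing Hudi table at basePath and an active Spark session:

    // Opt into deriving partition values from the physical partition path
    // (the pre-existing Spark behavior) instead of reading the persisted
    // partition columns from the data files
    val df = spark.read.format("hudi")
      .option("hoodie.datasource.read.extract.partition.values.from.path", "true")
      .load(basePath)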
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index c1ebef7bb6..4b747d3a77 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -1465,7 +1465,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
}
// Validate table properties
- metaClient.validateTableProperties(config.getProps(), operationType);
+ metaClient.validateTableProperties(config.getProps());
// Make sure that FS View is in sync
table.getHoodieView().sync();
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
index bbc508bd5f..edc6caa5bc 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
@@ -193,7 +193,7 @@ public class HoodieTableConfig extends HoodieConfig {
public static final ConfigProperty<Boolean> PARTITION_METAFILE_USE_BASE_FORMAT = ConfigProperty
.key("hoodie.partition.metafile.use.base.format")
.defaultValue(false)
- .withDocumentation("If true, partition metafiles are saved in the same format as basefiles for this dataset (e.g. Parquet / ORC). "
+ .withDocumentation("If true, partition metafiles are saved in the same format as base-files for this dataset (e.g. Parquet / ORC). "
+ "If false (default) partition metafiles are saved as properties files.");
public static final ConfigProperty<Boolean> DROP_PARTITION_COLUMNS = ConfigProperty
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
index a65be689a4..251a990d87 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
@@ -30,7 +30,6 @@ import org.apache.hudi.common.fs.NoOpConsistencyGuard;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieTimelineTimeZone;
-import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -377,16 +376,15 @@ public class HoodieTableMetaClient implements Serializable {
/**
* Validate table properties.
* @param properties Properties from writeConfig.
- * @param operationType operation type to be executed.
*/
- public void validateTableProperties(Properties properties, WriteOperationType operationType) {
- // once meta fields are disabled, it cant be re-enabled for a given table.
+ public void validateTableProperties(Properties properties) {
+ // Once meta fields are disabled, it can't be re-enabled for a given table.
if (!getTableConfig().populateMetaFields()
&& Boolean.parseBoolean((String) properties.getOrDefault(HoodieTableConfig.POPULATE_META_FIELDS.key(), HoodieTableConfig.POPULATE_META_FIELDS.defaultValue()))) {
throw new HoodieException(HoodieTableConfig.POPULATE_META_FIELDS.key() + " already disabled for the table. Can't be re-enabled back");
}
- // meta fields can be disabled only with SimpleKeyGenerator
+ // Meta fields can be disabled only when {@code SimpleKeyGenerator} is used
if (!getTableConfig().populateMetaFields()
&& !properties.getProperty(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key(), "org.apache.hudi.keygen.SimpleKeyGenerator")
.equals("org.apache.hudi.keygen.SimpleKeyGenerator")) {
@@ -698,7 +696,7 @@ public class HoodieTableMetaClient implements Serializable {
private Boolean urlEncodePartitioning;
private HoodieTimelineTimeZone commitTimeZone;
private Boolean partitionMetafileUseBaseFormat;
- private Boolean dropPartitionColumnsWhenWrite;
+ private Boolean shouldDropPartitionColumns;
private String metadataPartitions;
private String inflightMetadataPartitions;
@@ -820,8 +818,8 @@ public class HoodieTableMetaClient implements Serializable {
return this;
}
- public PropertyBuilder setDropPartitionColumnsWhenWrite(Boolean dropPartitionColumnsWhenWrite) {
- this.dropPartitionColumnsWhenWrite = dropPartitionColumnsWhenWrite;
+ public PropertyBuilder setShouldDropPartitionColumns(Boolean shouldDropPartitionColumns) {
+ this.shouldDropPartitionColumns = shouldDropPartitionColumns;
return this;
}
@@ -933,15 +931,12 @@ public class HoodieTableMetaClient implements Serializable {
if (hoodieConfig.contains(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT)) {
setPartitionMetafileUseBaseFormat(hoodieConfig.getBoolean(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT));
}
-
if (hoodieConfig.contains(HoodieTableConfig.DROP_PARTITION_COLUMNS)) {
- setDropPartitionColumnsWhenWrite(hoodieConfig.getBoolean(HoodieTableConfig.DROP_PARTITION_COLUMNS));
+ setShouldDropPartitionColumns(hoodieConfig.getBoolean(HoodieTableConfig.DROP_PARTITION_COLUMNS));
}
-
if (hoodieConfig.contains(HoodieTableConfig.TABLE_METADATA_PARTITIONS)) {
setMetadataPartitions(hoodieConfig.getString(HoodieTableConfig.TABLE_METADATA_PARTITIONS));
}
-
if (hoodieConfig.contains(HoodieTableConfig.TABLE_METADATA_PARTITIONS_INFLIGHT)) {
setInflightMetadataPartitions(hoodieConfig.getString(HoodieTableConfig.TABLE_METADATA_PARTITIONS_INFLIGHT));
}
@@ -1026,15 +1021,12 @@ public class HoodieTableMetaClient implements Serializable {
if (null != partitionMetafileUseBaseFormat) {
tableConfig.setValue(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT, partitionMetafileUseBaseFormat.toString());
}
-
- if (null != dropPartitionColumnsWhenWrite) {
- tableConfig.setValue(HoodieTableConfig.DROP_PARTITION_COLUMNS, Boolean.toString(dropPartitionColumnsWhenWrite));
+ if (null != shouldDropPartitionColumns) {
+ tableConfig.setValue(HoodieTableConfig.DROP_PARTITION_COLUMNS, Boolean.toString(shouldDropPartitionColumns));
}
-
if (null != metadataPartitions) {
tableConfig.setValue(HoodieTableConfig.TABLE_METADATA_PARTITIONS, metadataPartitions);
}
-
if (null != inflightMetadataPartitions) {
tableConfig.setValue(HoodieTableConfig.TABLE_METADATA_PARTITIONS_INFLIGHT, inflightMetadataPartitions);
}
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/DataSourceInternalWriterHelper.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/DataSourceInternalWriterHelper.java
index 7ca62d1520..5aa82642de 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/DataSourceInternalWriterHelper.java
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/DataSourceInternalWriterHelper.java
@@ -66,7 +66,7 @@ public class DataSourceInternalWriterHelper {
writeClient.startCommitWithTime(instantTime);
this.metaClient = HoodieTableMetaClient.builder().setConf(configuration).setBasePath(writeConfig.getBasePath()).build();
- this.metaClient.validateTableProperties(writeConfig.getProps(), WriteOperationType.BULK_INSERT);
+ this.metaClient.validateTableProperties(writeConfig.getProps());
this.hoodieTable = HoodieSparkTable.create(writeConfig, new HoodieSparkEngineContext(new JavaSparkContext(sparkSession.sparkContext())), metaClient);
}
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala
index 5414a228c7..3c667d2b42 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala
@@ -114,21 +114,21 @@ class BaseFileOnlyRelation(sqlContext: SQLContext,
* rule; you can find more details in HUDI-3896)
*/
def toHadoopFsRelation: HadoopFsRelation = {
- // We're delegating to Spark to append partition values to every row only in cases
- // when these corresponding partition-values are not persisted w/in the data file itself
- val shouldAppendPartitionColumns = shouldOmitPartitionColumns
-
- val (tableFileFormat, formatClassName) = metaClient.getTableConfig.getBaseFileFormat match {
- case HoodieFileFormat.PARQUET =>
- (sparkAdapter.createHoodieParquetFileFormat(shouldAppendPartitionColumns).get, HoodieParquetFileFormat.FILE_FORMAT_ID)
- case HoodieFileFormat.ORC => (new OrcFileFormat, "orc")
- }
+ val (tableFileFormat, formatClassName) =
+ metaClient.getTableConfig.getBaseFileFormat match {
+ case HoodieFileFormat.ORC => (new OrcFileFormat, "orc")
+ case HoodieFileFormat.PARQUET =>
+ // We're delegating to Spark to append partition values to every row only in cases
+ // when these corresponding partition-values are not persisted w/in the data file itself
+ val parquetFileFormat = sparkAdapter.createHoodieParquetFileFormat(shouldExtractPartitionValuesFromPartitionPath).get
+ (parquetFileFormat, HoodieParquetFileFormat.FILE_FORMAT_ID)
+ }
if (globPaths.isEmpty) {
// NOTE: There are currently 2 ways partition values could be fetched:
// - Source columns (producing the values used for physical partitioning) will be read
// from the data file
- // - Values parsed from the actual partition pat would be appended to the final dataset
+ // - Values parsed from the actual partition path would be appended to the final dataset
//
// In the former case, we don't need to provide the partition-schema to the relation,
// therefore we simply stub it w/ empty schema and use full table-schema as the one being
@@ -136,7 +136,7 @@ class BaseFileOnlyRelation(sqlContext: SQLContext,
//
// In the latter, we have to specify proper partition schema as well as "data"-schema, essentially
// being a table-schema with all partition columns stripped out
- val (partitionSchema, dataSchema) = if (shouldAppendPartitionColumns) {
+ val (partitionSchema, dataSchema) = if (shouldExtractPartitionValuesFromPartitionPath) {
(fileIndex.partitionSchema, fileIndex.dataSchema)
} else {
(StructType(Nil), tableStructSchema)
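To illustrate the schema handling described in the comments above: when
partition values are extracted from the path, the table schema is split into
a proper partition schema and a data schema; otherwise the partition schema
is stubbed with an empty schema and the full table schema is used as the
data schema. A sketch under assumed field names (id, value and the partition
column data_date are hypothetical):

    import org.apache.spark.sql.types._

    val tableStructSchema = StructType(Seq(
      StructField("id", StringType),
      StructField("value", StringType),
      StructField("data_date", StringType))) // partition column

    // Partition values extracted from the partition path: proper split
    val partitionSchema = StructType(Seq(StructField("data_date", StringType)))
    val dataSchema = StructType(
      tableStructSchema.filterNot(f => partitionSchema.fieldNames.contains(f.name)))

    // Partition values read from the data file: stub the partition schema
    val stubbedPartitionSchema = StructType(Nil)
    val fullDataSchema = tableStructSchema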
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index 30bbaeceb1..0d4c7cf184 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -18,14 +18,16 @@
package org.apache.hudi
import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, QUERY_TYPE_SNAPSHOT_OPT_VAL}
+import org.apache.hudi.HoodieConversionUtils.toScalaOption
import org.apache.hudi.common.config.{ConfigProperty, HoodieConfig}
import org.apache.hudi.common.fs.ConsistencyGuardConfig
import org.apache.hudi.common.model.{HoodieTableType, WriteOperationType}
import org.apache.hudi.common.table.HoodieTableConfig
import org.apache.hudi.common.util.Option
+import org.apache.hudi.common.util.ValidationUtils.checkState
import org.apache.hudi.config.{HoodieClusteringConfig, HoodieWriteConfig}
import org.apache.hudi.hive.util.ConfigUtils
-import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncTool, MultiPartKeysValueExtractor, NonPartitionedExtractor, SlashEncodedDayPartitionValueExtractor}
+import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncTool}
import org.apache.hudi.keygen.constant.KeyGeneratorOptions
import org.apache.hudi.keygen.{ComplexKeyGenerator, CustomKeyGenerator, NonpartitionedKeyGenerator, SimpleKeyGenerator}
import org.apache.hudi.sync.common.HoodieSyncConfig
@@ -45,6 +47,7 @@ import scala.language.implicitConversions
* Options supported for reading hoodie tables.
*/
object DataSourceReadOptions {
+ import DataSourceOptionsHelper._
val QUERY_TYPE_SNAPSHOT_OPT_VAL = "snapshot"
val QUERY_TYPE_READ_OPTIMIZED_OPT_VAL = "read_optimized"
@@ -124,6 +127,15 @@ object DataSourceReadOptions {
.withDocumentation("Enables data-skipping allowing queries to leverage indexes to reduce the search space by " +
"skipping over files")
+ val EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH: ConfigProperty[Boolean] =
+ ConfigProperty.key("hoodie.datasource.read.extract.partition.values.from.path")
+ .defaultValue(false)
+ .sinceVersion("0.11.0")
+ .withDocumentation("When set to true, values for partition columns (partition values) will be extracted" +
+ " from physical partition path (default Spark behavior). When set to false partition values will be" +
+ " read from the data file (in Hudi partition columns are persisted by default)." +
+ " This config is a fallback allowing to preserve existing behavior, and should not be used otherwise.")
+
val INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES: ConfigProperty[String] = ConfigProperty
.key("hoodie.datasource.read.incr.fallback.fulltablescan.enable")
.defaultValue("false")
@@ -185,6 +197,8 @@ object DataSourceReadOptions {
*/
object DataSourceWriteOptions {
+ import DataSourceOptionsHelper._
+
val BULK_INSERT_OPERATION_OPT_VAL = WriteOperationType.BULK_INSERT.value
val INSERT_OPERATION_OPT_VAL = WriteOperationType.INSERT.value
val UPSERT_OPERATION_OPT_VAL = WriteOperationType.UPSERT.value
@@ -471,10 +485,7 @@ object DataSourceWriteOptions {
.sinceVersion("0.9.0")
.withDocumentation("This class is used by kafka client to deserialize the records")
- val DROP_PARTITION_COLUMNS: ConfigProperty[Boolean] = ConfigProperty
- .key(HoodieTableConfig.DROP_PARTITION_COLUMNS.key())
- .defaultValue(HoodieTableConfig.DROP_PARTITION_COLUMNS.defaultValue().booleanValue())
- .withDocumentation(HoodieTableConfig.DROP_PARTITION_COLUMNS.doc())
+ val DROP_PARTITION_COLUMNS: ConfigProperty[Boolean] = HoodieTableConfig.DROP_PARTITION_COLUMNS
/** @deprecated Use {@link HIVE_ASSUME_DATE_PARTITION} and its methods instead */
@Deprecated
@@ -774,4 +785,23 @@ object DataSourceOptionsHelper {
override def apply (input: From): To = function (input)
}
}
+
+ implicit def convert[T, U](prop: ConfigProperty[T])(implicit converter: T => U): ConfigProperty[U] = {
+ checkState(prop.hasDefaultValue)
+ var newProp: ConfigProperty[U] = ConfigProperty.key(prop.key())
+ .defaultValue(converter(prop.defaultValue()))
+ .withDocumentation(prop.doc())
+ .withAlternatives(prop.getAlternatives.asScala: _*)
+
+ newProp = toScalaOption(prop.getSinceVersion) match {
+ case Some(version) => newProp.sinceVersion(version)
+ case None => newProp
+ }
+ newProp = toScalaOption(prop.getDeprecatedVersion) match {
+ case Some(version) => newProp.deprecatedAfter(version)
+ case None => newProp
+ }
+
+ newProp
+ }
}
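The implicit convert above is what enables the one-line re-definition of
DROP_PARTITION_COLUMNS earlier in this file: HoodieTableConfig.DROP_PARTITION_COLUMNS
is a ConfigProperty[java.lang.Boolean] on the Java side, and with
scala.Predef.Boolean2boolean in implicit scope it converts into a
ConfigProperty[Boolean], carrying over the key, default value, documentation,
alternatives and version metadata. A sketch of such a call site (mirroring
the DataSourceWriteOptions change above):

    import org.apache.hudi.DataSourceOptionsHelper._
    import org.apache.hudi.common.config.ConfigProperty
    import org.apache.hudi.common.table.HoodieTableConfig

    // The implicit conversion kicks in on assignment:
    // ConfigProperty[java.lang.Boolean] => ConfigProperty[Boolean]
    val dropPartitionColumns: ConfigProperty[Boolean] =
      HoodieTableConfig.DROP_PARTITION_COLUMNS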
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
index 2fd1da5950..f776d08ec9 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
@@ -149,8 +149,36 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
protected val partitionColumns: Array[String] = tableConfig.getPartitionFields.orElse(Array.empty)
- protected val shouldOmitPartitionColumns: Boolean =
- metaClient.getTableConfig.shouldDropPartitionColumns && partitionColumns.nonEmpty
+ /**
+ * Controls whether partition values (ie values of partition columns) should be
+ * <ol>
+ * <li>Extracted from partition path and appended to individual rows read from the data file (we
+ * delegate this to Spark's [[ParquetFileFormat]])</li>
+ * <li>Read from the data-file as is (by default Hudi persists all columns including partition ones)</li>
+ * </ol>
+ *
+ * This flag is only relevant in conjunction with the [["hoodie.datasource.write.drop.partition.columns"]]
+ * config, when Hudi will NOT be persisting partition columns in the data file, and therefore values for
+ * such partition columns (ie "partition values") will have to be parsed from the partition path and appended
+ * to every row of the fetched dataset.
+ *
+ * NOTE: Partition values extracted from the partition path might deviate from the values of the original
+ * partition columns: for example, if the original partition column was [[ts]], bearing an epoch
+ * timestamp, and [[TimestampBasedKeyGenerator]] was used to generate partition paths of the format
+ * [["yyyy/mm/dd"]], the appended partition value would bear that format verbatim as it appears in the
+ * partition path, meaning that the string value "2022/01/01" will be appended, and not its original
+ * representation
+ */
+ protected val shouldExtractPartitionValuesFromPartitionPath: Boolean = {
+ // Controls whether partition columns (which are the source for the partition path values) should
+ // be omitted from persistence in the data files. On the read path, it affects whether partition values (values
+ // of partition columns) will be read from the data file or extracted from the partition path
+ val shouldOmitPartitionColumns = metaClient.getTableConfig.shouldDropPartitionColumns && partitionColumns.nonEmpty
+ val shouldExtractPartitionValueFromPath =
+ optParams.getOrElse(DataSourceReadOptions.EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH.key,
+ DataSourceReadOptions.EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH.defaultValue.toString).toBoolean
+ shouldOmitPartitionColumns || shouldExtractPartitionValueFromPath
+ }
/**
* NOTE: PLEASE READ THIS CAREFULLY
@@ -228,7 +256,6 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
val fileSplits = collectFileSplits(partitionFilters, dataFilters)
-
val tableAvroSchemaStr =
if (internalSchema.isEmptySchema) tableAvroSchema.toString
else AvroInternalSchemaConverter.convert(internalSchema, tableAvroSchema.getName).toString
@@ -367,7 +394,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
protected def getPartitionColumnsAsInternalRow(file: FileStatus): InternalRow = {
try {
val tableConfig = metaClient.getTableConfig
- if (shouldOmitPartitionColumns) {
+ if (shouldExtractPartitionValuesFromPartitionPath) {
val relativePath = new URI(metaClient.getBasePath).relativize(new URI(file.getPath.getParent.toString)).toString
val hiveStylePartitioningEnabled = tableConfig.getHiveStylePartitioningEnable.toBoolean
if (hiveStylePartitioningEnabled) {
@@ -420,9 +447,6 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
hadoopConf = hadoopConf
)
- // We're delegating to Spark to append partition values to every row only in cases
- // when these corresponding partition-values are not persisted w/in the data file itself
- val shouldAppendPartitionColumns = shouldOmitPartitionColumns
val parquetReader = HoodieDataSourceHelper.buildHoodieParquetReader(
sparkSession = spark,
dataSchema = dataSchema.structTypeSchema,
@@ -431,7 +455,9 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
filters = filters,
options = options,
hadoopConf = hadoopConf,
- appendPartitionValues = shouldAppendPartitionColumns
+ // We're delegating to Spark to append partition values to every row only in cases
+ // when these corresponding partition-values are not persisted w/in the data file itself
+ appendPartitionValues = shouldExtractPartitionValuesFromPartitionPath
)
partitionedFile => {
@@ -448,7 +474,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
private def tryPrunePartitionColumns(tableSchema: HoodieTableSchema,
requiredSchema: HoodieTableSchema): (StructType, HoodieTableSchema, HoodieTableSchema) = {
- if (shouldOmitPartitionColumns) {
+ if (shouldExtractPartitionValuesFromPartitionPath) {
val partitionSchema = StructType(partitionColumns.map(StructField(_, StringType)))
val prunedDataStructSchema = prunePartitionColumns(tableSchema.structTypeSchema)
val prunedRequiredSchema = prunePartitionColumns(requiredSchema.structTypeSchema)
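To make the NOTE above concrete: with the fallback enabled, the appended
partition values carry the partition-path format verbatim rather than the
original column representation. A sketch mirroring the test case below
(basePath and the sample values are illustrative):

    import org.apache.hudi.DataSourceReadOptions

    // Default: partition column is read from the data file,
    // in its original representation
    spark.read.format("hudi").load(basePath)
      .select("data_date") // e.g. "2018-09-23"

    // Fallback: partition value is parsed from the partition path, verbatim
    spark.read.format("hudi")
      .option(DataSourceReadOptions.EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH.key, "true")
      .load(basePath)
      .select("data_date") // e.g. "2018/09/23"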
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala
index 1fc9e70a5a..e430364be9 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala
@@ -50,7 +50,6 @@ object HoodieDataSourceHelper extends PredicateHelper with SparkAdapterSupport {
options: Map[String, String],
hadoopConf: Configuration,
appendPartitionValues: Boolean = false): PartitionedFile => Iterator[InternalRow] = {
-
val parquetFileFormat: ParquetFileFormat = sparkAdapter.createHoodieParquetFileFormat(appendPartitionValues).get
val readParquetFile: PartitionedFile => Iterator[Any] = parquetFileFormat.buildReaderWithPartitionValues(
sparkSession = sparkSession,
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 38062aa802..c5204fcd8e 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -160,7 +160,7 @@ object HoodieSparkSqlWriter {
.setHiveStylePartitioningEnable(hoodieConfig.getBoolean(HIVE_STYLE_PARTITIONING))
.setUrlEncodePartitioning(hoodieConfig.getBoolean(URL_ENCODE_PARTITIONING))
.setPartitionMetafileUseBaseFormat(useBaseFormatMetaFile)
- .setDropPartitionColumnsWhenWrite(hoodieConfig.getBooleanOrDefault(HoodieTableConfig.DROP_PARTITION_COLUMNS))
+ .setShouldDropPartitionColumns(hoodieConfig.getBooleanOrDefault(HoodieTableConfig.DROP_PARTITION_COLUMNS))
.setCommitTimezone(HoodieTimelineTimeZone.valueOf(hoodieConfig.getStringOrDefault(HoodieTableConfig.TIMELINE_TIMEZONE)))
.initTable(sparkContext.hadoopConfiguration, path)
tableConfig = tableMetaClient.getTableConfig
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetFileFormat.scala
index dbb62d089e..a52e9335fe 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetFileFormat.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetFileFormat.scala
@@ -19,7 +19,7 @@
package org.apache.spark.sql.execution.datasources.parquet
import org.apache.hadoop.conf.Configuration
-import org.apache.hudi.SparkAdapterSupport
+import org.apache.hudi.{DataSourceReadOptions, SparkAdapterSupport}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.PartitionedFile
@@ -41,14 +41,16 @@ class HoodieParquetFileFormat extends ParquetFileFormat with SparkAdapterSupport
filters: Seq[Filter],
options: Map[String, String],
hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
+ val shouldExtractPartitionValuesFromPartitionPath =
+ options.getOrElse(DataSourceReadOptions.EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH.key,
+ DataSourceReadOptions.EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH.defaultValue.toString).toBoolean
+
sparkAdapter
- .createHoodieParquetFileFormat(appendPartitionValues = false).get
+ .createHoodieParquetFileFormat(shouldExtractPartitionValuesFromPartitionPath).get
.buildReaderWithPartitionValues(sparkSession, dataSchema, partitionSchema, requiredSchema, filters, options, hadoopConf)
}
}
object HoodieParquetFileFormat {
-
val FILE_FORMAT_ID = "hoodie-parquet"
-
}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index 28a6dcdcd6..a088d4e01b 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -749,7 +749,7 @@ class TestCOWDataSource extends HoodieClientTestBase {
@ParameterizedTest
@ValueSource(booleans = Array(true, false))
- def testCopyOnWriteWithDropPartitionColumns(enableDropPartitionColumns: Boolean) {
+ def testCopyOnWriteWithDroppedPartitionColumns(enableDropPartitionColumns: Boolean) {
val records1 = recordsToStrings(dataGen.generateInsertsContainsAllPartitions("000", 100)).toList
val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
inputDF1.write.format("org.apache.hudi")
@@ -900,7 +900,7 @@ class TestCOWDataSource extends HoodieClientTestBase {
@ParameterizedTest
@ValueSource(booleans = Array(true, false))
- def testHoodieBaseFileOnlyViewRelation(useGlobbing: Boolean): Unit = {
+ def testPartitionColumnsProperHandling(useGlobbing: Boolean): Unit = {
val _spark = spark
import _spark.implicits._
@@ -935,18 +935,41 @@ class TestCOWDataSource extends HoodieClientTestBase {
basePath
}
- val res = spark.read.format("hudi").load(path)
+ // Case #1: Partition columns are read from the data file
+ val firstDF = spark.read.format("hudi").load(path)
- assert(res.count() == 2)
+ assert(firstDF.count() == 2)
// data_date is the partition field. Persist to the parquet file using the origin values, and read it.
assertEquals(
- res.select("data_date").map(_.get(0).toString).collect().sorted.toSeq,
- Seq("2018-09-23", "2018-09-24")
+ Seq("2018-09-23", "2018-09-24"),
+ firstDF.select("data_date").map(_.get(0).toString).collect().sorted.toSeq
)
assertEquals(
- res.select("_hoodie_partition_path").map(_.get(0).toString).collect().sorted.toSeq,
- Seq("2018/09/23", "2018/09/24")
+ Seq("2018/09/23", "2018/09/24"),
+ firstDF.select("_hoodie_partition_path").map(_.get(0).toString).collect().sorted.toSeq
)
+
+ // Case #2: Partition columns are extracted from the partition path
+ //
+ // NOTE: This case is only relevant when globbing is NOT used, since when globbing is used Spark
+ // won't be able to infer partitioning properly
+ if (!useGlobbing) {
+ val secondDF = spark.read.format("hudi")
+ .option(DataSourceReadOptions.EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH.key, "true")
+ .load(path)
+
+ assert(secondDF.count() == 2)
+
+ // data_date is the partition field. Persist to the parquet file using the origin values, and read it.
+ assertEquals(
+ Seq("2018/09/23", "2018/09/24"),
+ secondDF.select("data_date").map(_.get(0).toString).collect().sorted.toSeq
+ )
+ assertEquals(
+ Seq("2018/09/23", "2018/09/24"),
+ secondDF.select("_hoodie_partition_path").map(_.get(0).toString).collect().sorted.toSeq
+ )
+ }
}
}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index a9d67a0d72..b086a6c9ed 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -281,7 +281,7 @@ public class DeltaSync implements Serializable {
.setPreCombineField(cfg.sourceOrderingField)
.setPartitionMetafileUseBaseFormat(props.getBoolean(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.key(),
HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.defaultValue()))
- .setDropPartitionColumnsWhenWrite(isDropPartitionColumns())
+ .setShouldDropPartitionColumns(isDropPartitionColumns())
.initTable(new Configuration(jssc.hadoopConfiguration()),
cfg.targetBasePath);
}
@@ -377,7 +377,7 @@ public class DeltaSync implements Serializable {
SimpleKeyGenerator.class.getName()))
.setPartitionMetafileUseBaseFormat(props.getBoolean(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.key(),
HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.defaultValue()))
- .setDropPartitionColumnsWhenWrite(isDropPartitionColumns())
+ .setShouldDropPartitionColumns(isDropPartitionColumns())
.initTable(new Configuration(jssc.hadoopConfiguration()), cfg.targetBasePath);
}