Posted to commits@hudi.apache.org by xu...@apache.org on 2022/04/21 10:30:53 UTC

[hudi] 02/13: [HUDI-3935] Adding config to fall back to enabling Partition Values extraction from Partition path (#5377)

This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch rc3-patched-for-test
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 6fccca6c04724a304c117b77a8cb42b0e433a719
Author: Alexey Kudinkin <al...@infinilake.com>
AuthorDate: Thu Apr 21 01:36:19 2022 -0700

    [HUDI-3935] Adding config to fall back to enabling Partition Values extraction from Partition path (#5377)
---
 .../apache/hudi/client/BaseHoodieWriteClient.java  |  2 +-
 .../hudi/common/table/HoodieTableConfig.java       |  2 +-
 .../hudi/common/table/HoodieTableMetaClient.java   | 26 +++++--------
 .../internal/DataSourceInternalWriterHelper.java   |  2 +-
 .../org/apache/hudi/BaseFileOnlyRelation.scala     | 22 +++++------
 .../scala/org/apache/hudi/DataSourceOptions.scala  | 40 +++++++++++++++++---
 .../scala/org/apache/hudi/HoodieBaseRelation.scala | 44 +++++++++++++++++-----
 .../org/apache/hudi/HoodieDataSourceHelper.scala   |  1 -
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     |  2 +-
 .../parquet/HoodieParquetFileFormat.scala          | 10 +++--
 .../apache/hudi/functional/TestCOWDataSource.scala | 39 +++++++++++++++----
 .../hudi/utilities/deltastreamer/DeltaSync.java    |  4 +-
 12 files changed, 133 insertions(+), 61 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index c1ebef7bb6..4b747d3a77 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -1465,7 +1465,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
     }
 
     // Validate table properties
-    metaClient.validateTableProperties(config.getProps(), operationType);
+    metaClient.validateTableProperties(config.getProps());
     // Make sure that FS View is in sync
     table.getHoodieView().sync();
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
index bbc508bd5f..edc6caa5bc 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
@@ -193,7 +193,7 @@ public class HoodieTableConfig extends HoodieConfig {
   public static final ConfigProperty<Boolean> PARTITION_METAFILE_USE_BASE_FORMAT = ConfigProperty
       .key("hoodie.partition.metafile.use.base.format")
       .defaultValue(false)
-      .withDocumentation("If true, partition metafiles are saved in the same format as basefiles for this dataset (e.g. Parquet / ORC). "
+      .withDocumentation("If true, partition metafiles are saved in the same format as base-files for this dataset (e.g. Parquet / ORC). "
           + "If false (default) partition metafiles are saved as properties files.");
 
   public static final ConfigProperty<Boolean> DROP_PARTITION_COLUMNS = ConfigProperty
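
For context, the DROP_PARTITION_COLUMNS property above ("hoodie.datasource.write.drop.partition.columns") is what
causes partition columns to be omitted from the data files in the first place, which is the scenario the new
read-side fallback targets. A minimal write-side sketch in Scala, assuming a DataFrame df with hypothetical id and
data_date columns and a target path basePath:

    df.write.format("hudi")
      .option("hoodie.table.name", "test_table")                          // hypothetical table name
      .option("hoodie.datasource.write.recordkey.field", "id")            // hypothetical record key column
      .option("hoodie.datasource.write.partitionpath.field", "data_date") // hypothetical partition column
      .option("hoodie.datasource.write.drop.partition.columns", "true")   // partition values then live only in the partition path
      .mode("overwrite")
      .save(basePath)
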
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
index a65be689a4..251a990d87 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
@@ -30,7 +30,6 @@ import org.apache.hudi.common.fs.NoOpConsistencyGuard;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.HoodieTimelineTimeZone;
-import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -377,16 +376,15 @@ public class HoodieTableMetaClient implements Serializable {
   /**
    * Validate table properties.
    * @param properties Properties from writeConfig.
-   * @param operationType operation type to be executed.
    */
-  public void validateTableProperties(Properties properties, WriteOperationType operationType) {
-    // once meta fields are disabled, it cant be re-enabled for a given table.
+  public void validateTableProperties(Properties properties) {
+    // Once meta fields are disabled, they can't be re-enabled for a given table.
     if (!getTableConfig().populateMetaFields()
         && Boolean.parseBoolean((String) properties.getOrDefault(HoodieTableConfig.POPULATE_META_FIELDS.key(), HoodieTableConfig.POPULATE_META_FIELDS.defaultValue()))) {
       throw new HoodieException(HoodieTableConfig.POPULATE_META_FIELDS.key() + " already disabled for the table. Can't be re-enabled back");
     }
 
-    // meta fields can be disabled only with SimpleKeyGenerator
+    // Meta fields can be disabled only when {@code SimpleKeyGenerator} is used
     if (!getTableConfig().populateMetaFields()
         && !properties.getProperty(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key(), "org.apache.hudi.keygen.SimpleKeyGenerator")
         .equals("org.apache.hudi.keygen.SimpleKeyGenerator")) {
@@ -698,7 +696,7 @@ public class HoodieTableMetaClient implements Serializable {
     private Boolean urlEncodePartitioning;
     private HoodieTimelineTimeZone commitTimeZone;
     private Boolean partitionMetafileUseBaseFormat;
-    private Boolean dropPartitionColumnsWhenWrite;
+    private Boolean shouldDropPartitionColumns;
     private String metadataPartitions;
     private String inflightMetadataPartitions;
 
@@ -820,8 +818,8 @@ public class HoodieTableMetaClient implements Serializable {
       return this;
     }
 
-    public PropertyBuilder setDropPartitionColumnsWhenWrite(Boolean dropPartitionColumnsWhenWrite) {
-      this.dropPartitionColumnsWhenWrite = dropPartitionColumnsWhenWrite;
+    public PropertyBuilder setShouldDropPartitionColumns(Boolean shouldDropPartitionColumns) {
+      this.shouldDropPartitionColumns = shouldDropPartitionColumns;
       return this;
     }
 
@@ -933,15 +931,12 @@ public class HoodieTableMetaClient implements Serializable {
       if (hoodieConfig.contains(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT)) {
         setPartitionMetafileUseBaseFormat(hoodieConfig.getBoolean(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT));
       }
-
       if (hoodieConfig.contains(HoodieTableConfig.DROP_PARTITION_COLUMNS)) {
-        setDropPartitionColumnsWhenWrite(hoodieConfig.getBoolean(HoodieTableConfig.DROP_PARTITION_COLUMNS));
+        setShouldDropPartitionColumns(hoodieConfig.getBoolean(HoodieTableConfig.DROP_PARTITION_COLUMNS));
       }
-
       if (hoodieConfig.contains(HoodieTableConfig.TABLE_METADATA_PARTITIONS)) {
         setMetadataPartitions(hoodieConfig.getString(HoodieTableConfig.TABLE_METADATA_PARTITIONS));
       }
-
       if (hoodieConfig.contains(HoodieTableConfig.TABLE_METADATA_PARTITIONS_INFLIGHT)) {
         setInflightMetadataPartitions(hoodieConfig.getString(HoodieTableConfig.TABLE_METADATA_PARTITIONS_INFLIGHT));
       }
@@ -1026,15 +1021,12 @@ public class HoodieTableMetaClient implements Serializable {
       if (null != partitionMetafileUseBaseFormat) {
         tableConfig.setValue(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT, partitionMetafileUseBaseFormat.toString());
       }
-
-      if (null != dropPartitionColumnsWhenWrite) {
-        tableConfig.setValue(HoodieTableConfig.DROP_PARTITION_COLUMNS, Boolean.toString(dropPartitionColumnsWhenWrite));
+      if (null != shouldDropPartitionColumns) {
+        tableConfig.setValue(HoodieTableConfig.DROP_PARTITION_COLUMNS, Boolean.toString(shouldDropPartitionColumns));
       }
-
       if (null != metadataPartitions) {
         tableConfig.setValue(HoodieTableConfig.TABLE_METADATA_PARTITIONS, metadataPartitions);
       }
-
       if (null != inflightMetadataPartitions) {
         tableConfig.setValue(HoodieTableConfig.TABLE_METADATA_PARTITIONS_INFLIGHT, inflightMetadataPartitions);
       }
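
Seen from a caller, the renamed builder method and the narrowed validateTableProperties signature look roughly as
below. This is a hedged Scala sketch, assuming a Hadoop Configuration hadoopConf, a table path basePath and a
HoodieWriteConfig writeConfig in scope; the withPropertyBuilder() entry point is an assumption here, while the
setters and the validate call mirror the call sites touched later in this patch:

    import org.apache.hudi.common.model.HoodieTableType
    import org.apache.hudi.common.table.HoodieTableMetaClient

    // Table initialization: the flag is now set via setShouldDropPartitionColumns(...)
    HoodieTableMetaClient.withPropertyBuilder()
      .setTableType(HoodieTableType.COPY_ON_WRITE)
      .setTableName("test_table")
      .setShouldDropPartitionColumns(true)
      .initTable(hadoopConf, basePath)

    // Validation: the WriteOperationType argument has been dropped
    val metaClient = HoodieTableMetaClient.builder()
      .setConf(hadoopConf)
      .setBasePath(basePath)
      .build()
    metaClient.validateTableProperties(writeConfig.getProps)
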
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/DataSourceInternalWriterHelper.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/DataSourceInternalWriterHelper.java
index 7ca62d1520..5aa82642de 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/DataSourceInternalWriterHelper.java
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/DataSourceInternalWriterHelper.java
@@ -66,7 +66,7 @@ public class DataSourceInternalWriterHelper {
     writeClient.startCommitWithTime(instantTime);
 
     this.metaClient = HoodieTableMetaClient.builder().setConf(configuration).setBasePath(writeConfig.getBasePath()).build();
-    this.metaClient.validateTableProperties(writeConfig.getProps(), WriteOperationType.BULK_INSERT);
+    this.metaClient.validateTableProperties(writeConfig.getProps());
     this.hoodieTable = HoodieSparkTable.create(writeConfig, new HoodieSparkEngineContext(new JavaSparkContext(sparkSession.sparkContext())), metaClient);
   }
 
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala
index 5414a228c7..3c667d2b42 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala
@@ -114,21 +114,21 @@ class BaseFileOnlyRelation(sqlContext: SQLContext,
    *       rule; you can find more details in HUDI-3896)
    */
   def toHadoopFsRelation: HadoopFsRelation = {
-    // We're delegating to Spark to append partition values to every row only in cases
-    // when these corresponding partition-values are not persisted w/in the data file itself
-    val shouldAppendPartitionColumns = shouldOmitPartitionColumns
-
-    val (tableFileFormat, formatClassName) = metaClient.getTableConfig.getBaseFileFormat match {
-      case HoodieFileFormat.PARQUET =>
-        (sparkAdapter.createHoodieParquetFileFormat(shouldAppendPartitionColumns).get, HoodieParquetFileFormat.FILE_FORMAT_ID)
-      case HoodieFileFormat.ORC => (new OrcFileFormat, "orc")
-    }
+    val (tableFileFormat, formatClassName) =
+      metaClient.getTableConfig.getBaseFileFormat match {
+        case HoodieFileFormat.ORC => (new OrcFileFormat, "orc")
+        case HoodieFileFormat.PARQUET =>
+          // We're delegating to Spark to append partition values to every row only in cases
+          // when these corresponding partition-values are not persisted w/in the data file itself
+          val parquetFileFormat = sparkAdapter.createHoodieParquetFileFormat(shouldExtractPartitionValuesFromPartitionPath).get
+          (parquetFileFormat, HoodieParquetFileFormat.FILE_FORMAT_ID)
+      }
 
     if (globPaths.isEmpty) {
       // NOTE: There are currently 2 ways partition values could be fetched:
       //          - Source columns (producing the values used for physical partitioning) will be read
       //          from the data file
-      //          - Values parsed from the actual partition pat would be appended to the final dataset
+      //          - Values parsed from the actual partition path would be appended to the final dataset
       //
       //        In the former case, we don't need to provide the partition-schema to the relation,
       //        therefore we simply stub it w/ empty schema and use full table-schema as the one being
@@ -136,7 +136,7 @@ class BaseFileOnlyRelation(sqlContext: SQLContext,
       //
       //        In the latter, we have to specify proper partition schema as well as "data"-schema, essentially
       //        being a table-schema with all partition columns stripped out
-      val (partitionSchema, dataSchema) = if (shouldAppendPartitionColumns) {
+      val (partitionSchema, dataSchema) = if (shouldExtractPartitionValuesFromPartitionPath) {
         (fileIndex.partitionSchema, fileIndex.dataSchema)
       } else {
         (StructType(Nil), tableStructSchema)
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index 30bbaeceb1..0d4c7cf184 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -18,14 +18,16 @@
 package org.apache.hudi
 
 import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, QUERY_TYPE_SNAPSHOT_OPT_VAL}
+import org.apache.hudi.HoodieConversionUtils.toScalaOption
 import org.apache.hudi.common.config.{ConfigProperty, HoodieConfig}
 import org.apache.hudi.common.fs.ConsistencyGuardConfig
 import org.apache.hudi.common.model.{HoodieTableType, WriteOperationType}
 import org.apache.hudi.common.table.HoodieTableConfig
 import org.apache.hudi.common.util.Option
+import org.apache.hudi.common.util.ValidationUtils.checkState
 import org.apache.hudi.config.{HoodieClusteringConfig, HoodieWriteConfig}
 import org.apache.hudi.hive.util.ConfigUtils
-import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncTool, MultiPartKeysValueExtractor, NonPartitionedExtractor, SlashEncodedDayPartitionValueExtractor}
+import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncTool}
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions
 import org.apache.hudi.keygen.{ComplexKeyGenerator, CustomKeyGenerator, NonpartitionedKeyGenerator, SimpleKeyGenerator}
 import org.apache.hudi.sync.common.HoodieSyncConfig
@@ -45,6 +47,7 @@ import scala.language.implicitConversions
   * Options supported for reading hoodie tables.
   */
 object DataSourceReadOptions {
+  import DataSourceOptionsHelper._
 
   val QUERY_TYPE_SNAPSHOT_OPT_VAL = "snapshot"
   val QUERY_TYPE_READ_OPTIMIZED_OPT_VAL = "read_optimized"
@@ -124,6 +127,15 @@ object DataSourceReadOptions {
     .withDocumentation("Enables data-skipping allowing queries to leverage indexes to reduce the search space by " +
       "skipping over files")
 
+  val EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH: ConfigProperty[Boolean] =
+    ConfigProperty.key("hoodie.datasource.read.extract.partition.values.from.path")
+      .defaultValue(false)
+      .sinceVersion("0.11.0")
+      .withDocumentation("When set to true, values for partition columns (partition values) will be extracted" +
+        " from physical partition path (default Spark behavior). When set to false partition values will be" +
+        " read from the data file (in Hudi partition columns are persisted by default)." +
+        " This config is a fallback allowing to preserve existing behavior, and should not be used otherwise.")
+
   val INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES: ConfigProperty[String] = ConfigProperty
     .key("hoodie.datasource.read.incr.fallback.fulltablescan.enable")
     .defaultValue("false")
@@ -185,6 +197,8 @@ object DataSourceReadOptions {
   */
 object DataSourceWriteOptions {
 
+  import DataSourceOptionsHelper._
+
   val BULK_INSERT_OPERATION_OPT_VAL = WriteOperationType.BULK_INSERT.value
   val INSERT_OPERATION_OPT_VAL = WriteOperationType.INSERT.value
   val UPSERT_OPERATION_OPT_VAL = WriteOperationType.UPSERT.value
@@ -471,10 +485,7 @@ object DataSourceWriteOptions {
     .sinceVersion("0.9.0")
     .withDocumentation("This class is used by kafka client to deserialize the records")
 
-  val DROP_PARTITION_COLUMNS: ConfigProperty[Boolean] = ConfigProperty
-    .key(HoodieTableConfig.DROP_PARTITION_COLUMNS.key())
-    .defaultValue(HoodieTableConfig.DROP_PARTITION_COLUMNS.defaultValue().booleanValue())
-    .withDocumentation(HoodieTableConfig.DROP_PARTITION_COLUMNS.doc())
+  val DROP_PARTITION_COLUMNS: ConfigProperty[Boolean] = HoodieTableConfig.DROP_PARTITION_COLUMNS
 
   /** @deprecated Use {@link HIVE_ASSUME_DATE_PARTITION} and its methods instead */
   @Deprecated
@@ -774,4 +785,23 @@ object DataSourceOptionsHelper {
       override def apply (input: From): To = function (input)
     }
   }
+
+  implicit def convert[T, U](prop: ConfigProperty[T])(implicit converter: T => U): ConfigProperty[U] = {
+    checkState(prop.hasDefaultValue)
+    var newProp: ConfigProperty[U] = ConfigProperty.key(prop.key())
+      .defaultValue(converter(prop.defaultValue()))
+      .withDocumentation(prop.doc())
+      .withAlternatives(prop.getAlternatives.asScala: _*)
+
+    newProp = toScalaOption(prop.getSinceVersion) match {
+      case Some(version) => newProp.sinceVersion(version)
+      case None => newProp
+    }
+    newProp = toScalaOption(prop.getDeprecatedVersion) match {
+      case Some(version) => newProp.deprecatedAfter(version)
+      case None => newProp
+    }
+
+    newProp
+  }
 }
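
The new read option is the user-facing piece of this change: setting it restores Spark's default behavior of sourcing
partition values from the physical path. A minimal read-side sketch, assuming an active SparkSession spark and a Hudi
table at basePath:

    import org.apache.hudi.DataSourceReadOptions

    val df = spark.read.format("hudi")
      .option(DataSourceReadOptions.EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH.key, "true")
      .load(basePath)

As the test change further down notes, this only takes effect for non-globbed loads, since Spark cannot infer
partitioning when glob paths are used. Separately, the convert implicit added to DataSourceOptionsHelper is what
allows DROP_PARTITION_COLUMNS to be re-exposed directly from HoodieTableConfig while converting its default value
from java.lang.Boolean to Boolean.
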
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
index 2fd1da5950..f776d08ec9 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
@@ -149,8 +149,36 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
 
   protected val partitionColumns: Array[String] = tableConfig.getPartitionFields.orElse(Array.empty)
 
-  protected val shouldOmitPartitionColumns: Boolean =
-    metaClient.getTableConfig.shouldDropPartitionColumns && partitionColumns.nonEmpty
+  /**
+   * Controls whether partition values (ie values of partition columns) should be
+   * <ol>
+   *    <li>Extracted from partition path and appended to individual rows read from the data file (we
+   *    delegate this to Spark's [[ParquetFileFormat]])</li>
+   *    <li>Read from the data-file as is (by default Hudi persists all columns including partition ones)</li>
+   * </ol>
+   *
+   * This flag is only relevant in conjunction with the [["hoodie.datasource.write.drop.partition.columns"]]
+   * config, in which case Hudi will NOT persist partition columns in the data file, and therefore values for
+   * such partition columns (i.e. "partition values") will have to be parsed from the partition path and appended
+   * to every row of the fetched dataset.
+   *
+   * NOTE: Partition values extracted from the partition path might deviate from the values of the original
+   *       partition columns: for example, if the original partition column was [[ts]], bearing an epoch
+   *       timestamp, and [[TimestampBasedKeyGenerator]] was used to generate a partition path of the format
+   *       [["yyyy/mm/dd"]], the appended partition value would bear that format verbatim as used in the
+   *       partition path, meaning that the string value "2022/01/01" would be appended, and not the original
+   *       representation
+   */
+  protected val shouldExtractPartitionValuesFromPartitionPath: Boolean = {
+    // Controls whether partition columns (which are the source for the partition path values) should
+    // be omitted from persistence in the data files. On the read path it affects whether partition values (values
+    // of partition columns) will be read from the data file or extracted from the partition path
+    val shouldOmitPartitionColumns = metaClient.getTableConfig.shouldDropPartitionColumns && partitionColumns.nonEmpty
+    val shouldExtractPartitionValueFromPath =
+      optParams.getOrElse(DataSourceReadOptions.EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH.key,
+        DataSourceReadOptions.EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH.defaultValue.toString).toBoolean
+    shouldOmitPartitionColumns || shouldExtractPartitionValueFromPath
+  }
 
   /**
    * NOTE: PLEASE READ THIS CAREFULLY
@@ -228,7 +256,6 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
 
     val fileSplits = collectFileSplits(partitionFilters, dataFilters)
 
-
     val tableAvroSchemaStr =
       if (internalSchema.isEmptySchema) tableAvroSchema.toString
       else AvroInternalSchemaConverter.convert(internalSchema, tableAvroSchema.getName).toString
@@ -367,7 +394,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
   protected def getPartitionColumnsAsInternalRow(file: FileStatus): InternalRow = {
     try {
       val tableConfig = metaClient.getTableConfig
-      if (shouldOmitPartitionColumns) {
+      if (shouldExtractPartitionValuesFromPartitionPath) {
         val relativePath = new URI(metaClient.getBasePath).relativize(new URI(file.getPath.getParent.toString)).toString
         val hiveStylePartitioningEnabled = tableConfig.getHiveStylePartitioningEnable.toBoolean
         if (hiveStylePartitioningEnabled) {
@@ -420,9 +447,6 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
       hadoopConf = hadoopConf
     )
 
-    // We're delegating to Spark to append partition values to every row only in cases
-    // when these corresponding partition-values are not persisted w/in the data file itself
-    val shouldAppendPartitionColumns = shouldOmitPartitionColumns
     val parquetReader = HoodieDataSourceHelper.buildHoodieParquetReader(
       sparkSession = spark,
       dataSchema = dataSchema.structTypeSchema,
@@ -431,7 +455,9 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
       filters = filters,
       options = options,
       hadoopConf = hadoopConf,
-      appendPartitionValues = shouldAppendPartitionColumns
+      // We're delegating to Spark to append partition values to every row only in cases
+      // when these corresponding partition-values are not persisted w/in the data file itself
+      appendPartitionValues = shouldExtractPartitionValuesFromPartitionPath
     )
 
     partitionedFile => {
@@ -448,7 +474,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
 
   private def tryPrunePartitionColumns(tableSchema: HoodieTableSchema,
                                        requiredSchema: HoodieTableSchema): (StructType, HoodieTableSchema, HoodieTableSchema) = {
-    if (shouldOmitPartitionColumns) {
+    if (shouldExtractPartitionValuesFromPartitionPath) {
       val partitionSchema = StructType(partitionColumns.map(StructField(_, StringType)))
       val prunedDataStructSchema = prunePartitionColumns(tableSchema.structTypeSchema)
       val prunedRequiredSchema = prunePartitionColumns(requiredSchema.structTypeSchema)
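
To make the caveat in the new Scaladoc concrete: with extraction enabled, the reader sees a partition value exactly
as it is spelled in the partition path, typed as a string. A hedged sketch, assuming a table whose partition column
ts (an epoch timestamp) was run through TimestampBasedKeyGenerator to produce "yyyy/mm/dd" partition paths, with
partition columns still persisted in the data files (the default):

    // Default read: ts keeps its original epoch representation, read from the data file
    val fromDataFile = spark.read.format("hudi").load(basePath).select("ts")

    // Fallback read: ts comes back as the string "2022/01/01", verbatim from the partition path
    val fromPartitionPath = spark.read.format("hudi")
      .option("hoodie.datasource.read.extract.partition.values.from.path", "true")
      .load(basePath)
      .select("ts")
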
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala
index 1fc9e70a5a..e430364be9 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala
@@ -50,7 +50,6 @@ object HoodieDataSourceHelper extends PredicateHelper with SparkAdapterSupport {
                                options: Map[String, String],
                                hadoopConf: Configuration,
                                appendPartitionValues: Boolean = false): PartitionedFile => Iterator[InternalRow] = {
-
     val parquetFileFormat: ParquetFileFormat = sparkAdapter.createHoodieParquetFileFormat(appendPartitionValues).get
     val readParquetFile: PartitionedFile => Iterator[Any] = parquetFileFormat.buildReaderWithPartitionValues(
       sparkSession = sparkSession,
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 38062aa802..c5204fcd8e 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -160,7 +160,7 @@ object HoodieSparkSqlWriter {
           .setHiveStylePartitioningEnable(hoodieConfig.getBoolean(HIVE_STYLE_PARTITIONING))
           .setUrlEncodePartitioning(hoodieConfig.getBoolean(URL_ENCODE_PARTITIONING))
           .setPartitionMetafileUseBaseFormat(useBaseFormatMetaFile)
-          .setDropPartitionColumnsWhenWrite(hoodieConfig.getBooleanOrDefault(HoodieTableConfig.DROP_PARTITION_COLUMNS))
+          .setShouldDropPartitionColumns(hoodieConfig.getBooleanOrDefault(HoodieTableConfig.DROP_PARTITION_COLUMNS))
           .setCommitTimezone(HoodieTimelineTimeZone.valueOf(hoodieConfig.getStringOrDefault(HoodieTableConfig.TIMELINE_TIMEZONE)))
           .initTable(sparkContext.hadoopConfiguration, path)
         tableConfig = tableMetaClient.getTableConfig
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetFileFormat.scala
index dbb62d089e..a52e9335fe 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetFileFormat.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetFileFormat.scala
@@ -19,7 +19,7 @@
 package org.apache.spark.sql.execution.datasources.parquet
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hudi.SparkAdapterSupport
+import org.apache.hudi.{DataSourceReadOptions, SparkAdapterSupport}
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.datasources.PartitionedFile
@@ -41,14 +41,16 @@ class HoodieParquetFileFormat extends ParquetFileFormat with SparkAdapterSupport
                                               filters: Seq[Filter],
                                               options: Map[String, String],
                                               hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
+    val shouldExtractPartitionValuesFromPartitionPath =
+      options.getOrElse(DataSourceReadOptions.EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH.key,
+        DataSourceReadOptions.EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH.defaultValue.toString).toBoolean
+
     sparkAdapter
-      .createHoodieParquetFileFormat(appendPartitionValues = false).get
+      .createHoodieParquetFileFormat(shouldExtractPartitionValuesFromPartitionPath).get
       .buildReaderWithPartitionValues(sparkSession, dataSchema, partitionSchema, requiredSchema, filters, options, hadoopConf)
   }
 }
 
 object HoodieParquetFileFormat {
-
   val FILE_FORMAT_ID = "hoodie-parquet"
-
 }
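
The standalone HoodieParquetFileFormat used to hard-code appendPartitionValues = false; it now honors the same read
option when it arrives through the per-relation options map. A hedged sketch of that call path, with the session,
schemas, filters and Hadoop configuration assumed to be in scope:

    // Assumed in scope: sparkSession, dataSchema, partitionSchema, requiredSchema, hadoopConf
    val readOptions = Map(
      DataSourceReadOptions.EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH.key -> "true")

    val readFunc = new HoodieParquetFileFormat().buildReaderWithPartitionValues(
      sparkSession, dataSchema, partitionSchema, requiredSchema,
      filters = Seq.empty, options = readOptions, hadoopConf = hadoopConf)
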
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index 28a6dcdcd6..a088d4e01b 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -749,7 +749,7 @@ class TestCOWDataSource extends HoodieClientTestBase {
 
   @ParameterizedTest
   @ValueSource(booleans = Array(true, false))
-  def testCopyOnWriteWithDropPartitionColumns(enableDropPartitionColumns: Boolean) {
+  def testCopyOnWriteWithDroppedPartitionColumns(enableDropPartitionColumns: Boolean) {
     val records1 = recordsToStrings(dataGen.generateInsertsContainsAllPartitions("000", 100)).toList
     val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
     inputDF1.write.format("org.apache.hudi")
@@ -900,7 +900,7 @@ class TestCOWDataSource extends HoodieClientTestBase {
 
   @ParameterizedTest
   @ValueSource(booleans = Array(true, false))
-  def testHoodieBaseFileOnlyViewRelation(useGlobbing: Boolean): Unit = {
+  def testPartitionColumnsProperHandling(useGlobbing: Boolean): Unit = {
     val _spark = spark
     import _spark.implicits._
 
@@ -935,18 +935,41 @@ class TestCOWDataSource extends HoodieClientTestBase {
       basePath
     }
 
-    val res = spark.read.format("hudi").load(path)
+    // Case #1: Partition columns are read from the data file
+    val firstDF = spark.read.format("hudi").load(path)
 
-    assert(res.count() == 2)
+    assert(firstDF.count() == 2)
 
     // data_date is the partition field. Persist to the parquet file using the original values, and read it.
     assertEquals(
-      res.select("data_date").map(_.get(0).toString).collect().sorted.toSeq,
-      Seq("2018-09-23", "2018-09-24")
+      Seq("2018-09-23", "2018-09-24"),
+      firstDF.select("data_date").map(_.get(0).toString).collect().sorted.toSeq
     )
     assertEquals(
-      res.select("_hoodie_partition_path").map(_.get(0).toString).collect().sorted.toSeq,
-      Seq("2018/09/23", "2018/09/24")
+      Seq("2018/09/23", "2018/09/24"),
+      firstDF.select("_hoodie_partition_path").map(_.get(0).toString).collect().sorted.toSeq
     )
+
+    // Case #2: Partition columns are extracted from the partition path
+    //
+    // NOTE: This case is only relevant when globbing is NOT used, since when globbing is used Spark
+    //       won't be able to infer partitioning properly
+    if (!useGlobbing) {
+      val secondDF = spark.read.format("hudi")
+        .option(DataSourceReadOptions.EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH.key, "true")
+        .load(path)
+
+      assert(secondDF.count() == 2)
+
+      // data_date is the partition field. With extraction from the partition path enabled, values read back bear the path format.
+      assertEquals(
+        Seq("2018/09/23", "2018/09/24"),
+        secondDF.select("data_date").map(_.get(0).toString).collect().sorted.toSeq
+      )
+      assertEquals(
+        Seq("2018/09/23", "2018/09/24"),
+        secondDF.select("_hoodie_partition_path").map(_.get(0).toString).collect().sorted.toSeq
+      )
+    }
   }
 }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index a9d67a0d72..b086a6c9ed 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -281,7 +281,7 @@ public class DeltaSync implements Serializable {
           .setPreCombineField(cfg.sourceOrderingField)
           .setPartitionMetafileUseBaseFormat(props.getBoolean(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.key(),
               HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.defaultValue()))
-          .setDropPartitionColumnsWhenWrite(isDropPartitionColumns())
+          .setShouldDropPartitionColumns(isDropPartitionColumns())
           .initTable(new Configuration(jssc.hadoopConfiguration()),
             cfg.targetBasePath);
     }
@@ -377,7 +377,7 @@ public class DeltaSync implements Serializable {
               SimpleKeyGenerator.class.getName()))
           .setPartitionMetafileUseBaseFormat(props.getBoolean(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.key(),
               HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.defaultValue()))
-          .setDropPartitionColumnsWhenWrite(isDropPartitionColumns())
+          .setShouldDropPartitionColumns(isDropPartitionColumns())
           .initTable(new Configuration(jssc.hadoopConfiguration()), cfg.targetBasePath);
     }