Posted to commits@hudi.apache.org by si...@apache.org on 2022/04/22 00:28:10 UTC

[hudi] 04/04: [HUDI-3936] Fix projection for a nested field as pre-combined key (#5379)

This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch release-0.11-0-apr21
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 6fe5c27a0c080ebb79eec6544c0dc5cacc028919
Author: Y Ethan Guo <et...@gmail.com>
AuthorDate: Thu Apr 21 17:17:57 2022 -0700

    [HUDI-3936] Fix projection for a nested field as pre-combined key (#5379)
    
    This PR fixes the projection logic for a nested field that is used as the pre-combine key field. The fix is to check and append only the root-level field of a nested field in the mandatory columns for projection, e.g., "a" for the nested field "a.b.c".
    
    - Changes HoodieBaseRelation.appendMandatoryColumns to check and append only the root-level field for a required nested field in the mandatory columns
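    
    For illustration, here is a minimal Scala sketch of the intended behavior
    (the standalone rootLevelField helper is a simplified stand-in for the
    actual HoodieBaseRelation logic; the field names "_row_key" and
    "fare.currency" are borrowed from the tests below):
    
        // Root-level field of a dot-separated field name: "fare.currency" -> "fare"
        def rootLevelField(fieldName: String): String = fieldName.split("\\.")(0)
    
        val mandatoryFields = Seq("_row_key", "fare.currency")
        val requestedColumns = Array("_row_key")
    
        // Before the fix, "fare.currency" was appended verbatim, yielding a
        // column that does not exist at the root level of the table schema.
        // After the fix, only the missing root-level fields are appended:
        val missing = mandatoryFields.map(rootLevelField).filterNot(requestedColumns.contains)
        requestedColumns ++ missing  // Array("_row_key", "fare")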
---
 .../java/org/apache/hudi/avro/HoodieAvroUtils.java | 32 +++++++++++++++-------
 .../org/apache/hudi/avro/TestHoodieAvroUtils.java  |  7 +++++
 .../org/apache/hudi/BaseFileOnlyRelation.scala     |  6 ++--
 .../scala/org/apache/hudi/HoodieBaseRelation.scala | 19 ++++++++-----
 .../hudi/MergeOnReadIncrementalRelation.scala      |  2 +-
 .../apache/hudi/MergeOnReadSnapshotRelation.scala  |  2 +-
 .../hudi/functional/TestMORDataSourceStorage.scala | 28 ++++++++++++-------
 .../functional/TestParquetColumnProjection.scala   |  4 +--
 8 files changed, 66 insertions(+), 34 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index 163a1b3fa5..f69d5683d1 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -18,6 +18,17 @@
 
 package org.apache.hudi.avro;
 
+import org.apache.hudi.common.config.SerializableSchema;
+import org.apache.hudi.common.model.HoodieOperation;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.SchemaCompatibilityException;
+
 import org.apache.avro.AvroRuntimeException;
 import org.apache.avro.Conversions;
 import org.apache.avro.Conversions.DecimalConversion;
@@ -42,16 +53,6 @@ import org.apache.avro.io.EncoderFactory;
 import org.apache.avro.io.JsonDecoder;
 import org.apache.avro.io.JsonEncoder;
 import org.apache.avro.specific.SpecificRecordBase;
-import org.apache.hudi.common.config.SerializableSchema;
-import org.apache.hudi.common.model.HoodieOperation;
-import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.StringUtils;
-import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.exception.SchemaCompatibilityException;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -480,6 +481,17 @@ public class HoodieAvroUtils {
     return projectedSchema;
   }
 
+  /**
+   * Obtain the root-level field name of a full field name, possibly a nested field.
+   * For example, given "a.b.c", the output is "a"; given "a", the output is "a".
+   *
+   * @param fieldName The field name.
+   * @return Root-level field name
+   */
+  public static String getRootLevelFieldName(String fieldName) {
+    return fieldName.split("\\.")[0];
+  }
+
   /**
    * Obtain value of the provided field as string, denoted by dot notation. e.g: a.b.c
    */
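
A note on the implementation above: String.split takes a regular expression,
so the dot must be escaped, and splitting an empty string yields a single
empty element rather than an empty array, which is why getRootLevelFieldName("")
returns "" in the test below. A quick Scala illustration of these standard
split semantics (not part of the patch):

    "a.b.c".split("\\.")  // Array("a", "b", "c")
    "a".split("\\.")      // Array("a") -- no dot: the whole name is the root field
    "".split("\\.")       // Array("")  -- empty input yields one empty element
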
diff --git a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
index 246d74411d..bd0254da3d 100644
--- a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
@@ -257,6 +257,13 @@ public class TestHoodieAvroUtils {
     assertEquals(expectedSchema, rec1.getSchema());
   }
 
+  @Test
+  public void testGetRootLevelFieldName() {
+    assertEquals("a", HoodieAvroUtils.getRootLevelFieldName("a.b.c"));
+    assertEquals("a", HoodieAvroUtils.getRootLevelFieldName("a"));
+    assertEquals("", HoodieAvroUtils.getRootLevelFieldName(""));
+  }
+
   @Test
   public void testGetNestedFieldVal() {
     GenericRecord rec = new GenericData.Record(new Schema.Parser().parse(EXAMPLE_SCHEMA));
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala
index 3c667d2b42..c57f46a7b6 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala
@@ -26,7 +26,7 @@ import org.apache.hudi.hadoop.HoodieROTablePathFilter
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.execution.datasources.parquet.{HoodieParquetFileFormat, ParquetFileFormat}
+import org.apache.spark.sql.execution.datasources.parquet.HoodieParquetFileFormat
 import org.apache.spark.sql.hive.orc.OrcFileFormat
 import org.apache.spark.sql.sources.{BaseRelation, Filter}
 import org.apache.spark.sql.types.StructType
@@ -54,8 +54,8 @@ class BaseFileOnlyRelation(sqlContext: SQLContext,
 
   override type FileSplit = HoodieBaseFileSplit
 
-  override lazy val mandatoryColumns: Seq[String] =
-    // TODO reconcile, record's key shouldn't be mandatory for base-file only relation
+  override lazy val mandatoryFields: Seq[String] =
+    // TODO reconcile, record's key shouldn't be mandatory for base-file only relation
     Seq(recordKeyField)
 
   override def imbueConfigs(sqlContext: SQLContext): Unit = {
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
index f776d08ec9..4b7177f4d6 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
@@ -25,6 +25,7 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hudi.HoodieBaseRelation.{convertToAvroSchema, createHFileReader, generateUnsafeProjection, getPartitionPath}
 import org.apache.hudi.HoodieConversionUtils.toScalaOption
+import org.apache.hudi.avro.HoodieAvroUtils
 import org.apache.hudi.common.config.{HoodieMetadataConfig, SerializableConfiguration}
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord}
@@ -39,10 +40,8 @@ import org.apache.hudi.io.storage.HoodieHFileReader
 import org.apache.spark.execution.datasources.HoodieInMemoryFileIndex
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.avro.HoodieAvroSchemaConverters
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
-import org.apache.spark.sql.catalyst.expressions.{Expression, SubqueryExpression, UnsafeProjection}
+import org.apache.spark.sql.catalyst.expressions.{Expression, SubqueryExpression}
 import org.apache.spark.sql.execution.FileRelation
 import org.apache.spark.sql.execution.datasources.{FileStatusCache, PartitionedFile, PartitioningUtils}
 import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
@@ -199,7 +198,10 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
    *
    * @VisibleInTests
    */
-  val mandatoryColumns: Seq[String]
+  val mandatoryFields: Seq[String]
+
+  protected def mandatoryRootFields: Seq[String] =
+    mandatoryFields.map(col => HoodieAvroUtils.getRootLevelFieldName(col))
 
   protected def timeline: HoodieTimeline =
   // NOTE: We're including compaction here since it's not considering a "commit" operation
@@ -246,7 +248,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
     //
     // (!!!) IT'S CRITICAL TO AVOID REORDERING OF THE REQUESTED COLUMNS AS THIS WILL BREAK THE UPSTREAM
     //       PROJECTION
-    val fetchedColumns: Array[String] = appendMandatoryColumns(requiredColumns)
+    val fetchedColumns: Array[String] = appendMandatoryRootFields(requiredColumns)
 
     val (requiredAvroSchema, requiredStructSchema, requiredInternalSchema) =
       HoodieSparkUtils.getRequiredSchema(tableAvroSchema, fetchedColumns, internalSchema)
@@ -362,8 +364,11 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
       !SubqueryExpression.hasSubquery(condition)
   }
 
-  protected final def appendMandatoryColumns(requestedColumns: Array[String]): Array[String] = {
-    val missing = mandatoryColumns.filter(col => !requestedColumns.contains(col))
+  protected final def appendMandatoryRootFields(requestedColumns: Array[String]): Array[String] = {
+    // For a nested field in the mandatory columns, we first get its root-level field and then
+    // check whether it is missing, since requestedColumns should only contain root-level fields.
+    // Likewise, we append only the root-level field.
+    val missing = mandatoryRootFields.filter(rootField => !requestedColumns.contains(rootField))
     requestedColumns ++ missing
   }
 
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
index 6aa7007851..806a5e371d 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
@@ -153,7 +153,7 @@ trait HoodieIncrementalRelationTrait extends HoodieBaseRelation {
     Seq(isNotNullFilter, largerThanFilter, lessThanFilter)
   }
 
-  override lazy val mandatoryColumns: Seq[String] = {
+  override lazy val mandatoryFields: Seq[String] = {
     // NOTE: These columns are required for the Incremental flow to be able to handle the rows properly, even in
     //       cases when no columns are requested to be fetched (for example, when using the {@code count()} API)
     Seq(HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieRecord.COMMIT_TIME_METADATA_FIELD) ++
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
index a88eb63036..75bc96624e 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
@@ -47,7 +47,7 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext,
 
   override type FileSplit = HoodieMergeOnReadFileSplit
 
-  override lazy val mandatoryColumns: Seq[String] =
+  override lazy val mandatoryFields: Seq[String] =
     Seq(recordKeyField) ++ preCombineFieldOpt.map(Seq(_)).getOrElse(Seq())
 
   protected val mergeType: String = optParams.getOrElse(DataSourceReadOptions.REALTIME_MERGE.key,
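
With this change, a nested pre-combine field flows through the relation as
follows (a hedged Scala sketch, assuming a table whose record key field is
"_row_key" and whose pre-combine field is "fare.currency", as in the test
below; getRootLevelFieldName is inlined as split("\\.")(0)):

    val recordKeyField = "_row_key"
    val preCombineFieldOpt: Option[String] = Some("fare.currency")

    // mandatoryFields keeps the full (possibly nested) field names ...
    val mandatoryFields = Seq(recordKeyField) ++ preCombineFieldOpt.map(Seq(_)).getOrElse(Seq())
    // ... while projection only ever sees their root-level names:
    val mandatoryRootFields = mandatoryFields.map(f => f.split("\\.")(0))
    // mandatoryRootFields == Seq("_row_key", "fare")
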
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala
index 18b639f2f9..8cf6b4174c 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala
@@ -23,6 +23,7 @@ import org.apache.hudi.common.config.HoodieMetadataConfig
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator
 import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
+import org.apache.hudi.common.util.StringUtils
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
 import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
@@ -32,7 +33,7 @@ import org.apache.spark.sql.functions.{col, lit}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
 import org.junit.jupiter.api.Tag
 import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.ValueSource
+import org.junit.jupiter.params.provider.CsvSource
 
 import scala.collection.JavaConversions._
 
@@ -57,19 +58,28 @@ class TestMORDataSourceStorage extends SparkClientFunctionalTestHarness {
   val updatedVerificationVal: String = "driver_update"
 
   @ParameterizedTest
-  @ValueSource(booleans = Array(true, false))
-  def testMergeOnReadStorage(isMetadataEnabled: Boolean) {
-    val dataGen = new HoodieTestDataGenerator()
+  @CsvSource(Array(
+    "true,",
+    "true,fare.currency",
+    "false,",
+    "false,fare.currency"
+  ))
+  def testMergeOnReadStorage(isMetadataEnabled: Boolean, preCombineField: String) {
+    var options: Map[String, String] = commonOpts +
+      (HoodieMetadataConfig.ENABLE.key -> String.valueOf(isMetadataEnabled))
+    if (!StringUtils.isNullOrEmpty(preCombineField)) {
+      options += (DataSourceWriteOptions.PRECOMBINE_FIELD.key() -> preCombineField)
+    }
+    val dataGen = new HoodieTestDataGenerator(0xDEEF)
     val fs = FSUtils.getFs(basePath, spark.sparkContext.hadoopConfiguration)
     // Bulk Insert Operation
     val records1 = recordsToStrings(dataGen.generateInserts("001", 100)).toList
     val inputDF1: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records1, 2))
     inputDF1.write.format("org.apache.hudi")
-      .options(commonOpts)
+      .options(options)
       .option("hoodie.compact.inline", "false") // else fails due to compaction & deltacommit instant times being same
       .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
       .option(DataSourceWriteOptions.TABLE_TYPE.key, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
-      .option(HoodieMetadataConfig.ENABLE.key, isMetadataEnabled)
       .mode(SaveMode.Overwrite)
       .save(basePath)
 
@@ -90,8 +100,7 @@ class TestMORDataSourceStorage extends SparkClientFunctionalTestHarness {
     val records2 = recordsToStrings(dataGen.generateUniqueUpdates("002", 100)).toList
     val inputDF2: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records2, 2))
     inputDF2.write.format("org.apache.hudi")
-      .options(commonOpts)
-      .option(HoodieMetadataConfig.ENABLE.key, isMetadataEnabled)
+      .options(options)
       .mode(SaveMode.Append)
       .save(basePath)
 
@@ -110,8 +119,7 @@ class TestMORDataSourceStorage extends SparkClientFunctionalTestHarness {
     val inputDF3 = hudiSnapshotDF2.filter(col("_row_key") === verificationRowKey).withColumn(verificationCol, lit(updatedVerificationVal))
 
     inputDF3.write.format("org.apache.hudi")
-      .options(commonOpts)
-      .option(HoodieMetadataConfig.ENABLE.key, isMetadataEnabled)
+      .options(options)
       .mode(SaveMode.Append)
       .save(basePath)
 
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala
index f670450c3e..945d26be3f 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala
@@ -19,7 +19,7 @@ package org.apache.hudi.functional
 
 import org.apache.avro.Schema
 import org.apache.hudi.common.config.HoodieMetadataConfig
-import org.apache.hudi.common.model.{HoodieRecord, OverwriteNonDefaultsWithLatestAvroPayload, OverwriteWithLatestAvroPayload}
+import org.apache.hudi.common.model.{HoodieRecord, OverwriteNonDefaultsWithLatestAvroPayload}
 import org.apache.hudi.common.table.HoodieTableConfig
 import org.apache.hudi.common.testutils.{HadoopMapRedUtils, HoodieTestDataGenerator}
 import org.apache.hudi.config.{HoodieStorageConfig, HoodieWriteConfig}
@@ -332,7 +332,7 @@ class TestParquetColumnProjection extends SparkClientFunctionalTestHarness with
         logWarning(s"Not matching bytes read ($bytesRead)")
       }
 
-      val readColumns = targetColumns ++ relation.mandatoryColumns
+      val readColumns = targetColumns ++ relation.mandatoryFields
       val (_, projectedStructType, _) = HoodieSparkUtils.getRequiredSchema(tableState.schema, readColumns)
 
       val row: InternalRow = rows.take(1).head