You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by xu...@apache.org on 2022/04/21 10:30:55 UTC
[hudi] 04/13: nested field patch
This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch rc3-patched-for-test
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 2a5f6168393f1d75f839bfab2a4636f06877a7f1
Author: Raymond Xu <xu...@gmail.com>
AuthorDate: Thu Apr 21 18:15:54 2022 +0800
nested field patch
apply from https://patch-diff.githubusercontent.com/raw/apache/hudi/pull/5379.patch
---
.../java/org/apache/hudi/avro/HoodieAvroUtils.java | 32 +++++++++++++++-------
.../org/apache/hudi/avro/TestHoodieAvroUtils.java | 7 +++++
.../scala/org/apache/hudi/HoodieBaseRelation.scala | 11 +++++---
.../hudi/functional/TestMORDataSourceStorage.scala | 28 ++++++++++++-------
4 files changed, 54 insertions(+), 24 deletions(-)
diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index 41be0b00c0..47be7117a7 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -18,6 +18,17 @@
package org.apache.hudi.avro;
+import org.apache.hudi.common.config.SerializableSchema;
+import org.apache.hudi.common.model.HoodieOperation;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.SchemaCompatibilityException;
+
import org.apache.avro.AvroRuntimeException;
import org.apache.avro.Conversions;
import org.apache.avro.Conversions.DecimalConversion;
@@ -42,16 +53,6 @@ import org.apache.avro.io.EncoderFactory;
import org.apache.avro.io.JsonDecoder;
import org.apache.avro.io.JsonEncoder;
import org.apache.avro.specific.SpecificRecordBase;
-import org.apache.hudi.common.config.SerializableSchema;
-import org.apache.hudi.common.model.HoodieOperation;
-import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.StringUtils;
-import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.exception.SchemaCompatibilityException;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@@ -484,6 +485,17 @@ public class HoodieAvroUtils {
return projectedSchema;
}
+ /**
+ * Obtain the root-level field name of a full field name, possibly a nested field.
+ * For example, given "a.b.c", the output is "a"; given "a", the output is "a".
+ *
+ * @param fieldName The field name.
+ * @return Root-level field name
+ */
+ public static String getRootLevelFieldName(String fieldName) {
+ return fieldName.split("\\.")[0];
+ }
+
/**
* Obtain value of the provided field as string, denoted by dot notation. e.g: a.b.c
*/
diff --git a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
index 246d74411d..bd0254da3d 100644
--- a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
@@ -257,6 +257,13 @@ public class TestHoodieAvroUtils {
assertEquals(expectedSchema, rec1.getSchema());
}
+ @Test
+ public void testGetRootLevelFieldName() {
+ assertEquals("a", HoodieAvroUtils.getRootLevelFieldName("a.b.c"));
+ assertEquals("a", HoodieAvroUtils.getRootLevelFieldName("a"));
+ assertEquals("", HoodieAvroUtils.getRootLevelFieldName(""));
+ }
+
@Test
public void testGetNestedFieldVal() {
GenericRecord rec = new GenericData.Record(new Schema.Parser().parse(EXAMPLE_SCHEMA));
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
index f776d08ec9..aac57e1bbb 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
@@ -25,6 +25,7 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig
import org.apache.hadoop.mapred.JobConf
import org.apache.hudi.HoodieBaseRelation.{convertToAvroSchema, createHFileReader, generateUnsafeProjection, getPartitionPath}
import org.apache.hudi.HoodieConversionUtils.toScalaOption
+import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.hudi.common.config.{HoodieMetadataConfig, SerializableConfiguration}
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord}
@@ -39,10 +40,8 @@ import org.apache.hudi.io.storage.HoodieHFileReader
import org.apache.spark.execution.datasources.HoodieInMemoryFileIndex
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.avro.HoodieAvroSchemaConverters
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
-import org.apache.spark.sql.catalyst.expressions.{Expression, SubqueryExpression, UnsafeProjection}
+import org.apache.spark.sql.catalyst.expressions.{Expression, SubqueryExpression}
import org.apache.spark.sql.execution.FileRelation
import org.apache.spark.sql.execution.datasources.{FileStatusCache, PartitionedFile, PartitioningUtils}
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
@@ -363,7 +362,11 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
}
protected final def appendMandatoryColumns(requestedColumns: Array[String]): Array[String] = {
- val missing = mandatoryColumns.filter(col => !requestedColumns.contains(col))
+ // For a nested field in mandatory columns, we should first get the root-level field, and then
+ // check for any missing column, as the requestedColumns should only contain root-level fields
+ // We should only append root-level field as well
+ val missing = mandatoryColumns.map(col => HoodieAvroUtils.getRootLevelFieldName(col))
+ .filter(rootField => !requestedColumns.contains(rootField))
requestedColumns ++ missing
}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala
index 18b639f2f9..8cf6b4174c 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala
@@ -23,6 +23,7 @@ import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.testutils.HoodieTestDataGenerator
import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
+import org.apache.hudi.common.util.StringUtils
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
@@ -32,7 +33,7 @@ import org.apache.spark.sql.functions.{col, lit}
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
import org.junit.jupiter.api.Tag
import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.ValueSource
+import org.junit.jupiter.params.provider.CsvSource
import scala.collection.JavaConversions._
@@ -57,19 +58,28 @@ class TestMORDataSourceStorage extends SparkClientFunctionalTestHarness {
val updatedVerificationVal: String = "driver_update"
@ParameterizedTest
- @ValueSource(booleans = Array(true, false))
- def testMergeOnReadStorage(isMetadataEnabled: Boolean) {
- val dataGen = new HoodieTestDataGenerator()
+ @CsvSource(Array(
+ "true,",
+ "true,fare.currency",
+ "false,",
+ "false,fare.currency"
+ ))
def testMergeOnReadStorage(isMetadataEnabled: Boolean, preCombineField: String) {
var options: Map[String, String] = commonOpts +
(HoodieMetadataConfig.ENABLE.key -> String.valueOf(isMetadataEnabled))
if (!StringUtils.isNullOrEmpty(preCombineField)) {
options += (DataSourceWriteOptions.PRECOMBINE_FIELD.key() -> preCombineField)
+ }
+ val dataGen = new HoodieTestDataGenerator(0xDEEF)
val fs = FSUtils.getFs(basePath, spark.sparkContext.hadoopConfiguration)
// Bulk Insert Operation
val records1 = recordsToStrings(dataGen.generateInserts("001", 100)).toList
val inputDF1: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records1, 2))
inputDF1.write.format("org.apache.hudi")
- .options(commonOpts)
+ .options(options)
.option("hoodie.compact.inline", "false") // else fails due to compaction & deltacommit instant times being same
.option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
.option(DataSourceWriteOptions.TABLE_TYPE.key, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
- .option(HoodieMetadataConfig.ENABLE.key, isMetadataEnabled)
.mode(SaveMode.Overwrite)
.save(basePath)
@@ -90,8 +100,7 @@ class TestMORDataSourceStorage extends SparkClientFunctionalTestHarness {
val records2 = recordsToStrings(dataGen.generateUniqueUpdates("002", 100)).toList
val inputDF2: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records2, 2))
inputDF2.write.format("org.apache.hudi")
- .options(commonOpts)
- .option(HoodieMetadataConfig.ENABLE.key, isMetadataEnabled)
+ .options(options)
.mode(SaveMode.Append)
.save(basePath)
@@ -110,8 +119,7 @@ class TestMORDataSourceStorage extends SparkClientFunctionalTestHarness {
val inputDF3 = hudiSnapshotDF2.filter(col("_row_key") === verificationRowKey).withColumn(verificationCol, lit(updatedVerificationVal))
inputDF3.write.format("org.apache.hudi")
- .options(commonOpts)
- .option(HoodieMetadataConfig.ENABLE.key, isMetadataEnabled)
+ .options(options)
.mode(SaveMode.Append)
.save(basePath)