Posted to commits@hudi.apache.org by xu...@apache.org on 2022/04/19 10:30:14 UTC

[hudi] 06/11: [HUDI-3707] Fix target schema handling in HoodieSparkUtils while creating RDD (#5347)

This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch release-0.11.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit d979feee3483debe7c8aa0f3d25ebed18e41bcfe
Author: Sagar Sumit <sa...@gmail.com>
AuthorDate: Mon Apr 18 23:04:04 2022 +0530

    [HUDI-3707] Fix target schema handling in HoodieSparkUtils while creating RDD (#5347)
---
 .../main/scala/org/apache/hudi/HoodieSparkUtils.scala | 19 +++++++++++++------
 .../utilities/functional/TestHoodieDeltaStreamer.java |  5 -----
 2 files changed, 13 insertions(+), 11 deletions(-)
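
Summary of the change below: previously, createRdd fell back to the latest table schema only when reconcileToLatestSchema was set. The rewritten branch now uses the latest table schema whenever it is present, covering setups where the flag was never turned on explicitly (for example, when a Transformer implementation produces the target RDD). This is also why the HUDI-3707 @Disabled annotations on the schema-provider-plus-transformer tests are removed further down.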

diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
index 57eb32fce3..54bc06bd76 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
@@ -26,6 +26,9 @@ import org.apache.hudi.client.utils.SparkRowSerDe
 import org.apache.hudi.common.config.TypedProperties
 import org.apache.hudi.common.model.HoodieRecord
 import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.internal.schema.InternalSchema
+import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
+import org.apache.hudi.internal.schema.utils.InternalSchemaUtils
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions
 import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
 import org.apache.hudi.keygen.{BaseKeyGenerator, CustomAvroKeyGenerator, CustomKeyGenerator, KeyGenerator}
@@ -36,12 +39,8 @@ import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, Literal}
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{StringType, StructField, StructType}
-import java.util.Properties
-
-import org.apache.hudi.internal.schema.InternalSchema
-import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
-import org.apache.hudi.internal.schema.utils.InternalSchemaUtils
 
+import java.util.Properties
 import scala.collection.JavaConverters._
 
 object HoodieSparkUtils extends SparkAdapterSupport {
@@ -130,7 +129,15 @@ object HoodieSparkUtils extends SparkAdapterSupport {
    */
   def createRdd(df: DataFrame, structName: String, recordNamespace: String, reconcileToLatestSchema: Boolean,
                 latestTableSchema: org.apache.hudi.common.util.Option[Schema] = org.apache.hudi.common.util.Option.empty()): RDD[GenericRecord] = {
-    val latestTableSchemaConverted = if (latestTableSchema.isPresent && reconcileToLatestSchema) Some(latestTableSchema.get()) else None
+    var latestTableSchemaConverted : Option[Schema] = None
+
+    if (latestTableSchema.isPresent && reconcileToLatestSchema) {
+      latestTableSchemaConverted = Some(latestTableSchema.get())
+    } else {
+      // cases when users want to use latestTableSchema but have not turned on reconcileToLatestSchema explicitly
+      // for example, when using a Transformer implementation to transform source RDD to target RDD
+      latestTableSchemaConverted = if (latestTableSchema.isPresent) Some(latestTableSchema.get()) else None
+    }
     createRdd(df, structName, recordNamespace, latestTableSchemaConverted)
   }
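
Note that both branches above resolve to Some(latestTableSchema.get()) whenever the schema is present, so the net logic condenses to a single expression; the if/else mainly carries the explanatory comment. A minimal Scala sketch of the equivalent resolution step, with resolveTargetSchema as a hypothetical helper name (it is not a method of HoodieSparkUtils):

    import org.apache.avro.Schema
    import org.apache.hudi.common.util.{Option => HOption}

    // Condensed form of the branch added above: the latest table schema is
    // used whenever it is present, regardless of reconcileToLatestSchema.
    // resolveTargetSchema is an illustrative name, not actual Hudi API.
    def resolveTargetSchema(latestTableSchema: HOption[Schema],
                            reconcileToLatestSchema: Boolean): Option[Schema] =
      // The flag no longer gates the fallback; it is kept here only to
      // mirror the createRdd signature above.
      if (latestTableSchema.isPresent) Some(latestTableSchema.get()) else None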
 
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
index 804676f0ff..0576f6aaee 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
@@ -1690,7 +1690,6 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     testParquetDFSSource(false, null, true);
   }
 
-  @Disabled("HUDI-3707 To investigate problem with schema provider and transformer")
   @Test
   public void testParquetDFSSourceWithoutSchemaProviderAndTransformer() throws Exception {
     testParquetDFSSource(false, Collections.singletonList(TripsWithDistanceTransformer.class.getName()));
@@ -1701,7 +1700,6 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     testParquetDFSSource(true, null);
   }
 
-  @Disabled("HUDI-3707 To investigate problem with schema provider and transformer")
   @Test
   public void testParquetDFSSourceWithSchemaFilesAndTransformer() throws Exception {
     testParquetDFSSource(true, Collections.singletonList(TripsWithDistanceTransformer.class.getName()));
@@ -1712,7 +1710,6 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     testORCDFSSource(false, null);
   }
 
-  @Disabled("HUDI-3707 To investigate problem with schema provider and transformer")
   @Test
   public void testORCDFSSourceWithSchemaProviderAndWithTransformer() throws Exception {
     testORCDFSSource(true, Collections.singletonList(TripsWithDistanceTransformer.class.getName()));
@@ -1807,7 +1804,6 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     testCsvDFSSource(true, '\t', false, Collections.singletonList(TripsWithDistanceTransformer.class.getName()));
   }
 
-  @Disabled("HUDI-3707 To investigate problem with schema provider and transformer")
   @Test
   public void testCsvDFSSourceWithHeaderAndSepWithSchemaProviderAndTransformer() throws Exception {
     // The CSV files have header, the columns are separated by '\t'
@@ -1850,7 +1846,6 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     assertTrue(e.getMessage().contains("cannot resolve '`begin_lat`' given input columns:"));
   }
 
-  @Disabled("HUDI-3707 To investigate problem with schema provider and transformer")
   @Test
   public void testCsvDFSSourceNoHeaderWithSchemaProviderAndTransformer() throws Exception {
     // The CSV files do not have header, the columns are separated by '\t'
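
The re-enabled tests all pair a schema provider with TripsWithDistanceTransformer, i.e. the combination HUDI-3707 had flagged. For context, transformers in hudi-utilities implement org.apache.hudi.utilities.transform.Transformer; below is a minimal Scala sketch in the spirit of TripsWithDistanceTransformer, with the class name and derived column purely illustrative (the real test transformer derives a trip distance column):

    import org.apache.hudi.common.config.TypedProperties
    import org.apache.hudi.utilities.transform.Transformer
    import org.apache.spark.api.java.JavaSparkContext
    import org.apache.spark.sql.functions.lit
    import org.apache.spark.sql.{Dataset, Row, SparkSession}

    // Illustrative transformer: adds a derived column to the source rows, so
    // the target RDD's schema differs from the source schema, which is exactly
    // the situation the createRdd fix above must handle.
    class AddDerivedColumnTransformer extends Transformer {
      override def apply(jsc: JavaSparkContext,
                         sparkSession: SparkSession,
                         rowDataset: Dataset[Row],
                         properties: TypedProperties): Dataset[Row] =
        rowDataset.withColumn("derived_distance", lit(0.0))
    }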