You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by yi...@apache.org on 2023/02/02 09:23:42 UTC

[hudi] branch release-0.13.0 updated (e6d44c015b7 -> 40d534e878b)

This is an automated email from the ASF dual-hosted git repository.

yihua pushed a change to branch release-0.13.0
in repository https://gitbox.apache.org/repos/asf/hudi.git


    from e6d44c015b7 [HUDI-5633] Fixing performance regression in `HoodieSparkRecord` (#7769)
     new 22b634b0815 [HUDI-5585][flink] Fix flink creates and writes the table, the spark alter table reports an error (#7706)
     new 1ccf37f2287 [HUDI-5540] Close write client after usage of DeleteMarker/RollbackToInstantTime/RunClean/RunCompactionProcedure (#7655)
     new 9d3a8d5924d [HUDI-5317] Fix insert overwrite table for partitioned table (#7793)
     new b3fa99685ed [HUDI-5646] Guard dropping columns by a config, do not allow by default (#7787)
     new 6ef2135b2fa [MINOR] Restoring existing behavior for `DeltaStreamer` Incremental Source (#7810)
     new 4a61e82576d [HUDI-5681] Fixing Kryo being instantiated w/ invalid `SparkConf` (#7821)
     new c487ee428d5 [HUDI-5676] Fix BigQuerySyncTool standalone mode (#7816)
     new 40d534e878b [HUDI-5647] Automate savepoint and restore tests (#7796)

The 8 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../org/apache/hudi/config/HoodieWriteConfig.java  |  20 +-
 .../java/org/apache/hudi/table/HoodieTable.java    |  12 +-
 .../hudi/client/TestTableSchemaEvolution.java      | 125 +++++++----
 .../TestSavepointRestoreCopyOnWrite.java           | 173 ++++++++++++++
 .../TestSavepointRestoreMergeOnRead.java           | 248 +++++++++++++++++++++
 .../hudi/testutils/HoodieClientTestBase.java       |  62 +++++-
 .../hudi/testutils/HoodieClientTestUtils.java      |  19 +-
 .../java/org/apache/hudi/avro/AvroSchemaUtils.java |  23 +-
 .../apache/hudi/common/config/HoodieConfig.java    |   8 -
 .../hudi/common/table/TableSchemaResolver.java     |  36 ---
 .../org/apache/hudi/common/util/ParquetUtils.java  |  34 +--
 .../apache/hudi/table/catalog/HiveSchemaUtils.java |   8 +-
 .../hudi/table/catalog/HoodieHiveCatalog.java      |   2 +-
 .../hudi/table/catalog/TableOptionProperties.java  |  25 ++-
 .../hudi/table/catalog/TestHoodieHiveCatalog.java  |  15 ++
 .../hudi/gcp/bigquery/BigQuerySyncConfig.java      |  38 ++--
 .../gcp/bigquery/TestBigQuerySyncToolArgs.java     |  70 ++++++
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     |  68 +++---
 .../org/apache/spark/sql/hudi/SerDeUtils.scala     |  44 ----
 .../AlterHoodieTableChangeColumnCommand.scala      |   2 +-
 .../command/InsertIntoHoodieTableCommand.scala     |   4 +-
 .../hudi/command/MergeIntoHoodieTableCommand.scala |   8 +-
 .../hudi/command/payload/ExpressionPayload.scala   |  54 ++++-
 .../command/procedures/DeleteMarkerProcedure.scala |   8 +-
 .../RollbackToInstantTimeProcedure.scala           |  50 +++--
 .../command/procedures/RunCleanProcedure.scala     |  27 ++-
 .../procedures/RunCompactionProcedure.scala        | 126 ++++++-----
 .../hudi/TestAvroSchemaResolutionSupport.scala     |  19 +-
 .../org/apache/hudi/TestHoodieSparkSqlWriter.scala |  49 ++--
 .../hudi/functional/TestBasicSchemaEvolution.scala |  21 +-
 .../hudi/functional/TestCOWDataSourceStorage.scala |  10 +-
 .../hudi/functional/TestColumnStatsIndex.scala     |   7 +-
 .../apache/spark/sql/hudi/TestInsertTable.scala    | 120 +---------
 .../org/apache/hudi/utilities/UtilHelpers.java     |  13 ++
 .../hudi/utilities/deltastreamer/DeltaSync.java    |   8 +-
 .../hudi/utilities/sources/HoodieIncrSource.java   |  23 +-
 packaging/hudi-gcp-bundle/pom.xml                  |   8 +-
 37 files changed, 1108 insertions(+), 479 deletions(-)
 create mode 100644 hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestSavepointRestoreCopyOnWrite.java
 create mode 100644 hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestSavepointRestoreMergeOnRead.java
 create mode 100644 hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestBigQuerySyncToolArgs.java
 delete mode 100644 hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/SerDeUtils.scala


[hudi] 03/08: [HUDI-5317] Fix insert overwrite table for partitioned table (#7793)

Posted by yi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-0.13.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 9d3a8d5924d5ca95489e52adf41c418bb2eb6663
Author: Zouxxyy <zo...@alibaba-inc.com>
AuthorDate: Wed Feb 1 13:54:37 2023 +0800

    [HUDI-5317] Fix insert overwrite table for partitioned table (#7793)
---
 .../command/InsertIntoHoodieTableCommand.scala     |   4 +-
 .../apache/spark/sql/hudi/TestInsertTable.scala    | 120 ++-------------------
 2 files changed, 12 insertions(+), 112 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
index 2e4c1db099e..f07611ad019 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
@@ -90,8 +90,8 @@ object InsertIntoHoodieTableCommand extends Logging with ProvidesHoodieConfig wi
     var mode = SaveMode.Append
     var isOverWriteTable = false
     var isOverWritePartition = false
-    if (overwrite && catalogTable.partitionFields.isEmpty) {
-      // insert overwrite non-partition table
+    if (overwrite && partitionSpec.isEmpty) {
+      // insert overwrite table
       mode = SaveMode.Overwrite
       isOverWriteTable = true
     } else {
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
index a227a20b8b6..b092a68e20d 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
@@ -369,7 +369,8 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
            | partitioned by (dt)
            | location '${tmp.getCanonicalPath}/$tableName'
        """.stripMargin)
-      //  Insert overwrite dynamic partition
+
+      //  Insert overwrite table
       spark.sql(
         s"""
            | insert overwrite table $tableName
@@ -379,14 +380,13 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
         Seq(1, "a1", 10.0, 1000, "2021-01-05")
       )
 
-      //  Insert overwrite dynamic partition
+      //  Insert overwrite table
       spark.sql(
         s"""
            | insert overwrite table $tableName
            | select 2 as id, 'a2' as name, 10 as price, 1000 as ts, '2021-01-06' as dt
         """.stripMargin)
       checkAnswer(s"select id, name, price, ts, dt from $tableName order by id")(
-        Seq(1, "a1", 10.0, 1000, "2021-01-05"),
         Seq(2, "a2", 10.0, 1000, "2021-01-06")
       )
 
@@ -433,122 +433,22 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
         """.stripMargin)
       checkAnswer(s"select id, name, price, ts, dt from $tableName " +
         s"where dt >='2021-01-04' and dt <= '2021-01-06' order by id,dt")(
-        Seq(2, "a2", 12.0, 1000, "2021-01-05"),
-        Seq(2, "a2", 10.0, 1000, "2021-01-06"),
         Seq(3, "a1", 10.0, 1000, "2021-01-04")
       )
 
-      // test insert overwrite non-partitioned table
+      // Test insert overwrite non-partitioned table
       spark.sql(s"insert overwrite table $tblNonPartition select 2, 'a2', 10, 1000")
       checkAnswer(s"select id, name, price, ts from $tblNonPartition")(
         Seq(2, "a2", 10.0, 1000)
       )
-    })
-  }
 
-  test("Test Insert Overwrite Table for V2 Table") {
-    withSQLConf("hoodie.schema.on.read.enable" -> "true") {
-      withRecordType()(withTempDir { tmp =>
-        if (HoodieSparkUtils.gteqSpark3_2) {
-          val tableName = generateTableName
-          // Create a partitioned table
-          spark.sql(
-            s"""
-               |create table $tableName (
-               |  id int,
-               |  name string,
-               |  price double,
-               |  ts long,
-               |  dt string
-               |) using hudi
-               | tblproperties (primaryKey = 'id', preCombineField='dt')
-               | partitioned by (dt)
-               | location '${tmp.getCanonicalPath}/$tableName'
-        """.stripMargin)
-
-          //  Test insert overwrite table
-          spark.sql(
-            s"""
-               | insert overwrite table $tableName
-               | values(1, 'a1', 10.0, 1000, '2021-01-05')
-         """.stripMargin)
-          checkAnswer(s"select id, name, price, ts, dt from $tableName")(
-            Seq(1, "a1", 10.0, 1000, "2021-01-05")
-          )
-
-          //  Insert overwrite table
-          spark.sql(
-            s"""
-               | insert overwrite table $tableName
-               | values (2, 'a2', 10.0, 1000, '2021-01-06')
-                 """.stripMargin)
-          checkAnswer(s"select id, name, price, ts, dt from $tableName order by id")(
-            Seq(2, "a2", 10.0, 1000, "2021-01-06")
-          )
-
-          // Insert overwrite static partition
-          spark.sql(
-            s"""
-               | insert overwrite table $tableName partition(dt = '2021-01-05')
-               | select * from (select 2 , 'a2', 12.0, 1000) limit 10
-                 """.stripMargin)
-          checkAnswer(s"select id, name, price, ts, dt from $tableName order by dt")(
-            Seq(2, "a2", 12.0, 1000, "2021-01-05"),
-            Seq(2, "a2", 10.0, 1000, "2021-01-06")
-          )
-
-          // Insert data from another table
-          val tblNonPartition = generateTableName
-          spark.sql(
-            s"""
-               | create table $tblNonPartition (
-               |  id int,
-               |  name string,
-               |  price double,
-               |  ts long
-               | ) using hudi
-               | tblproperties (primaryKey = 'id')
-               | location '${tmp.getCanonicalPath}/$tblNonPartition'
-                  """.stripMargin)
-          spark.sql(s"insert into $tblNonPartition select 1, 'a1', 10.0, 1000")
-          spark.sql(
-            s"""
-               | insert overwrite table $tableName partition(dt ='2021-01-04')
-               | select * from $tblNonPartition limit 10
-                 """.stripMargin)
-          checkAnswer(s"select id, name, price, ts, dt from $tableName order by id,dt")(
-            Seq(1, "a1", 10.0, 1000, "2021-01-04"),
-            Seq(2, "a2", 12.0, 1000, "2021-01-05"),
-            Seq(2, "a2", 10.0, 1000, "2021-01-06")
-          )
-
-          // Insert overwrite partitioned table, all partitions will be truncated
-          spark.sql(
-            s"""
-               | insert overwrite table $tableName
-               | select id + 2, name, price, ts , '2021-01-04' from $tblNonPartition limit 10
-                 """.stripMargin)
-          checkAnswer(s"select id, name, price, ts, dt from $tableName " +
-            s"where dt >='2021-01-04' and dt <= '2021-01-06' order by id,dt")(
-            Seq(3, "a1", 10.0, 1000, "2021-01-04")
-          )
-
-          // Test insert overwrite non-partitioned table
-          spark.sql(s"insert overwrite table $tblNonPartition select 2, 'a2', 10.0, 1000")
-          checkAnswer(s"select id, name, price, ts from $tblNonPartition")(
-            Seq(2, "a2", 10.0, 1000)
-          )
-
-          spark.sql(s"insert overwrite table $tblNonPartition select 2, 'a2', 10.0, 2000")
-          checkAnswer(s"select id, name, price, ts from $tblNonPartition")(
-            Seq(2, "a2", 10.0, 2000)
-          )
-        }
-      })
-    }
+      spark.sql(s"insert overwrite table $tblNonPartition select 3, 'a3', 10, 1000")
+      checkAnswer(s"select id, name, price, ts from $tblNonPartition")(
+        Seq(3, "a3", 10.0, 1000)
+      )
+    })
   }
 
-
   test("Test Different Type of Partition Column") {
    withRecordType()(withTempDir { tmp =>
      val typeAndValue = Seq(
@@ -666,7 +566,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
              | tblproperties (primaryKey = 'id')
              | partitioned by (dt)
        """.stripMargin)
-        checkException(s"insert overwrite table $tableName3 values(1, 'a1', 10, '2021-07-18')")(
+        checkException(s"insert overwrite table $tableName3 partition(dt = '2021-07-18') values(1, 'a1', 10, '2021-07-18')")(
           "Insert Overwrite Partition can not use bulk insert."
         )
       }


[hudi] 04/08: [HUDI-5646] Guard dropping columns by a config, do not allow by default (#7787)

Posted by yi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-0.13.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit b3fa99685ed923045bf966b0fc5b273a881e157a
Author: Sagar Sumit <sa...@gmail.com>
AuthorDate: Wed Feb 1 18:13:25 2023 +0530

    [HUDI-5646] Guard dropping columns by a config, do not allow by default (#7787)
    
    * [HUDI-5646] Guard dropping columns by a config, do not allow by default
    
    * Replaced superfluous `isSchemaCompatible` override by explicitly specifying whether column drop should be allowed
    
    * Revisited `HoodieSparkSqlWriter` to avoid (unnecessary) schema handling for delete operations
    
    * Remove meta-fields from latest table schema during analysis
    
    * Disable schema validation when partition columns are dropped
    
    ---------
    
    Co-authored-by: Alexey Kudinkin <al...@infinilake.com>
    Co-authored-by: sivabalan <n....@gmail.com>
---
 .../org/apache/hudi/config/HoodieWriteConfig.java  |  18 +++
 .../java/org/apache/hudi/table/HoodieTable.java    |  12 +-
 .../hudi/client/TestTableSchemaEvolution.java      | 125 +++++++++++++--------
 .../hudi/testutils/HoodieClientTestUtils.java      |  19 +++-
 .../java/org/apache/hudi/avro/AvroSchemaUtils.java |  23 +++-
 .../hudi/common/table/TableSchemaResolver.java     |  36 ------
 .../org/apache/hudi/common/util/ParquetUtils.java  |  34 +++---
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     |  68 +++++------
 .../AlterHoodieTableChangeColumnCommand.scala      |   2 +-
 .../hudi/TestAvroSchemaResolutionSupport.scala     |  19 +++-
 .../org/apache/hudi/TestHoodieSparkSqlWriter.scala |  49 +++++---
 .../hudi/functional/TestBasicSchemaEvolution.scala |  21 +++-
 .../hudi/functional/TestCOWDataSourceStorage.scala |  10 +-
 .../hudi/functional/TestColumnStatsIndex.scala     |   7 +-
 14 files changed, 268 insertions(+), 175 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index e187fff4483..e6525a2b1dc 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -227,6 +227,15 @@ public class HoodieWriteConfig extends HoodieConfig {
       .defaultValue("true")
       .withDocumentation("Validate the schema used for the write against the latest schema, for backwards compatibility.");
 
+  public static final ConfigProperty<String> SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP = ConfigProperty
+      .key("hoodie.datasource.write.schema.allow.auto.evolution.column.drop")
+      .defaultValue("false")
+      .sinceVersion("0.13.0")
+      .withDocumentation("Controls whether table's schema is allowed to automatically evolve when "
+          + "incoming batch's schema can have any of the columns dropped. By default, Hudi will not "
+          + "allow this kind of (auto) schema evolution. Set this config to true to allow table's "
+          + "schema to be updated automatically when columns are dropped from the new incoming batch.");
+
   public static final ConfigProperty<String> INSERT_PARALLELISM_VALUE = ConfigProperty
       .key("hoodie.insert.shuffle.parallelism")
       .defaultValue("0")
@@ -1086,6 +1095,10 @@ public class HoodieWriteConfig extends HoodieConfig {
     return getBoolean(AVRO_SCHEMA_VALIDATE_ENABLE);
   }
 
+  public boolean shouldAllowAutoEvolutionColumnDrop() {
+    return getBooleanOrDefault(SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP);
+  }
+
   public String getTableName() {
     return getString(TBL_NAME);
   }
@@ -2451,6 +2464,11 @@ public class HoodieWriteConfig extends HoodieConfig {
       return this;
     }
 
+    public Builder withAllowAutoEvolutionColumnDrop(boolean shouldAllowDroppedColumns) {
+      writeConfig.setValue(SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP, String.valueOf(shouldAllowDroppedColumns));
+      return this;
+    }
+
     public Builder forTable(String tableName) {
       writeConfig.setValue(TBL_NAME, tableName);
       return this;
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
index af8d8d23261..591ebc430dc 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -800,7 +800,7 @@ public abstract class HoodieTable<T, I, K, O> implements Serializable {
    */
   private void validateSchema() throws HoodieUpsertException, HoodieInsertException {
 
-    if (!config.shouldValidateAvroSchema() || getActiveTimeline().getCommitsTimeline().filterCompletedInstants().empty()) {
+    if (!shouldValidateAvroSchema() || getActiveTimeline().getCommitsTimeline().filterCompletedInstants().empty()) {
       // Check not required
       return;
     }
@@ -812,7 +812,7 @@ public abstract class HoodieTable<T, I, K, O> implements Serializable {
       TableSchemaResolver schemaResolver = new TableSchemaResolver(getMetaClient());
       writerSchema = HoodieAvroUtils.createHoodieWriteSchema(config.getSchema());
       tableSchema = HoodieAvroUtils.createHoodieWriteSchema(schemaResolver.getTableAvroSchemaWithoutMetadataFields());
-      isValid = isSchemaCompatible(tableSchema, writerSchema);
+      isValid = isSchemaCompatible(tableSchema, writerSchema, config.shouldAllowAutoEvolutionColumnDrop());
     } catch (Exception e) {
       throw new HoodieException("Failed to read schema/check compatibility for base path " + metaClient.getBasePath(), e);
     }
@@ -1010,4 +1010,12 @@ public abstract class HoodieTable<T, I, K, O> implements Serializable {
   public Runnable getPreExecuteRunnable() {
     return Functions.noop();
   }
+
+  private boolean shouldValidateAvroSchema() {
+    // TODO(HUDI-4772) re-enable validations in case partition columns
+    //                 being dropped from the data-file after fixing the write schema
+    Boolean shouldDropPartitionColumns = metaClient.getTableConfig().shouldDropPartitionColumns();
+
+    return config.shouldValidateAvroSchema() && !shouldDropPartitionColumns;
+  }
 }
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java
index f778c7cceac..686563ebfdd 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java
@@ -18,8 +18,6 @@
 
 package org.apache.hudi.client;
 
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
 import org.apache.hudi.avro.AvroSchemaUtils;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.model.HoodieAvroRecord;
@@ -34,10 +32,17 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieInsertException;
+import org.apache.hudi.exception.HoodieUpsertException;
 import org.apache.hudi.index.HoodieIndex.IndexType;
 import org.apache.hudi.testutils.HoodieClientTestBase;
 import org.apache.hudi.testutils.HoodieClientTestUtils;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.IOException;
 import java.util.List;
@@ -79,36 +84,35 @@ public class TestTableSchemaEvolution extends HoodieClientTestBase {
 
   @Test
   public void testSchemaCompatibilityBasic() {
-    assertTrue(isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, TRIP_EXAMPLE_SCHEMA),
+    assertTrue(isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, TRIP_EXAMPLE_SCHEMA, false),
         "Same schema is compatible");
 
     String reorderedSchema = TRIP_SCHEMA_PREFIX + EXTRA_TYPE_SCHEMA + TIP_NESTED_SCHEMA + FARE_NESTED_SCHEMA
         + MAP_TYPE_SCHEMA + TRIP_SCHEMA_SUFFIX;
-    assertTrue(isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, reorderedSchema),
+    assertTrue(isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, reorderedSchema, false),
         "Reordered fields are compatible");
-    assertTrue(isSchemaCompatible(reorderedSchema, TRIP_EXAMPLE_SCHEMA),
+    assertTrue(isSchemaCompatible(reorderedSchema, TRIP_EXAMPLE_SCHEMA, false),
         "Reordered fields are compatible");
 
     String renamedSchema = TRIP_EXAMPLE_SCHEMA.replace("tip_history", "tip_future");
 
-    // NOTE: That even though renames could be carried over as "column drop" and "column add"
-    //       both of which are legitimate operations, no data carry-over will occur (exactly b/c
-    //       it's an old column being dropped, and the new one being added)
-    assertTrue(isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, renamedSchema),
+    assertFalse(isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, renamedSchema, false),
+        "Renaming fields is essentially: dropping old field, created a new one");
+    assertTrue(isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, renamedSchema, true),
         "Renaming fields is essentially: dropping old field, created a new one");
 
     String renamedRecordSchema = TRIP_EXAMPLE_SCHEMA.replace("triprec", "triprec_renamed");
-    assertFalse(isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, renamedRecordSchema),
+    assertFalse(isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, renamedRecordSchema, false),
         "Renamed record name is not compatible");
 
     String swappedFieldSchema = TRIP_SCHEMA_PREFIX + MAP_TYPE_SCHEMA.replace("city_to_state", "fare")
         + FARE_NESTED_SCHEMA.replace("fare", "city_to_state") + TIP_NESTED_SCHEMA + TRIP_SCHEMA_SUFFIX;
-    assertFalse(isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, swappedFieldSchema),
+    assertFalse(isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, swappedFieldSchema, false),
         "Swapped fields are not compatible");
 
     String typeChangeSchemaDisallowed = TRIP_SCHEMA_PREFIX + MAP_TYPE_SCHEMA + FARE_NESTED_SCHEMA
         + TIP_NESTED_SCHEMA.replace("string", "boolean") + TRIP_SCHEMA_SUFFIX;
-    assertFalse(isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, typeChangeSchemaDisallowed),
+    assertFalse(isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, typeChangeSchemaDisallowed, false),
         "Incompatible field type change is not allowed");
 
     // Array of allowed schema field type transitions
@@ -119,10 +123,10 @@ public class TestTableSchemaEvolution extends HoodieClientTestBase {
     for (String[] fieldChange : allowedFieldChanges) {
       String fromSchema = TRIP_SCHEMA_PREFIX + EXTRA_FIELD_SCHEMA.replace("string", fieldChange[0]) + TRIP_SCHEMA_SUFFIX;
       String toSchema = TRIP_SCHEMA_PREFIX + EXTRA_FIELD_SCHEMA.replace("string", fieldChange[1]) + TRIP_SCHEMA_SUFFIX;
-      assertTrue(isSchemaCompatible(fromSchema, toSchema),
+      assertTrue(isSchemaCompatible(fromSchema, toSchema, false),
           "Compatible field type change is not allowed");
       if (!fieldChange[0].equals("byte") && fieldChange[1].equals("byte")) {
-        assertFalse(isSchemaCompatible(toSchema, fromSchema),
+        assertFalse(isSchemaCompatible(toSchema, fromSchema, false),
             "Incompatible field type change is allowed");
       }
     }
@@ -130,32 +134,31 @@ public class TestTableSchemaEvolution extends HoodieClientTestBase {
     // Names and aliases should match
     String fromSchema = TRIP_SCHEMA_PREFIX + EXTRA_FIELD_SCHEMA + TRIP_SCHEMA_SUFFIX;
     String toSchema = TRIP_SCHEMA_PREFIX.replace("triprec", "new_triprec") + EXTRA_FIELD_SCHEMA + TRIP_SCHEMA_SUFFIX;
-    assertFalse(isSchemaCompatible(fromSchema, toSchema), "Field names should match");
-    assertFalse(isSchemaCompatible(toSchema, fromSchema), "Field names should match");
+    assertFalse(isSchemaCompatible(fromSchema, toSchema, false), "Field names should match");
+    assertFalse(isSchemaCompatible(toSchema, fromSchema, false), "Field names should match");
 
 
-    assertTrue(isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, TRIP_EXAMPLE_SCHEMA_EVOLVED_COL_ADDED),
+    assertTrue(isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, TRIP_EXAMPLE_SCHEMA_EVOLVED_COL_ADDED, false),
         "Added field with default is compatible (Evolved Schema)");
 
     String multipleAddedFieldSchema = TRIP_SCHEMA_PREFIX + EXTRA_TYPE_SCHEMA + MAP_TYPE_SCHEMA + FARE_NESTED_SCHEMA
         + TIP_NESTED_SCHEMA + EXTRA_FIELD_SCHEMA + EXTRA_FIELD_SCHEMA.replace("new_field", "new_new_field")
         + TRIP_SCHEMA_SUFFIX;
-    assertTrue(isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, multipleAddedFieldSchema),
+    assertTrue(isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, multipleAddedFieldSchema, false),
         "Multiple added fields with defaults are compatible");
 
-    assertFalse(isSchemaCompatible(TRIP_EXAMPLE_SCHEMA,
-        TRIP_SCHEMA_PREFIX + EXTRA_TYPE_SCHEMA + MAP_TYPE_SCHEMA
-            + FARE_NESTED_SCHEMA + TIP_NESTED_SCHEMA + EXTRA_FIELD_WITHOUT_DEFAULT_SCHEMA + TRIP_SCHEMA_SUFFIX),
+    assertFalse(isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, TRIP_SCHEMA_PREFIX + EXTRA_TYPE_SCHEMA + MAP_TYPE_SCHEMA
+            + FARE_NESTED_SCHEMA + TIP_NESTED_SCHEMA + EXTRA_FIELD_WITHOUT_DEFAULT_SCHEMA + TRIP_SCHEMA_SUFFIX, false),
         "Added field without default and not nullable is not compatible (Evolved Schema)");
 
-    assertTrue(isSchemaCompatible(TRIP_EXAMPLE_SCHEMA,
-        TRIP_SCHEMA_PREFIX + EXTRA_TYPE_SCHEMA + MAP_TYPE_SCHEMA
-            + FARE_NESTED_SCHEMA + TIP_NESTED_SCHEMA + TRIP_SCHEMA_SUFFIX + EXTRA_FIELD_NULLABLE_SCHEMA),
+    assertTrue(isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, TRIP_SCHEMA_PREFIX + EXTRA_TYPE_SCHEMA + MAP_TYPE_SCHEMA
+            + FARE_NESTED_SCHEMA + TIP_NESTED_SCHEMA + TRIP_SCHEMA_SUFFIX + EXTRA_FIELD_NULLABLE_SCHEMA, false),
         "Added nullable field is compatible (Evolved Schema)");
   }
 
-  @Test
-  public void testMORTable() throws Exception {
+  @ParameterizedTest
+  @ValueSource(booleans = {false, true})
+  public void testMORTable(boolean shouldAllowDroppedColumns) throws Exception {
     tableType = HoodieTableType.MERGE_ON_READ;
 
     // Create the table
@@ -165,7 +168,7 @@ public class TestTableSchemaEvolution extends HoodieClientTestBase {
       .setTimelineLayoutVersion(VERSION_1)
       .initTable(metaClient.getHadoopConf(), metaClient.getBasePath());
 
-    HoodieWriteConfig hoodieWriteConfig = getWriteConfig(TRIP_EXAMPLE_SCHEMA);
+    HoodieWriteConfig hoodieWriteConfig = getWriteConfig(TRIP_EXAMPLE_SCHEMA, shouldAllowDroppedColumns);
     SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig);
 
     // Initial inserts with TRIP_EXAMPLE_SCHEMA
@@ -194,20 +197,26 @@ public class TestTableSchemaEvolution extends HoodieClientTestBase {
     checkReadRecords("000", numRecords);
 
     // Insert with evolved schema (column dropped) is allowed
-    HoodieWriteConfig hoodieDevolvedWriteConfig = getWriteConfig(TRIP_EXAMPLE_SCHEMA_EVOLVED_COL_DROPPED);
+    HoodieWriteConfig hoodieDevolvedWriteConfig = getWriteConfig(TRIP_EXAMPLE_SCHEMA_EVOLVED_COL_DROPPED, shouldAllowDroppedColumns);
     client = getHoodieWriteClient(hoodieDevolvedWriteConfig);
     final List<HoodieRecord> failedRecords = generateInsertsWithSchema("005", numRecords, TRIP_EXAMPLE_SCHEMA_EVOLVED_COL_DROPPED);
     // We cannot use insertBatch directly here because we want to insert records
     // with a evolved schema and insertBatch inserts records using the TRIP_EXAMPLE_SCHEMA.
-    writeBatch(client, "005", "004", Option.empty(), "003", numRecords,
-        (String s, Integer a) -> failedRecords, SparkRDDWriteClient::insert, false, numRecords, 2 * numRecords, 5, false);
+    try {
+      writeBatch(client, "005", "004", Option.empty(), "003", numRecords,
+          (String s, Integer a) -> failedRecords, SparkRDDWriteClient::insert, false, numRecords, 2 * numRecords, 5, false);
+      assertTrue(shouldAllowDroppedColumns);
+    } catch (HoodieInsertException e) {
+      assertFalse(shouldAllowDroppedColumns);
+      return;
+    }
 
-    // Update with evolved schema (column dropped) is allowed
+    // Update with evolved schema (column dropped) might be allowed depending on config set.
     updateBatch(hoodieDevolvedWriteConfig, client, "006", "005", Option.empty(),
                 initCommitTime, numUpdateRecords, SparkRDDWriteClient::upsert, false, false, numUpdateRecords, 2 * numRecords, 0);
 
     // Insert with an evolved scheme is allowed
-    HoodieWriteConfig hoodieEvolvedWriteConfig = getWriteConfig(TRIP_EXAMPLE_SCHEMA_EVOLVED_COL_ADDED);
+    HoodieWriteConfig hoodieEvolvedWriteConfig = getWriteConfig(TRIP_EXAMPLE_SCHEMA_EVOLVED_COL_ADDED, shouldAllowDroppedColumns);
     client = getHoodieWriteClient(hoodieEvolvedWriteConfig);
 
     // We cannot use insertBatch directly here because we want to insert records
@@ -230,19 +239,28 @@ public class TestTableSchemaEvolution extends HoodieClientTestBase {
 
     // Now try updating w/ the original schema (should succeed)
     client = getHoodieWriteClient(hoodieWriteConfig);
-    updateBatch(hoodieWriteConfig, client, "009", "008", Option.empty(),
-                initCommitTime, numUpdateRecords, SparkRDDWriteClient::upsert, false, false, numUpdateRecords, 4 * numRecords, 9);
+    try {
+      updateBatch(hoodieWriteConfig, client, "009", "008", Option.empty(),
+          initCommitTime, numUpdateRecords, SparkRDDWriteClient::upsert, false, false, numUpdateRecords, 4 * numRecords, 9);
+      assertTrue(shouldAllowDroppedColumns);
+    } catch (HoodieUpsertException e) {
+      assertFalse(shouldAllowDroppedColumns);
+    }
   }
 
-  @Test
-  public void testCopyOnWriteTable() throws Exception {
+  @ParameterizedTest
+  @ValueSource(booleans = {false, true})
+  public void testCopyOnWriteTable(boolean shouldAllowDroppedColumns) throws Exception {
     // Create the table
     HoodieTableMetaClient.withPropertyBuilder()
       .fromMetaClient(metaClient)
       .setTimelineLayoutVersion(VERSION_1)
       .initTable(metaClient.getHadoopConf(), metaClient.getBasePath());
 
-    HoodieWriteConfig hoodieWriteConfig = getWriteConfigBuilder(TRIP_EXAMPLE_SCHEMA).withRollbackUsingMarkers(false).build();
+    HoodieWriteConfig hoodieWriteConfig = getWriteConfigBuilder(TRIP_EXAMPLE_SCHEMA)
+        .withRollbackUsingMarkers(false)
+        .withAllowAutoEvolutionColumnDrop(shouldAllowDroppedColumns)
+        .build();
     SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig);
 
     // Initial inserts with TRIP_EXAMPLE_SCHEMA
@@ -266,11 +284,17 @@ public class TestTableSchemaEvolution extends HoodieClientTestBase {
     checkReadRecords("000", numRecords);
 
     // Inserting records w/ new evolved schema (w/ tip column dropped)
-    HoodieWriteConfig hoodieDevolvedWriteConfig = getWriteConfig(TRIP_EXAMPLE_SCHEMA_EVOLVED_COL_DROPPED);
+    HoodieWriteConfig hoodieDevolvedWriteConfig = getWriteConfig(TRIP_EXAMPLE_SCHEMA_EVOLVED_COL_DROPPED, shouldAllowDroppedColumns);
     client = getHoodieWriteClient(hoodieDevolvedWriteConfig);
     final List<HoodieRecord> failedRecords = generateInsertsWithSchema("004", numRecords, TRIP_EXAMPLE_SCHEMA_EVOLVED_COL_DROPPED);
-    writeBatch(client, "004", "003", Option.empty(), "003", numRecords,
-        (String s, Integer a) -> failedRecords, SparkRDDWriteClient::insert, true, numRecords, numRecords * 2, 1, false);
+    try {
+      writeBatch(client, "004", "003", Option.empty(), "003", numRecords,
+          (String s, Integer a) -> failedRecords, SparkRDDWriteClient::insert, true, numRecords, numRecords * 2, 1, false);
+      assertTrue(shouldAllowDroppedColumns);
+    } catch (HoodieInsertException e) {
+      assertFalse(shouldAllowDroppedColumns);
+      return;
+    }
 
     // Updating records w/ new evolved schema
     updateBatch(hoodieDevolvedWriteConfig, client, "005", "004", Option.empty(),
@@ -278,7 +302,7 @@ public class TestTableSchemaEvolution extends HoodieClientTestBase {
                 numUpdateRecords, 2 * numRecords, 5);
 
     // Inserting with evolved schema is allowed
-    HoodieWriteConfig hoodieEvolvedWriteConfig = getWriteConfig(TRIP_EXAMPLE_SCHEMA_EVOLVED_COL_ADDED);
+    HoodieWriteConfig hoodieEvolvedWriteConfig = getWriteConfig(TRIP_EXAMPLE_SCHEMA_EVOLVED_COL_ADDED, shouldAllowDroppedColumns);
     client = getHoodieWriteClient(hoodieEvolvedWriteConfig);
     final List<HoodieRecord> evolvedRecords = generateInsertsWithSchema("006", numRecords, TRIP_EXAMPLE_SCHEMA_EVOLVED_COL_ADDED);
     // We cannot use insertBatch directly here because we want to insert records
@@ -299,9 +323,14 @@ public class TestTableSchemaEvolution extends HoodieClientTestBase {
 
     // Now try updating w/ the original schema (should succeed)
     client = getHoodieWriteClient(hoodieWriteConfig);
-    updateBatch(hoodieWriteConfig, client, "008", "007", Option.empty(),
-                initCommitTime, numUpdateRecords, SparkRDDWriteClient::upsert, false, true,
-                numUpdateRecords, 3 * numRecords, 8);
+    try {
+      updateBatch(hoodieWriteConfig, client, "008", "007", Option.empty(),
+          initCommitTime, numUpdateRecords, SparkRDDWriteClient::upsert, false, true,
+          numUpdateRecords, 3 * numRecords, 8);
+      assertTrue(shouldAllowDroppedColumns);
+    } catch (HoodieUpsertException e) {
+      assertFalse(shouldAllowDroppedColumns);
+    }
   }
 
   private void checkReadRecords(String instantTime, int numExpectedRecords) throws IOException {
@@ -362,8 +391,8 @@ public class TestTableSchemaEvolution extends HoodieClientTestBase {
     }).collect(Collectors.toList());
   }
 
-  private HoodieWriteConfig getWriteConfig(String schema) {
-    return getWriteConfigBuilder(schema).build();
+  private HoodieWriteConfig getWriteConfig(String schema, boolean shouldAllowDroppedColumns) {
+    return getWriteConfigBuilder(schema).withAllowAutoEvolutionColumnDrop(shouldAllowDroppedColumns).build();
   }
 
   private HoodieWriteConfig.Builder getWriteConfigBuilder(String schema) {
@@ -373,8 +402,8 @@ public class TestTableSchemaEvolution extends HoodieClientTestBase {
         .withAvroSchemaValidate(true);
   }
 
-  private static boolean isSchemaCompatible(String oldSchema, String newSchema) {
-    return AvroSchemaUtils.isSchemaCompatible(new Schema.Parser().parse(oldSchema), new Schema.Parser().parse(newSchema));
+  private static boolean isSchemaCompatible(String oldSchema, String newSchema, boolean shouldAllowDroppedColumns) {
+    return AvroSchemaUtils.isSchemaCompatible(new Schema.Parser().parse(oldSchema), new Schema.Parser().parse(newSchema), shouldAllowDroppedColumns);
   }
 
   @Override
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java
index 609fdb0bd5c..09447965b2c 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java
@@ -37,6 +37,7 @@ import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.io.storage.HoodieHFileUtils;
@@ -96,11 +97,17 @@ public class HoodieClientTestUtils {
         .setMaster("local[4]")
         .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
         .set("spark.kryo.registrator", "org.apache.spark.HoodieSparkKryoRegistrar")
-        .set("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
         .set("spark.sql.shuffle.partitions", "4")
         .set("spark.default.parallelism", "4");
 
-    if (HoodieSparkUtils.gteqSpark3_2()) {
+    // NOTE: This utility is used in modules where the extension class might not be present;
+    //       to avoid littering output w/ [[ClassNotFoundException]]s we only set the
+    //       extension when its class can actually be loaded
+    if (canLoadClass("org.apache.spark.sql.hudi.HoodieSparkSessionExtension")) {
+      sparkConf.set("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension");
+    }
+
+    if (canLoadClass("org.apache.spark.sql.hudi.catalog.HoodieCatalog") && HoodieSparkUtils.gteqSpark3_2()) {
       sparkConf.set("spark.sql.catalog.spark_catalog",
           "org.apache.spark.sql.hudi.catalog.HoodieCatalog");
     }
@@ -326,4 +333,12 @@ public class HoodieClientTestUtils {
       throw new HoodieException("Failed to read schema from commit metadata", e);
     }
   }
+
+  private static boolean canLoadClass(String className) {
+    try {
+      return ReflectionUtils.getClass(className) != null;
+    } catch (Exception e) {
+      return false;
+    }
+  }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java
index 395fc100bf1..545acdcf309 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java
@@ -20,6 +20,7 @@ package org.apache.hudi.avro;
 
 import org.apache.avro.AvroRuntimeException;
 import org.apache.avro.Schema;
+import org.apache.avro.SchemaCompatibility;
 
 import java.util.List;
 import java.util.Objects;
@@ -36,10 +37,10 @@ public class AvroSchemaUtils {
   private AvroSchemaUtils() {}
 
   /**
-   * See {@link #isSchemaCompatible(Schema, Schema, boolean)} doc for more details
+   * See {@link #isSchemaCompatible(Schema, Schema, boolean, boolean)} doc for more details
    */
-  public static boolean isSchemaCompatible(Schema prevSchema, Schema newSchema) {
-    return isSchemaCompatible(prevSchema, newSchema, true);
+  public static boolean isSchemaCompatible(Schema prevSchema, Schema newSchema, boolean allowProjection) {
+    return isSchemaCompatible(prevSchema, newSchema, true, allowProjection);
   }
 
   /**
@@ -50,10 +51,22 @@ public class AvroSchemaUtils {
    * @param newSchema new instance of the schema
    * @param checkNaming controls whether schemas fully-qualified names should be checked
    */
-  public static boolean isSchemaCompatible(Schema prevSchema, Schema newSchema, boolean checkNaming) {
+  public static boolean isSchemaCompatible(Schema prevSchema, Schema newSchema, boolean checkNaming, boolean allowProjection) {
     // NOTE: We're establishing compatibility of the {@code prevSchema} and {@code newSchema}
     //       as following: {@code newSchema} is considered compatible to {@code prevSchema},
     //       iff data written using {@code prevSchema} could be read by {@code newSchema}
+
+    // In case schema projection is not allowed, new schema has to have all the same fields as the
+    // old schema
+    if (!allowProjection) {
+      // Check that each field in the oldSchema can be populated in the newSchema
+      if (prevSchema.getFields().stream()
+          .map(oldSchemaField -> SchemaCompatibility.lookupWriterField(newSchema, oldSchemaField))
+          .anyMatch(Objects::isNull)) {
+        return false;
+      }
+    }
+
     AvroSchemaCompatibility.SchemaPairCompatibility result =
         AvroSchemaCompatibility.checkReaderWriterCompatibility(newSchema, prevSchema, checkNaming);
     return result.getType() == AvroSchemaCompatibility.SchemaCompatibilityType.COMPATIBLE;
@@ -88,7 +101,7 @@ public class AvroSchemaUtils {
   private static boolean isAtomicSchemasCompatible(Schema oneAtomicType, Schema anotherAtomicType) {
     // NOTE: Checking for compatibility of atomic types, we should ignore their
     //       corresponding fully-qualified names (as irrelevant)
-    return isSchemaCompatible(oneAtomicType, anotherAtomicType, false);
+    return isSchemaCompatible(oneAtomicType, anotherAtomicType, false, true);
   }
 
   /**
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
index 4eddd6df031..03ed542bd60 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
@@ -18,7 +18,6 @@
 
 package org.apache.hudi.common.table;
 
-import org.apache.hudi.avro.AvroSchemaUtils;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieFileFormat;
@@ -31,7 +30,6 @@ import org.apache.hudi.common.table.log.block.HoodieLogBlock;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.util.Functions.Function1;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.Pair;
@@ -315,40 +313,6 @@ public class TableSchemaResolver {
     return Option.empty();
   }
 
-  /**
-   * Get latest schema either from incoming schema or table schema.
-   * @param writeSchema incoming batch's write schema.
-   * @param convertTableSchemaToAddNamespace {@code true} if table schema needs to be converted. {@code false} otherwise.
-   * @param converterFn converter function to be called over table schema (to add namespace may be). Each caller can decide if any conversion is required.
-   * @return the latest schema.
-   *
-   * @deprecated will be removed (HUDI-4472)
-   */
-  @Deprecated
-  public Schema getLatestSchema(Schema writeSchema, boolean convertTableSchemaToAddNamespace,
-      Function1<Schema, Schema> converterFn) {
-    Schema latestSchema = writeSchema;
-    try {
-      if (metaClient.isTimelineNonEmpty()) {
-        Schema tableSchema = getTableAvroSchemaWithoutMetadataFields();
-        if (convertTableSchemaToAddNamespace && converterFn != null) {
-          tableSchema = converterFn.apply(tableSchema);
-        }
-        if (writeSchema.getFields().size() < tableSchema.getFields().size() && AvroSchemaUtils.isSchemaCompatible(writeSchema, tableSchema)) {
-          // if incoming schema is a subset (old schema) compared to table schema. For eg, one of the
-          // ingestion pipeline is still producing events in old schema
-          latestSchema = tableSchema;
-          LOG.debug("Using latest table schema to rewrite incoming records " + tableSchema.toString());
-        }
-      }
-    } catch (IllegalArgumentException | InvalidTableException e) {
-      LOG.warn("Could not find any commits, falling back to using incoming batch's write schema");
-    } catch (Exception e) {
-      LOG.warn("Unknown exception thrown " + e.getMessage() + ", Falling back to using incoming batch's write schema");
-    }
-    return latestSchema;
-  }
-
   private MessageType readSchemaFromParquetBaseFile(Path parquetFilePath) throws IOException {
     LOG.info("Reading schema from " + parquetFilePath);
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
index bc736090d5e..d0ef867a2d4 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
@@ -37,6 +37,7 @@ import org.apache.log4j.Logger;
 import org.apache.parquet.avro.AvroParquetReader;
 import org.apache.parquet.avro.AvroReadSupport;
 import org.apache.parquet.avro.AvroSchemaConverter;
+import org.apache.parquet.column.statistics.Statistics;
 import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.ParquetReader;
 import org.apache.parquet.hadoop.metadata.BlockMetaData;
@@ -314,20 +315,25 @@ public class ParquetUtils extends BaseFileUtils {
           .flatMap(blockMetaData ->
               blockMetaData.getColumns().stream()
                 .filter(f -> cols.contains(f.getPath().toDotString()))
-                .map(columnChunkMetaData ->
-                    HoodieColumnRangeMetadata.<Comparable>create(
-                        parquetFilePath.getName(),
-                        columnChunkMetaData.getPath().toDotString(),
-                        convertToNativeJavaType(
-                            columnChunkMetaData.getPrimitiveType(),
-                            columnChunkMetaData.getStatistics().genericGetMin()),
-                        convertToNativeJavaType(
-                            columnChunkMetaData.getPrimitiveType(),
-                            columnChunkMetaData.getStatistics().genericGetMax()),
-                        columnChunkMetaData.getStatistics().getNumNulls(),
-                        columnChunkMetaData.getValueCount(),
-                        columnChunkMetaData.getTotalSize(),
-                        columnChunkMetaData.getTotalUncompressedSize()))
+                .map(columnChunkMetaData -> {
+                  Statistics stats = columnChunkMetaData.getStatistics();
+                  return HoodieColumnRangeMetadata.<Comparable>create(
+                      parquetFilePath.getName(),
+                      columnChunkMetaData.getPath().toDotString(),
+                      convertToNativeJavaType(
+                          columnChunkMetaData.getPrimitiveType(),
+                          stats.genericGetMin()),
+                      convertToNativeJavaType(
+                          columnChunkMetaData.getPrimitiveType(),
+                          stats.genericGetMax()),
+                      // NOTE: When a column contains only nulls, Parquet won't create stats for
+                      //       it and instead returns a stubbed (empty) object. In that case
+                      //       we have to equate number of nulls to the value count ourselves
+                      stats.isEmpty() ? columnChunkMetaData.getValueCount() : stats.getNumNulls(),
+                      columnChunkMetaData.getValueCount(),
+                      columnChunkMetaData.getTotalSize(),
+                      columnChunkMetaData.getTotalUncompressedSize());
+                })
           )
           .collect(groupingByCollector);
 
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 0092019dad5..764f9474ee0 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -26,7 +26,8 @@ import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.HoodieConversionUtils.{toProperties, toScalaOption}
 import org.apache.hudi.HoodieWriterUtils._
 import org.apache.hudi.avro.AvroSchemaUtils.{isCompatibleProjectionOf, isSchemaCompatible}
-import org.apache.hudi.avro.{AvroSchemaUtils, HoodieAvroUtils}
+import org.apache.hudi.avro.HoodieAvroUtils
+import org.apache.hudi.avro.HoodieAvroUtils.removeMetadataFields
 import org.apache.hudi.client.common.HoodieSparkEngineContext
 import org.apache.hudi.client.{HoodieWriteResult, SparkRDDWriteClient}
 import org.apache.hudi.common.config.{ConfigProperty, HoodieCommonConfig, HoodieConfig, HoodieMetadataConfig, TypedProperties}
@@ -230,31 +231,6 @@ object HoodieSparkSqlWriter {
         }
       }
 
-      // NOTE: Target writer's schema is deduced based on
-      //         - Source's schema
-      //         - Existing table's schema (including its Hudi's [[InternalSchema]] representation)
-      val writerSchema = deduceWriterSchema(sourceSchema, latestTableSchemaOpt, internalSchemaOpt, parameters)
-
-      validateSchemaForHoodieIsDeleted(writerSchema)
-
-      // NOTE: PLEASE READ CAREFULLY BEFORE CHANGING THIS
-      //       We have to register w/ Kryo all of the Avro schemas that might potentially be used to decode
-      //       records into Avro format. Otherwise, Kryo wouldn't be able to apply an optimization allowing
-      //       it to avoid the need to ser/de the whole schema along _every_ Avro record
-      val targetAvroSchemas = sourceSchema +: writerSchema +: latestTableSchemaOpt.toSeq
-      registerAvroSchemasWithKryo(sparkContext, targetAvroSchemas: _*)
-
-      log.info(s"Registered Avro schemas: ${targetAvroSchemas.map(_.toString(true)).mkString("\n")}")
-
-      // Short-circuit if bulk_insert via row is enabled.
-      // scalastyle:off
-      if (hoodieConfig.getBoolean(ENABLE_ROW_WRITER) && operation == WriteOperationType.BULK_INSERT) {
-        val (success, commitTime: common.util.Option[String]) = bulkInsertAsRow(sqlContext, hoodieConfig, df, tblName,
-          basePath, path, instantTime, writerSchema, tableConfig.isTablePartitioned)
-        return (success, commitTime, common.util.Option.empty(), common.util.Option.empty(), hoodieWriteClient.orNull, tableConfig)
-      }
-      // scalastyle:on
-
       val (writeResult, writeClient: SparkRDDWriteClient[_]) =
         operation match {
           case WriteOperationType.DELETE =>
@@ -312,8 +288,24 @@ object HoodieSparkSqlWriter {
             client.startCommitWithTime(instantTime, commitActionType)
             val writeStatuses = DataSourceUtils.doDeletePartitionsOperation(client, partitionsToDelete, instantTime)
             (writeStatuses, client)
+
+          // Here all other (than DELETE, DELETE_PARTITION) write operations are handled
           case _ =>
-            // Here all other (than DELETE, DELETE_PARTITION) write operations are handled
+            // NOTE: Target writer's schema is deduced based on
+            //         - Source's schema
+            //         - Existing table's schema (including its Hudi's [[InternalSchema]] representation)
+            val writerSchema = deduceWriterSchema(sourceSchema, latestTableSchemaOpt, internalSchemaOpt, parameters)
+
+            validateSchemaForHoodieIsDeleted(writerSchema)
+
+            // Short-circuit if bulk_insert via row is enabled.
+            // scalastyle:off
+            if (hoodieConfig.getBoolean(ENABLE_ROW_WRITER) && operation == WriteOperationType.BULK_INSERT) {
+              val (success, commitTime: common.util.Option[String]) = bulkInsertAsRow(sqlContext, hoodieConfig, df, tblName,
+                basePath, path, instantTime, writerSchema, tableConfig.isTablePartitioned)
+              return (success, commitTime, common.util.Option.empty(), common.util.Option.empty(), hoodieWriteClient.orNull, tableConfig)
+            }
+            // scalastyle:on
 
             // Check whether partition columns should be persisted w/in the data-files, or should
             // be instead omitted from them and simply encoded into the partition path (which is Spark's
@@ -404,7 +396,11 @@ object HoodieSparkSqlWriter {
       // writer's schema. No additional handling is required
       case None => sourceSchema
       // Otherwise, we need to make sure we reconcile incoming and latest table schemas
-      case Some(latestTableSchema) =>
+      case Some(latestTableSchemaWithMetaFields) =>
+        // NOTE: Meta-fields will be unconditionally injected by Hudi writing handles, for the sake of
+        //       deducing proper writer schema we're stripping them to make sure we can perform proper
+        //       analysis
+        val latestTableSchema = removeMetadataFields(latestTableSchemaWithMetaFields)
         // Before validating whether schemas are compatible, we need to "canonicalize" source's schema
         // relative to the table's one, by doing a (minor) reconciliation of the nullability constraints:
         // for ex, if in incoming schema column A is designated as non-null, but it's designated as nullable
@@ -417,6 +413,9 @@ object HoodieSparkSqlWriter {
           sourceSchema
         }
 
+        val allowAutoEvolutionColumnDrop = opts.getOrDefault(HoodieWriteConfig.SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP.key,
+          HoodieWriteConfig.SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP.defaultValue).toBoolean
+
         if (shouldReconcileSchema) {
           internalSchemaOpt match {
             case Some(internalSchema) =>
@@ -428,7 +427,8 @@ object HoodieSparkSqlWriter {
             case None =>
               // In case schema reconciliation is enabled we will employ (legacy) reconciliation
               // strategy to produce target writer's schema (see definition below)
-              val (reconciledSchema, isCompatible) = reconcileSchemasLegacy(latestTableSchema, canonicalizedSourceSchema)
+              val (reconciledSchema, isCompatible) =
+                reconcileSchemasLegacy(latestTableSchema, canonicalizedSourceSchema)
 
               // NOTE: In some cases we need to relax constraint of incoming dataset's schema to be compatible
               //       w/ the table's one and allow schemas to diverge. This is required in cases where
@@ -455,7 +455,7 @@ object HoodieSparkSqlWriter {
           //       w/ the table's one and allow schemas to diverge. This is required in cases where
           //       partial updates will be performed (for ex, `MERGE INTO` Spark SQL statement) and as such
           //       only incoming dataset's projection has to match the table's schema, and not the whole one
-          if (!shouldValidateSchemasCompatibility || AvroSchemaUtils.isSchemaCompatible(latestTableSchema, canonicalizedSourceSchema)) {
+          if (!shouldValidateSchemasCompatibility || isSchemaCompatible(latestTableSchema, canonicalizedSourceSchema, allowAutoEvolutionColumnDrop)) {
             canonicalizedSourceSchema
           } else {
             log.error(
@@ -550,14 +550,18 @@ object HoodieSparkSqlWriter {
     // the other one (schema A contains schema B, if schema B is a projection of A). This enables us,
     // to always "extend" the schema during schema evolution and hence never lose the data (when, for ex
     // existing column is being dropped in a new batch)
+    //
+    // NOTE: By default Hudi doesn't allow automatic schema evolution to drop the columns from the target
+    //       table. However, when schema reconciliation is turned on, we would allow columns to be dropped
+    //       in the incoming batch (as these would be reconciled anyway)
     if (isCompatibleProjectionOf(tableSchema, newSchema)) {
       // Picking table schema as a writer schema we need to validate that we'd be able to
       // rewrite incoming batch's data (written in new schema) into it
-      (tableSchema, isSchemaCompatible(newSchema, tableSchema))
+      (tableSchema, isSchemaCompatible(newSchema, tableSchema, true))
     } else {
       // Picking new schema as a writer schema we need to validate that we'd be able to
       // rewrite table's data into it
-      (newSchema, isSchemaCompatible(tableSchema, newSchema))
+      (newSchema, isSchemaCompatible(tableSchema, newSchema, true))
     }
   }
 
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableChangeColumnCommand.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableChangeColumnCommand.scala
index dd2aae06bce..cb5a3f6fa75 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableChangeColumnCommand.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableChangeColumnCommand.scala
@@ -97,7 +97,7 @@ case class AlterHoodieTableChangeColumnCommand(
   private def validateSchema(newSchema: Schema, metaClient: HoodieTableMetaClient): Unit = {
     val schemaUtil = new TableSchemaResolver(metaClient)
     val tableSchema = HoodieAvroUtils.createHoodieWriteSchema(schemaUtil.getTableAvroSchemaWithoutMetadataFields)
-    if (!AvroSchemaUtils.isSchemaCompatible(tableSchema, newSchema)) {
+    if (!AvroSchemaUtils.isSchemaCompatible(tableSchema, newSchema, true)) {
       throw new HoodieException("Failed schema compatibility check for newSchema :" + newSchema +
         ", origin table schema :" + tableSchema + ", base path :" + metaClient.getBasePath)
     }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroSchemaResolutionSupport.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroSchemaResolutionSupport.scala
index ad476fb38f3..cd6396f08fb 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroSchemaResolutionSupport.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroSchemaResolutionSupport.scala
@@ -20,6 +20,7 @@ package org.apache.hudi
 
 import org.apache.hudi.common.config.HoodieMetadataConfig
 import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.exception.SchemaCompatibilityException
 import org.apache.hudi.testutils.HoodieClientTestBase
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.{DataFrame, Row, SparkSession}
@@ -33,7 +34,7 @@ import scala.language.postfixOps
  * Test cases to validate Hudi's support for writing and reading when evolving schema implicitly via Avro's Schema Resolution
  * Note: Test will explicitly write into different partitions to ensure that a Hudi table will have multiple filegroups with different schemas.
  */
-class TestAvroSchemaResolutionSupport extends HoodieClientTestBase {
+class TestAvroSchemaResolutionSupport extends HoodieClientTestBase with ScalaAssertionSupport {
 
   var spark: SparkSession = _
   val commonOpts: Map[String, String] = Map(
@@ -82,12 +83,13 @@ class TestAvroSchemaResolutionSupport extends HoodieClientTestBase {
       .save(saveDir)
   }
 
-  def upsertData(df: DataFrame, saveDir: String, isCow: Boolean = true): Unit = {
-    val opts = if (isCow) {
+  def upsertData(df: DataFrame, saveDir: String, isCow: Boolean = true, shouldAllowDroppedColumns: Boolean = false): Unit = {
+    var opts = if (isCow) {
       commonOpts ++ Map(DataSourceWriteOptions.TABLE_TYPE.key -> "COPY_ON_WRITE")
     } else {
       commonOpts ++ Map(DataSourceWriteOptions.TABLE_TYPE.key -> "MERGE_ON_READ")
     }
+    opts = opts ++ Map(HoodieWriteConfig.SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP.key -> shouldAllowDroppedColumns.toString)
 
     df.write.format("hudi")
       .options(opts)
@@ -228,7 +230,11 @@ class TestAvroSchemaResolutionSupport extends HoodieClientTestBase {
     upsertDf.show(false)
 
     // upsert
-    upsertData(upsertDf, tempRecordPath, isCow)
+    assertThrows(classOf[SchemaCompatibilityException]) {
+      upsertData(upsertDf, tempRecordPath, isCow)
+    }
+
+    upsertData(upsertDf, tempRecordPath, isCow, shouldAllowDroppedColumns = true)
 
     // read out the table
     val readDf = spark.read.format("hudi").load(tempRecordPath)
@@ -776,7 +782,10 @@ class TestAvroSchemaResolutionSupport extends HoodieClientTestBase {
     df6 = df6.withColumn("userid", df6.col("userid").cast("float"))
     df6.printSchema()
     df6.show(false)
-    upsertData(df6, tempRecordPath)
+    assertThrows(classOf[SchemaCompatibilityException]) {
+      upsertData(df6, tempRecordPath)
+    }
+    upsertData(df6, tempRecordPath, shouldAllowDroppedColumns = true)
 
     // 7. Rearrange column position
     var df7 = Seq((7, "newcol1", 700, newPartition)).toDF("id", "newcol1", "userid", "name")
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
index 657c7762c38..232883b71be 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
@@ -30,7 +30,7 @@ import org.apache.hudi.common.model._
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator
 import org.apache.hudi.config.{HoodieBootstrapConfig, HoodieIndexConfig, HoodieWriteConfig}
-import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.exception.{HoodieException, SchemaCompatibilityException}
 import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode
 import org.apache.hudi.functional.TestBootstrap
 import org.apache.hudi.keygen.{ComplexKeyGenerator, NonpartitionedKeyGenerator, SimpleKeyGenerator}
@@ -46,7 +46,7 @@ import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue,
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.Arguments.arguments
-import org.junit.jupiter.params.provider.{Arguments, EnumSource, MethodSource, ValueSource}
+import org.junit.jupiter.params.provider.{Arguments, CsvSource, EnumSource, MethodSource, ValueSource}
 import org.mockito.ArgumentMatchers.any
 import org.mockito.Mockito.{spy, times, verify}
 import org.scalatest.Assertions.assertThrows
@@ -657,13 +657,21 @@ class TestHoodieSparkSqlWriter {
    * @param tableType Type of table
    */
   @ParameterizedTest
-  @ValueSource(strings = Array("COPY_ON_WRITE", "MERGE_ON_READ"))
-  def testSchemaEvolutionForTableType(tableType: String): Unit = {
+  @CsvSource(value = Array(
+    "COPY_ON_WRITE,true",
+    "COPY_ON_WRITE,false",
+    "MERGE_ON_READ,true",
+    "MERGE_ON_READ,false"
+  ))
+  def testSchemaEvolutionForTableType(tableType: String, allowColumnDrop: Boolean): Unit = {
+    val opts = getCommonParams(tempPath, hoodieFooTableName, tableType) ++ Map(
+      HoodieWriteConfig.SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP.key -> allowColumnDrop.toString
+    )
+
     // Create new table
     // NOTE: We disable Schema Reconciliation by default (such that Writer's
     //       schema is favored over existing Table's schema)
-    val noReconciliationOpts = getCommonParams(tempPath, hoodieFooTableName, tableType)
-      .updated(DataSourceWriteOptions.RECONCILE_SCHEMA.key, "false")
+    val noReconciliationOpts = opts.updated(DataSourceWriteOptions.RECONCILE_SCHEMA.key, "false")
 
     // Generate 1st batch
     val schema = DataSourceTestUtils.getStructTypeExampleSchema
@@ -748,22 +756,29 @@ class TestHoodieSparkSqlWriter {
     recordsSeq = convertRowListToSeq(records)
 
     val df5 = spark.createDataFrame(sc.parallelize(recordsSeq), structType)
-    HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, noReconciliationOpts, df5)
 
-    val snapshotDF5 = spark.read.format("org.apache.hudi")
-      .load(tempBasePath + "/*/*/*/*")
+    if (allowColumnDrop) {
+      HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, noReconciliationOpts, df5)
 
-    assertEquals(35, snapshotDF5.count())
+      val snapshotDF5 = spark.read.format("org.apache.hudi")
+        .load(tempBasePath + "/*/*/*/*")
 
-    assertEquals(df5.intersect(dropMetaFields(snapshotDF5)).except(df5).count, 0)
+      assertEquals(35, snapshotDF5.count())
 
-    val fifthBatchActualSchema = fetchActualSchema()
-    val fifthBatchExpectedSchema = {
-      val (structName, nameSpace) = AvroConversionUtils.getAvroRecordNameAndNamespace(hoodieFooTableName)
-      AvroConversionUtils.convertStructTypeToAvroSchema(df5.schema, structName, nameSpace)
-    }
+      assertEquals(df5.intersect(dropMetaFields(snapshotDF5)).except(df5).count, 0)
+
+      val fifthBatchActualSchema = fetchActualSchema()
+      val fifthBatchExpectedSchema = {
+        val (structName, nameSpace) = AvroConversionUtils.getAvroRecordNameAndNamespace(hoodieFooTableName)
+        AvroConversionUtils.convertStructTypeToAvroSchema(df5.schema, structName, nameSpace)
+      }
 
-    assertEquals(fifthBatchExpectedSchema, fifthBatchActualSchema)
+      assertEquals(fifthBatchExpectedSchema, fifthBatchActualSchema)
+    } else {
+      assertThrows[SchemaCompatibilityException] {
+        HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, noReconciliationOpts, df5)
+      }
+    }
   }
 
   /**
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestBasicSchemaEvolution.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestBasicSchemaEvolution.scala
index bc0585a01e5..ccbd04a45b6 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestBasicSchemaEvolution.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestBasicSchemaEvolution.scala
@@ -107,11 +107,11 @@ class TestBasicSchemaEvolution extends HoodieClientTestBase with ScalaAssertionS
         DataSourceWriteOptions.OPERATION.key -> opType
       )
 
-    def appendData(schema: StructType, batch: Seq[Row]): Unit = {
+    def appendData(schema: StructType, batch: Seq[Row], shouldAllowDroppedColumns: Boolean = false): Unit = {
       HoodieUnsafeUtils.createDataFrameFromRows(spark, batch, schema)
         .write
         .format("org.apache.hudi")
-        .options(opts)
+        .options(opts ++ Map(HoodieWriteConfig.SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP.key -> shouldAllowDroppedColumns.toString))
         .mode(SaveMode.Append)
         .save(basePath)
     }
@@ -202,7 +202,8 @@ class TestBasicSchemaEvolution extends HoodieClientTestBase with ScalaAssertionS
     }
 
     //
-    // 3. Write 3d batch with another schema (w/ omitted a _nullable_ column `second_name`, expected to succeed)
+    // 3. Write 3rd batch with another schema (omitting the _nullable_ column `second_name`); w/o schema
+    //    reconciliation this is expected to succeed only if column drop is enabled
     //
 
     val thirdSchema = StructType(
@@ -217,7 +218,14 @@ class TestBasicSchemaEvolution extends HoodieClientTestBase with ScalaAssertionS
       Row("8", "Ron", "14", 1, 1),
       Row("9", "Germiona", "16", 1, 1))
 
-    appendData(thirdSchema, thirdBatch)
+    if (shouldReconcileSchema) {
+      appendData(thirdSchema, thirdBatch)
+    } else {
+      assertThrows(classOf[SchemaCompatibilityException]) {
+        appendData(thirdSchema, thirdBatch)
+      }
+      appendData(thirdSchema, thirdBatch, shouldAllowDroppedColumns = true)
+    }
     val (tableSchemaAfterThirdBatch, rowsAfterThirdBatch) = loadTable()
 
     // NOTE: In case schema reconciliation is ENABLED, Hudi would prefer the table's schema over the new batch
@@ -270,7 +278,10 @@ class TestBasicSchemaEvolution extends HoodieClientTestBase with ScalaAssertionS
         appendData(fourthSchema, fourthBatch)
       }
     } else {
-      appendData(fourthSchema, fourthBatch)
+      assertThrows(classOf[SchemaCompatibilityException]) {
+        appendData(fourthSchema, fourthBatch)
+      }
+      appendData(fourthSchema, fourthBatch, shouldAllowDroppedColumns = true)
       val (latestTableSchema, rows) = loadTable()
 
       assertEquals(fourthSchema, latestTableSchema)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala
index 15b6751328c..cb807319d94 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala
@@ -68,10 +68,12 @@ class TestCOWDataSourceStorage extends SparkClientFunctionalTestHarness {
     "false|org.apache.hudi.keygen.TimestampBasedKeyGenerator|_row_key"
   ), delimiter = '|')
   def testCopyOnWriteStorage(isMetadataEnabled: Boolean, keyGenClass: String, recordKeys: String): Unit = {
-    var options: Map[String, String] = commonOpts +
-      (HoodieMetadataConfig.ENABLE.key -> String.valueOf(isMetadataEnabled)) +
-      (DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key() -> keyGenClass) +
-      (DataSourceWriteOptions.RECORDKEY_FIELD.key() -> recordKeys)
+    var options: Map[String, String] = commonOpts ++ Map(
+      HoodieMetadataConfig.ENABLE.key -> String.valueOf(isMetadataEnabled),
+      DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> keyGenClass,
+      DataSourceWriteOptions.RECORDKEY_FIELD.key -> recordKeys,
+      HoodieWriteConfig.SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP.key -> "true")
+
     val isTimestampBasedKeyGen: Boolean = classOf[TimestampBasedKeyGenerator].getName.equals(keyGenClass)
     if (isTimestampBasedKeyGen) {
       options += DataSourceWriteOptions.RECORDKEY_FIELD.key() -> "_row_key"
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala
index 0d3b09b4c17..9e674db3541 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala
@@ -23,7 +23,7 @@ import org.apache.hadoop.fs.{LocatedFileStatus, Path}
 import org.apache.hudi.ColumnStatsIndexSupport.composeIndexSchema
 import org.apache.hudi.DataSourceWriteOptions.{PRECOMBINE_FIELD, RECORDKEY_FIELD}
 import org.apache.hudi.HoodieConversionUtils.toProperties
-import org.apache.hudi.common.config.{HoodieMetadataConfig, HoodieStorageConfig}
+import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieMetadataConfig, HoodieStorageConfig}
 import org.apache.hudi.common.model.HoodieTableType
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
 import org.apache.hudi.common.util.ParquetUtils
@@ -31,7 +31,6 @@ import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.functional.TestColumnStatsIndex.ColumnStatsTestCase
 import org.apache.hudi.testutils.HoodieClientTestBase
 import org.apache.hudi.{ColumnStatsIndexSupport, DataSourceWriteOptions}
-
 import org.apache.spark.sql._
 import org.apache.spark.sql.functions.typedLit
 import org.apache.spark.sql.types._
@@ -42,7 +41,6 @@ import org.junit.jupiter.params.provider.{Arguments, EnumSource, MethodSource, V
 
 import java.math.BigInteger
 import java.sql.{Date, Timestamp}
-
 import scala.collection.JavaConverters._
 import scala.util.Random
 
@@ -193,7 +191,8 @@ class TestColumnStatsIndex extends HoodieClientTestBase {
       HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
       RECORDKEY_FIELD.key -> "c1",
       PRECOMBINE_FIELD.key -> "c1",
-      HoodieTableConfig.POPULATE_META_FIELDS.key -> "true"
+      HoodieTableConfig.POPULATE_META_FIELDS.key -> "true",
+      HoodieCommonConfig.RECONCILE_SCHEMA.key -> "true"
     ) ++ metadataOpts
 
     val sourceJSONTablePath = getClass.getClassLoader.getResource("index/colstats/input-table-json").toString


[hudi] 07/08: [HUDI-5676] Fix BigQuerySyncTool standalone mode (#7816)

Posted by yi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-0.13.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit c487ee428d5a639655a5a48b45ce4507d45c34a3
Author: Shiyan Xu <27...@users.noreply.github.com>
AuthorDate: Thu Feb 2 00:39:28 2023 -0600

    [HUDI-5676] Fix BigQuerySyncTool standalone mode (#7816)
---
 .../hudi/gcp/bigquery/BigQuerySyncConfig.java      | 38 ++++--------
 .../gcp/bigquery/TestBigQuerySyncToolArgs.java     | 70 ++++++++++++++++++++++
 packaging/hudi-gcp-bundle/pom.xml                  |  8 ++-
 3 files changed, 90 insertions(+), 26 deletions(-)

diff --git a/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncConfig.java b/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncConfig.java
index b46cd9a9f81..52b3d3b74e5 100644
--- a/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncConfig.java
+++ b/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncConfig.java
@@ -20,14 +20,13 @@
 package org.apache.hudi.gcp.bigquery;
 
 import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.sync.common.HoodieSyncConfig;
 
 import com.beust.jcommander.Parameter;
 import com.beust.jcommander.ParametersDelegate;
 
 import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
 import java.util.Properties;
 
 /**
@@ -101,38 +100,27 @@ public class BigQuerySyncConfig extends HoodieSyncConfig implements Serializable
     public String datasetName;
     @Parameter(names = {"--dataset-location"}, description = "Location of the target dataset in BigQuery", required = true)
     public String datasetLocation;
-    @Parameter(names = {"--table-name"}, description = "Name of the target table in BigQuery", required = true)
-    public String tableName;
     @Parameter(names = {"--source-uri"}, description = "Name of the source uri gcs path of the table", required = true)
     public String sourceUri;
     @Parameter(names = {"--source-uri-prefix"}, description = "Name of the source uri gcs path prefix of the table", required = true)
     public String sourceUriPrefix;
-    @Parameter(names = {"--base-path"}, description = "Base path of the hoodie table to sync", required = true)
-    public String basePath;
-    @Parameter(names = {"--partitioned-by"}, description = "Comma-delimited partition fields. Default to non-partitioned.")
-    public List<String> partitionFields = new ArrayList<>();
-    @Parameter(names = {"--use-file-listing-from-metadata"}, description = "Fetch file listing from Hudi's metadata")
-    public boolean useFileListingFromMetadata = false;
-    @Parameter(names = {"--assume-date-partitioning"}, description = "Assume standard yyyy/mm/dd partitioning, this"
-        + " exists to support backward compatibility. If you use hoodie 0.3.x, do not set this parameter")
-    public boolean assumeDatePartitioning = false;
 
     public boolean isHelp() {
       return hoodieSyncConfigParams.isHelp();
     }
 
-    public Properties toProps() {
-      final Properties props = hoodieSyncConfigParams.toProps();
-      props.setProperty(BIGQUERY_SYNC_PROJECT_ID.key(), projectId);
-      props.setProperty(BIGQUERY_SYNC_DATASET_NAME.key(), datasetName);
-      props.setProperty(BIGQUERY_SYNC_DATASET_LOCATION.key(), datasetLocation);
-      props.setProperty(BIGQUERY_SYNC_TABLE_NAME.key(), tableName);
-      props.setProperty(BIGQUERY_SYNC_SOURCE_URI.key(), sourceUri);
-      props.setProperty(BIGQUERY_SYNC_SOURCE_URI_PREFIX.key(), sourceUriPrefix);
-      props.setProperty(BIGQUERY_SYNC_SYNC_BASE_PATH.key(), basePath);
-      props.setProperty(BIGQUERY_SYNC_PARTITION_FIELDS.key(), String.join(",", partitionFields));
-      props.setProperty(BIGQUERY_SYNC_USE_FILE_LISTING_FROM_METADATA.key(), String.valueOf(useFileListingFromMetadata));
-      props.setProperty(BIGQUERY_SYNC_ASSUME_DATE_PARTITIONING.key(), String.valueOf(assumeDatePartitioning));
+    public TypedProperties toProps() {
+      final TypedProperties props = hoodieSyncConfigParams.toProps();
+      props.setPropertyIfNonNull(BIGQUERY_SYNC_PROJECT_ID.key(), projectId);
+      props.setPropertyIfNonNull(BIGQUERY_SYNC_DATASET_NAME.key(), datasetName);
+      props.setPropertyIfNonNull(BIGQUERY_SYNC_DATASET_LOCATION.key(), datasetLocation);
+      props.setPropertyIfNonNull(BIGQUERY_SYNC_TABLE_NAME.key(), hoodieSyncConfigParams.tableName);
+      props.setPropertyIfNonNull(BIGQUERY_SYNC_SOURCE_URI.key(), sourceUri);
+      props.setPropertyIfNonNull(BIGQUERY_SYNC_SOURCE_URI_PREFIX.key(), sourceUriPrefix);
+      props.setPropertyIfNonNull(BIGQUERY_SYNC_SYNC_BASE_PATH.key(), hoodieSyncConfigParams.basePath);
+      props.setPropertyIfNonNull(BIGQUERY_SYNC_PARTITION_FIELDS.key(), String.join(",", hoodieSyncConfigParams.partitionFields));
+      props.setPropertyIfNonNull(BIGQUERY_SYNC_USE_FILE_LISTING_FROM_METADATA.key(), hoodieSyncConfigParams.useFileListingFromMetadata);
+      props.setPropertyIfNonNull(BIGQUERY_SYNC_ASSUME_DATE_PARTITIONING.key(), hoodieSyncConfigParams.assumeDatePartitioning);
       return props;
     }
   }
diff --git a/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestBigQuerySyncToolArgs.java b/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestBigQuerySyncToolArgs.java
new file mode 100644
index 00000000000..898358484d9
--- /dev/null
+++ b/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestBigQuerySyncToolArgs.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.gcp.bigquery;
+
+import com.beust.jcommander.JCommander;
+import org.junit.jupiter.api.Test;
+
+import java.util.Properties;
+
+import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_ASSUME_DATE_PARTITIONING;
+import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_DATASET_LOCATION;
+import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_DATASET_NAME;
+import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_PARTITION_FIELDS;
+import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_PROJECT_ID;
+import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_SOURCE_URI;
+import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_SOURCE_URI_PREFIX;
+import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_SYNC_BASE_PATH;
+import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_TABLE_NAME;
+import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_USE_FILE_LISTING_FROM_METADATA;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+public class TestBigQuerySyncToolArgs {
+
+  @Test
+  public void testArgsParse() {
+    BigQuerySyncConfig.BigQuerySyncConfigParams params = new BigQuerySyncConfig.BigQuerySyncConfigParams();
+    JCommander cmd = JCommander.newBuilder().addObject(params).build();
+    String[] args = {
+        "--project-id", "hudi-bq",
+        "--dataset-name", "foobar",
+        "--dataset-location", "us-west1",
+        "--table", "foobartable",
+        "--source-uri", "gs://foobartable/year=*",
+        "--source-uri-prefix", "gs://foobartable/",
+        "--base-path", "gs://foobartable",
+        "--partitioned-by", "year,month,day",
+        "--use-file-listing-from-metadata"
+    };
+    cmd.parse(args);
+
+    final Properties props = params.toProps();
+    assertEquals("hudi-bq", props.getProperty(BIGQUERY_SYNC_PROJECT_ID.key()));
+    assertEquals("foobar", props.getProperty(BIGQUERY_SYNC_DATASET_NAME.key()));
+    assertEquals("us-west1", props.getProperty(BIGQUERY_SYNC_DATASET_LOCATION.key()));
+    assertEquals("foobartable", props.getProperty(BIGQUERY_SYNC_TABLE_NAME.key()));
+    assertEquals("gs://foobartable/year=*", props.getProperty(BIGQUERY_SYNC_SOURCE_URI.key()));
+    assertEquals("gs://foobartable/", props.getProperty(BIGQUERY_SYNC_SOURCE_URI_PREFIX.key()));
+    assertEquals("gs://foobartable", props.getProperty(BIGQUERY_SYNC_SYNC_BASE_PATH.key()));
+    assertEquals("year,month,day", props.getProperty(BIGQUERY_SYNC_PARTITION_FIELDS.key()));
+    assertEquals("true", props.getProperty(BIGQUERY_SYNC_USE_FILE_LISTING_FROM_METADATA.key()));
+    assertFalse(props.containsKey(BIGQUERY_SYNC_ASSUME_DATE_PARTITIONING.key()));
+  }
+}
diff --git a/packaging/hudi-gcp-bundle/pom.xml b/packaging/hudi-gcp-bundle/pom.xml
index 2149d1d35ae..4bf60363106 100644
--- a/packaging/hudi-gcp-bundle/pom.xml
+++ b/packaging/hudi-gcp-bundle/pom.xml
@@ -95,9 +95,9 @@
                   <include>org.apache.hudi:hudi-common</include>
                   <include>org.apache.hudi:hudi-hadoop-mr</include>
                   <include>org.apache.hudi:hudi-sync-common</include>
+                  <include>org.apache.hudi:hudi-hive-sync</include>
                   <include>org.apache.hudi:hudi-gcp</include>
                   <include>org.apache.parquet:parquet-avro</include>
-
                   <include>com.google.cloud:google-cloud-bigquery</include>
                   <include>com.beust:jcommander</include>
                   <include>commons-io:commons-io</include>
@@ -164,6 +164,12 @@
       <version>${project.version}</version>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.hudi</groupId>
+      <artifactId>hudi-hive-sync</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
     <dependency>
       <groupId>org.apache.hudi</groupId>
       <artifactId>hudi-gcp</artifactId>


[hudi] 01/08: [HUDI-5585][flink] Fix flink creates and writes the table, the spark alter table reports an error (#7706)

Posted by yi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-0.13.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 22b634b081571f01d2d9de4e94ce240cecc46d7d
Author: chao chen <59...@users.noreply.github.com>
AuthorDate: Wed Feb 1 09:45:39 2023 +0800

    [HUDI-5585][flink] Fix flink creates and writes the table, the spark alter table reports an error (#7706)
    
    
    Co-authored-by: danny0405 <yu...@gmail.com>
---
 .../apache/hudi/table/catalog/HiveSchemaUtils.java |  8 ++++---
 .../hudi/table/catalog/HoodieHiveCatalog.java      |  2 +-
 .../hudi/table/catalog/TableOptionProperties.java  | 25 ++++++++++++++++++++--
 .../hudi/table/catalog/TestHoodieHiveCatalog.java  | 15 +++++++++++++
 4 files changed, 44 insertions(+), 6 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HiveSchemaUtils.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HiveSchemaUtils.java
index 4383b42e9f8..fac507cb7db 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HiveSchemaUtils.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HiveSchemaUtils.java
@@ -181,9 +181,11 @@ public class HiveSchemaUtils {
    */
   public static List<FieldSchema> toHiveFieldSchema(TableSchema schema, boolean withOperationField) {
     List<FieldSchema> columns = new ArrayList<>();
-    Collection<String> metaFields = withOperationField
-        ? HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION // caution that the set may break sequence
-        : HoodieRecord.HOODIE_META_COLUMNS;
+    Collection<String> metaFields = new ArrayList<>(HoodieRecord.HOODIE_META_COLUMNS);
+    if (withOperationField) {
+      metaFields.add(HoodieRecord.OPERATION_METADATA_FIELD);
+    }
+
     for (String metaField : metaFields) {
       columns.add(new FieldSchema(metaField, "string", null));
     }
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
index 6dcdf118415..fd36a39d237 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
@@ -597,7 +597,7 @@ public class HoodieHiveCatalog extends AbstractCatalog {
     serdeProperties.put(ConfigUtils.IS_QUERY_AS_RO_TABLE, String.valueOf(!useRealTimeInputFormat));
     serdeProperties.put("serialization.format", "1");
 
-    serdeProperties.putAll(TableOptionProperties.translateFlinkTableProperties2Spark(catalogTable, hiveConf, properties, partitionKeys));
+    serdeProperties.putAll(TableOptionProperties.translateFlinkTableProperties2Spark(catalogTable, hiveConf, properties, partitionKeys, withOperationField));
 
     sd.setSerdeInfo(new SerDeInfo(null, serDeClassName, serdeProperties));
 
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TableOptionProperties.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TableOptionProperties.java
index a0864bbf377..6e327bdc612 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TableOptionProperties.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TableOptionProperties.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.table.catalog;
 
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.configuration.FlinkOptions;
@@ -28,6 +29,8 @@ import org.apache.hudi.util.AvroSchemaConverter;
 
 import org.apache.avro.Schema;
 import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.VarCharType;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -39,7 +42,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
@@ -49,6 +54,7 @@ import java.util.Properties;
 import java.util.stream.Collectors;
 
 import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
+import static org.apache.hudi.common.model.HoodieRecord.OPERATION_METADATA_FIELD;
 import static org.apache.hudi.common.table.HoodieTableMetaClient.AUXILIARYFOLDER_NAME;
 
 /**
@@ -168,8 +174,10 @@ public class TableOptionProperties {
       CatalogTable catalogTable,
       Configuration hadoopConf,
       Map<String, String> properties,
-      List<String> partitionKeys) {
-    Schema schema = AvroSchemaConverter.convertToSchema(catalogTable.getSchema().toPhysicalRowDataType().getLogicalType());
+      List<String> partitionKeys,
+      boolean withOperationField) {
+    RowType rowType = supplementMetaFields((RowType) catalogTable.getSchema().toPhysicalRowDataType().getLogicalType(), withOperationField);
+    Schema schema = AvroSchemaConverter.convertToSchema(rowType);
     MessageType messageType = TableSchemaResolver.convertAvroSchemaToParquet(schema, hadoopConf);
     String sparkVersion = catalogTable.getOptions().getOrDefault(SPARK_VERSION, DEFAULT_SPARK_VERSION);
     Map<String, String> sparkTableProperties = SparkDataSourceTableUtils.getSparkTableProperties(
@@ -184,6 +192,19 @@ public class TableOptionProperties {
             e -> e.getKey().equalsIgnoreCase(FlinkOptions.TABLE_TYPE.key()) ? VALUE_MAPPING.get(e.getValue()) : e.getValue()));
   }
 
+  private static RowType supplementMetaFields(RowType rowType, boolean withOperationField) {
+    Collection<String> metaFields = new ArrayList<>(HoodieRecord.HOODIE_META_COLUMNS);
+    if (withOperationField) {
+      metaFields.add(OPERATION_METADATA_FIELD);
+    }
+    ArrayList<RowType.RowField> rowFields = new ArrayList<>();
+    for (String metaField : metaFields) {
+      rowFields.add(new RowType.RowField(metaField, new VarCharType(10000)));
+    }
+    rowFields.addAll(rowType.getFields());
+    return new RowType(false, rowFields);
+  }
+
   public static Map<String, String> translateSparkTableProperties2Flink(Map<String, String> options) {
     if (options.containsKey(CONNECTOR.key())) {
       return options;
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java
index 5d27cdadbbb..c697cb92509 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java
@@ -139,6 +139,21 @@ public class TestHoodieHiveCatalog {
         .collect(Collectors.joining(","));
     assertEquals("par1:string", partitionSchema);
 
+    // validate spark schema properties
+    String avroSchemaStr = hiveTable.getParameters().get("spark.sql.sources.schema.part.0");
+    String expectedAvroSchemaStr = ""
+        + "{\"type\":\"struct\",\"fields\":[{\"name\":\"_hoodie_commit_time\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},"
+        + "{\"name\":\"_hoodie_commit_seqno\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},"
+        + "{\"name\":\"_hoodie_record_key\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},"
+        + "{\"name\":\"_hoodie_partition_path\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},"
+        + "{\"name\":\"_hoodie_file_name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},"
+        + "{\"name\":\"uuid\",\"type\":\"integer\",\"nullable\":false,\"metadata\":{}},"
+        + "{\"name\":\"name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},"
+        + "{\"name\":\"age\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},"
+        + "{\"name\":\"ts\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}},"
+        + "{\"name\":\"par1\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}";
+    assertEquals(expectedAvroSchemaStr, avroSchemaStr);
+
     // validate catalog table
     CatalogBaseTable table1 = hoodieCatalog.getTable(tablePath);
     assertEquals("hudi", table1.getOptions().get(CONNECTOR.key()));


[hudi] 08/08: [HUDI-5647] Automate savepoint and restore tests (#7796)

Posted by yi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-0.13.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 40d534e878bb8ceb4bc7fa20539cca3debd5727e
Author: Danny Chan <yu...@gmail.com>
AuthorDate: Thu Feb 2 14:49:53 2023 +0800

    [HUDI-5647] Automate savepoint and restore tests (#7796)
---
 .../TestSavepointRestoreCopyOnWrite.java           | 173 ++++++++++++++
 .../TestSavepointRestoreMergeOnRead.java           | 248 +++++++++++++++++++++
 .../hudi/testutils/HoodieClientTestBase.java       |  62 +++++-
 3 files changed, 482 insertions(+), 1 deletion(-)

diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestSavepointRestoreCopyOnWrite.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestSavepointRestoreCopyOnWrite.java
new file mode 100644
index 00000000000..8a71a01fda9
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestSavepointRestoreCopyOnWrite.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.functional;
+
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.table.HoodieSparkTable;
+import org.apache.hudi.testutils.HoodieClientTestBase;
+
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.Objects;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test cases for COPY_ON_WRITE table savepoint restore.
+ */
+@Tag("functional")
+public class TestSavepointRestoreCopyOnWrite extends HoodieClientTestBase {
+
+  /**
+   * Actions: C1, C2, savepoint C2, C3, C4, restore.
+   * Should go back to C2,
+   * C3 and C4 should be cleaned up.
+   */
+  @Test
+  void testBasicRollback() throws Exception {
+    HoodieWriteConfig hoodieWriteConfig = getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY)
+        .withRollbackUsingMarkers(true)
+        .build();
+    try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig)) {
+      String savepointCommit = null;
+      String prevInstant = HoodieTimeline.INIT_INSTANT_TS;
+      final int numRecords = 10;
+      for (int i = 1; i <= 4; i++) {
+        String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
+        // Write 4 inserts and take a savepoint after the 2nd commit
+        insertBatch(hoodieWriteConfig, client, newCommitTime, prevInstant, numRecords, SparkRDDWriteClient::insert,
+            false, true, numRecords, numRecords * i, 1, Option.empty());
+        prevInstant = newCommitTime;
+        if (i == 2) {
+          // trigger savepoint
+          savepointCommit = newCommitTime;
+          client.savepoint("user1", "Savepoint for 2nd commit");
+        }
+      }
+      assertRowNumberEqualsTo(40);
+      // restore
+      client.restoreToSavepoint(Objects.requireNonNull(savepointCommit, "restore commit should not be null"));
+      assertRowNumberEqualsTo(20);
+    }
+  }
+
+  /**
+   * The restore should roll back all the pending instants that are beyond the savepoint.
+   *
+   * <p>Actions: C1, C2, savepoint C2, C3, C4 inflight, restore.
+   * Should go back to C2,
+   * C3, C4 should be cleaned up.
+   */
+  @Test
+  void testCleaningPendingInstants() throws Exception {
+    HoodieWriteConfig hoodieWriteConfig = getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY)
+        .withRollbackUsingMarkers(true)
+        .build();
+    try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig)) {
+      String savepointCommit = null;
+      String prevInstant = HoodieTimeline.INIT_INSTANT_TS;
+      final int numRecords = 10;
+      for (int i = 1; i <= 3; i++) {
+        String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
+        // Write 3 inserts and take a savepoint after the 2nd commit
+        insertBatch(hoodieWriteConfig, client, newCommitTime, prevInstant, numRecords, SparkRDDWriteClient::insert,
+            false, true, numRecords, numRecords * i, 1, Option.empty());
+        prevInstant = newCommitTime;
+        if (i == 2) {
+          // trigger savepoint
+          savepointCommit = newCommitTime;
+          client.savepoint("user1", "Savepoint for 2nd commit");
+        }
+      }
+      assertRowNumberEqualsTo(30);
+      // write another pending instant
+      insertBatchWithoutCommit(HoodieActiveTimeline.createNewInstantTime(), numRecords);
+      // restore
+      client.restoreToSavepoint(Objects.requireNonNull(savepointCommit, "restore commit should not be null"));
+      assertRowNumberEqualsTo(20);
+    }
+  }
+
+  /**
+   * The rollbacks(either inflight or complete) beyond the savepoint should be cleaned.
+   *
+   * <p>Actions: C1, C2, savepoint C2, C3, C4 (RB_C3), C5, restore.
+   * Should go back to C2.
+   * C3, C4(RB_C3), C5 should be cleaned up.
+   *
+   * <p>Actions: C1, C2, savepoint C2, C3, C4 (RB_C3) inflight, restore.
+   * Should go back to C2.
+   * C3, C4 (RB_C3) should be cleaned up.
+   */
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  void testCleaningRollbackInstants(boolean commitRollback) throws Exception {
+    HoodieWriteConfig hoodieWriteConfig = getConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER)
+        // eager cleaning
+        .withRollbackUsingMarkers(true)
+        .build();
+    try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig)) {
+      String savepointCommit = null;
+      String prevInstant = HoodieTimeline.INIT_INSTANT_TS;
+      final int numRecords = 10;
+      for (int i = 1; i <= 2; i++) {
+        String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
+        // Write 2 inserts and take a savepoint after the 2nd commit
+        insertBatch(hoodieWriteConfig, client, newCommitTime, prevInstant, numRecords, SparkRDDWriteClient::insert,
+            false, true, numRecords, numRecords * i, 1, Option.empty());
+        prevInstant = newCommitTime;
+        if (i == 2) {
+          // trigger savepoint
+          savepointCommit = newCommitTime;
+          client.savepoint("user1", "Savepoint for 2nd commit");
+        }
+      }
+      assertRowNumberEqualsTo(20);
+      // write another pending instant
+      insertBatchWithoutCommit(HoodieActiveTimeline.createNewInstantTime(), numRecords);
+      // rollback the pending instant
+      if (commitRollback) {
+        client.rollbackFailedWrites();
+      } else {
+        HoodieInstant pendingInstant = metaClient.getActiveTimeline().filterPendingExcludingCompaction()
+            .lastInstant().orElseThrow(() -> new HoodieException("Pending instant does not exist"));
+        HoodieSparkTable.create(client.getConfig(), context)
+            .scheduleRollback(context, HoodieActiveTimeline.createNewInstantTime(), pendingInstant, false, true);
+      }
+      Option<String> rollbackInstant = metaClient.reloadActiveTimeline().getRollbackTimeline().lastInstant().map(HoodieInstant::getTimestamp);
+      assertTrue(rollbackInstant.isPresent(), "The latest instant should be a rollback");
+      // write another batch
+      insertBatch(hoodieWriteConfig, client, HoodieActiveTimeline.createNewInstantTime(), rollbackInstant.get(), numRecords, SparkRDDWriteClient::insert,
+          false, true, numRecords, numRecords * 3, 1, Option.empty());
+      // restore
+      client.restoreToSavepoint(Objects.requireNonNull(savepointCommit, "restore commit should not be null"));
+      assertRowNumberEqualsTo(20);
+    }
+  }
+}
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestSavepointRestoreMergeOnRead.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestSavepointRestoreMergeOnRead.java
new file mode 100644
index 00000000000..6c1dfe5d734
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestSavepointRestoreMergeOnRead.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.functional;
+
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.testutils.HoodieClientTestBase;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Objects;
+
+import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test cases for MERGE_ON_READ table savepoint restore.
+ */
+@Tag("functional")
+public class TestSavepointRestoreMergeOnRead extends HoodieClientTestBase {
+
+  /**
+   * Actions: DC1, DC2, DC3, savepoint DC3,(snapshot query) DC4, C5, DC6, DC7. restore to DC3.
+   * Should roll back DC4, C5, DC6 and DC7.
+   * The latest file slice should be fully cleaned up, and rollback log appends for DC4 in first file slice.
+   *
+   * <p>For example file layout,
+   * FG1:
+   *   BF1(DC1), LF1(DC2), LF2(DC3), LF3(DC4)
+   *   BF5(C5), LF1(DC6), LF2(DC7)
+   * After restore, it becomes
+   *   BF1(DC1), LF1(DC2), LF2(DC3), LF3(DC4), LF4(RB DC4)
+   *
+   * <p>Expected behaviors:
+   *   snapshot query: total rec matches.
+   *   checking the row count by updating columns in (val4,val5,val6, val7).
+   */
+  @Test
+  void testCleaningDeltaCommits() throws Exception {
+    HoodieWriteConfig hoodieWriteConfig = getConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER) // eager cleaning
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+            .withMaxNumDeltaCommitsBeforeCompaction(4) // the 4th delta_commit triggers compaction
+            .withInlineCompaction(true)
+            .build())
+        .withRollbackUsingMarkers(true)
+        .build();
+    try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig)) {
+      String savepointCommit = null;
+      final int numRecords = 10;
+      List<HoodieRecord> baseRecordsToUpdate = null;
+      for (int i = 1; i <= 3; i++) {
+        String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
+        // Write 3 inserts and take a savepoint after the 3rd commit
+        client.startCommitWithTime(newCommitTime);
+        List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, numRecords);
+        JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+        client.insert(writeRecords, newCommitTime);
+        if (i == 3) {
+          // trigger savepoint
+          savepointCommit = newCommitTime;
+          baseRecordsToUpdate = records;
+          client.savepoint("user1", "Savepoint for 3rd commit");
+        }
+      }
+
+      assertRowNumberEqualsTo(30);
+
+      // write another 3 delta commits
+      for (int i = 1; i <= 3; i++) {
+        String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
+        client.startCommitWithTime(newCommitTime);
+        List<HoodieRecord> records = dataGen.generateUpdates(newCommitTime, Objects.requireNonNull(baseRecordsToUpdate, "The records to update should not be null"));
+        JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+        client.upsert(writeRecords, newCommitTime);
+        if (i == 1) {
+          Option<String> compactionInstant = client.scheduleCompaction(Option.empty());
+          assertTrue(compactionInstant.isPresent(), "A compaction plan should be scheduled");
+          client.compact(compactionInstant.get());
+        }
+      }
+
+      // restore
+      client.restoreToSavepoint(Objects.requireNonNull(savepointCommit, "restore commit should not be null"));
+      assertRowNumberEqualsTo(30);
+    }
+  }
+
+  /**
+   * <p>Actions: DC1, DC2, DC3, savepoint DC3, DC4, C5.pending, DC6, DC7, restore
+   * should roll back until DC3.
+   *
+   * <p>Expected behaviors: pending compaction after savepoint should also be cleaned,
+   * the latest file slice should be fully deleted, for DC4 a rollback log append should be made.
+   */
+  @Test
+  void testCleaningPendingCompaction() throws Exception {
+    HoodieWriteConfig hoodieWriteConfig = getConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER) // eager cleaning
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+            .withMaxNumDeltaCommitsBeforeCompaction(4) // the 4th delta_commit triggers compaction
+            .withInlineCompaction(false)
+            .withScheduleInlineCompaction(true)
+            .build())
+        .withRollbackUsingMarkers(true)
+        .build();
+    try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig)) {
+      String savepointCommit = null;
+      final int numRecords = 10;
+      List<HoodieRecord> baseRecordsToUpdate = null;
+      for (int i = 1; i <= 3; i++) {
+        String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
+        // Write 3 inserts and take a savepoint after the 3rd commit
+        client.startCommitWithTime(newCommitTime);
+        List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, numRecords);
+        JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+        client.insert(writeRecords, newCommitTime);
+        if (i == 3) {
+          // trigger savepoint
+          savepointCommit = newCommitTime;
+          baseRecordsToUpdate = records;
+          client.savepoint("user1", "Savepoint for 3rd commit");
+        }
+      }
+
+      assertRowNumberEqualsTo(30);
+
+      // write another 3 delta commits
+      for (int i = 1; i <= 3; i++) {
+        upsertBatch(writeClient, baseRecordsToUpdate);
+        if (i == 1) {
+          Option<String> compactionInstant = client.scheduleCompaction(Option.empty());
+          assertTrue(compactionInstant.isPresent(), "A compaction plan should be scheduled");
+          compactWithoutCommit(compactionInstant.get());
+        }
+      }
+
+      // restore
+      client.restoreToSavepoint(Objects.requireNonNull(savepointCommit, "restore commit should not be null"));
+      assertRowNumberEqualsTo(30);
+    }
+  }
+
+  /**
+   * Actions: DC1, DC2, DC3, C4, savepoint C4, DC5, C6(RB_DC5), DC7, restore
+   *
+   * <p>Expected behaviors: should roll back DC5, C6 and DC7.
+   * No files will be cleaned up. Only rollback log appends.
+   */
+  @Test
+  void testCleaningCompletedRollback() throws Exception {
+    HoodieWriteConfig hoodieWriteConfig = getConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER) // eager cleaning
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+            .withMaxNumDeltaCommitsBeforeCompaction(3) // the 3rd delta_commit triggers compaction
+            .withInlineCompaction(false)
+            .withScheduleInlineCompaction(true)
+            .build())
+        .withRollbackUsingMarkers(true)
+        .build();
+    try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig)) {
+      String savepointCommit = null;
+      final int numRecords = 10;
+      List<HoodieRecord> baseRecordsToUpdate = null;
+      for (int i = 1; i <= 2; i++) {
+        String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
+        // Write 2 inserts; the 2nd batch is reused as the base records for the updates below
+        client.startCommitWithTime(newCommitTime);
+        List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, numRecords);
+        JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+        client.insert(writeRecords, newCommitTime);
+        if (i == 2) {
+          baseRecordsToUpdate = records;
+        }
+      }
+
+      // update to generate log files, then a valid compaction plan can be scheduled
+      upsertBatch(client, baseRecordsToUpdate);
+      Option<String> compactionInstant = client.scheduleCompaction(Option.empty());
+      assertTrue(compactionInstant.isPresent(), "A compaction plan should be scheduled");
+      client.compact(compactionInstant.get());
+      savepointCommit = compactionInstant.get();
+      client.savepoint("user1", "Savepoint for 3td commit");
+
+      assertRowNumberEqualsTo(20);
+      // write a delta_commit but does not commit
+      updateBatchWithoutCommit(HoodieActiveTimeline.createNewInstantTime(), Objects.requireNonNull(baseRecordsToUpdate, "The records to update should not be null"));
+      // rollback the delta_commit
+      assertTrue(writeClient.rollbackFailedWrites(), "The last delta_commit should be rolled back");
+
+      // another update
+      upsertBatch(writeClient, baseRecordsToUpdate);
+
+      // restore
+      client.restoreToSavepoint(Objects.requireNonNull(savepointCommit, "restore commit should not be null"));
+      assertRowNumberEqualsTo(20);
+    }
+  }
+
+  private void upsertBatch(SparkRDDWriteClient client, List<HoodieRecord> baseRecordsToUpdate) throws IOException {
+    String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
+    client.startCommitWithTime(newCommitTime);
+    List<HoodieRecord> records = dataGen.generateUpdates(newCommitTime, Objects.requireNonNull(baseRecordsToUpdate, "The records to update should not be null"));
+    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+    client.upsert(writeRecords, newCommitTime);
+  }
+
+  private void compactWithoutCommit(String compactionInstantTime) {
+    HoodieWriteConfig hoodieWriteConfig = getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY)
+        .withAutoCommit(false) // disable auto commit
+        .withRollbackUsingMarkers(true)
+        .build();
+
+    try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig)) {
+      JavaRDD<WriteStatus> statuses = (JavaRDD<WriteStatus>) client.compact(compactionInstantTime).getWriteStatuses();
+      assertNoWriteErrors(statuses.collect());
+    }
+  }
+
+  @Override
+  protected HoodieTableType getTableType() {
+    return HoodieTableType.MERGE_ON_READ;
+  }
+}
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
index 2b4172781c6..43f843a4f33 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
@@ -597,7 +597,7 @@ public class HoodieClientTestBase extends HoodieClientTestHarness {
 
     // verify that there is a commit
     HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build();
-    HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline();
+    HoodieTimeline timeline = metaClient.getCommitsTimeline();
 
     if (assertForCommit) {
       assertEquals(expTotalCommits, timeline.findInstantsAfter(initCommitTime, Integer.MAX_VALUE).countInstants(),
@@ -700,6 +700,66 @@ public class HoodieClientTestBase extends HoodieClientTestHarness {
     return result;
   }
 
+  /**
+   * Inserts a batch of records without committing (so that the instant stays in-flight).
+   *
+   * @param newCommitTime The commit time
+   * @param numRecords    The number of records to insert
+   */
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  protected void insertBatchWithoutCommit(String newCommitTime, int numRecords) {
+    HoodieWriteConfig hoodieWriteConfig = getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY)
+        .withAutoCommit(false) // disable auto commit
+        .withRollbackUsingMarkers(true)
+        .build();
+
+    try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig)) {
+      client.startCommitWithTime(newCommitTime);
+
+      List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, numRecords);
+      JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+
+      List<WriteStatus> statuses = client.insert(writeRecords, newCommitTime).collect();
+      assertNoWriteErrors(statuses);
+    }
+  }
+
+  /**
+   * Updates a batch of records without committing (so that the instant stays in-flight).
+   *
+   * @param newCommitTime The commit time
+   * @param baseRecordsToUpdate The base records to update
+   */
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  protected void updateBatchWithoutCommit(String newCommitTime, List<HoodieRecord> baseRecordsToUpdate) throws IOException {
+    HoodieWriteConfig hoodieWriteConfig = getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY)
+        .withAutoCommit(false) // disable auto commit
+        .withRollbackUsingMarkers(true)
+        .build();
+
+    try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig)) {
+      client.startCommitWithTime(newCommitTime);
+      List<HoodieRecord> records = dataGen.generateUpdates(newCommitTime, baseRecordsToUpdate);
+      JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+      client.upsert(writeRecords, newCommitTime);
+    }
+  }
+
+  /**
+   * Asserts that the number of rows read back from the table equals {@code numRows}.
+   *
+   * @param numRows The expected row number
+   */
+  protected void assertRowNumberEqualsTo(int numRows) {
+    // Check the entire dataset has all records still
+    String[] fullPartitionPaths = new String[dataGen.getPartitionPaths().length];
+    for (int i = 0; i < fullPartitionPaths.length; i++) {
+      fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]);
+    }
+    assertEquals(numRows, HoodieClientTestUtils.read(jsc, basePath, sqlContext, fs, fullPartitionPaths).count(),
+        "Must contain " + numRows + " records");
+  }
+
   /**
    * Get Cleaner state corresponding to a partition path.
    *


[hudi] 06/08: [HUDI-5681] Fixing Kryo being instantiated w/ invalid `SparkConf` (#7821)

Posted by yi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-0.13.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 4a61e82576dc3044046fc3668f0f30eed63d4aa7
Author: Alexey Kudinkin <al...@gmail.com>
AuthorDate: Wed Feb 1 21:37:34 2023 -0800

    [HUDI-5681] Fixing Kryo being instantiated w/ invalid `SparkConf` (#7821)
    
    This is addressing misconfiguration of the Kryo object used specifically to serialize Spark's internal structures (like `Expression`s): previously we were using a default `SparkConf` instance to configure it, while instead we should have used the one provided by `SparkEnv`
---
 .../org/apache/spark/sql/hudi/SerDeUtils.scala     | 44 ------------------
 .../hudi/command/MergeIntoHoodieTableCommand.scala |  8 ++--
 .../hudi/command/payload/ExpressionPayload.scala   | 54 ++++++++++++++++++++--
 3 files changed, 54 insertions(+), 52 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/SerDeUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/SerDeUtils.scala
deleted file mode 100644
index 631644121c1..00000000000
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/SerDeUtils.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hudi
-
-import org.apache.hudi.common.util.BinaryUtil
-import org.apache.spark.SparkConf
-import org.apache.spark.serializer.{KryoSerializer, SerializerInstance}
-
-import java.nio.ByteBuffer
-
-
-object SerDeUtils {
-
-  private val SERIALIZER_THREAD_LOCAL = new ThreadLocal[SerializerInstance] {
-
-    override protected def initialValue: SerializerInstance = {
-      new KryoSerializer(new SparkConf(true)).newInstance()
-    }
-  }
-
-  def toBytes(o: Any): Array[Byte] = {
-    val buf = SERIALIZER_THREAD_LOCAL.get.serialize(o)
-    BinaryUtil.toBytes(buf)
-  }
-
-  def toObject(bytes: Array[Byte]): Any = {
-    SERIALIZER_THREAD_LOCAL.get.deserialize(ByteBuffer.wrap(bytes))
-  }
-}
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
index 418b2f8d6ec..93972b392b2 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
@@ -42,7 +42,7 @@ import org.apache.spark.sql.hudi.ProvidesHoodieConfig.withCombinedOptions
 import org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.CoercedAttributeReference
 import org.apache.spark.sql.hudi.command.payload.ExpressionPayload
 import org.apache.spark.sql.hudi.command.payload.ExpressionPayload._
-import org.apache.spark.sql.hudi.{ProvidesHoodieConfig, SerDeUtils}
+import org.apache.spark.sql.hudi.ProvidesHoodieConfig
 import org.apache.spark.sql.types.{BooleanType, StructType}
 
 import java.util.Base64
@@ -328,7 +328,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
       }).toMap
     // Serialize the Map[UpdateCondition, UpdateAssignments] to base64 string
     val serializedUpdateConditionAndExpressions = Base64.getEncoder
-      .encodeToString(SerDeUtils.toBytes(updateConditionToAssignments))
+      .encodeToString(Serializer.toBytes(updateConditionToAssignments))
     writeParams += (PAYLOAD_UPDATE_CONDITION_AND_ASSIGNMENTS ->
       serializedUpdateConditionAndExpressions)
 
@@ -338,7 +338,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
         .getOrElse(Literal.create(true, BooleanType))
       // Serialize the Map[DeleteCondition, empty] to base64 string
       val serializedDeleteCondition = Base64.getEncoder
-        .encodeToString(SerDeUtils.toBytes(Map(deleteCondition -> Seq.empty[Assignment])))
+        .encodeToString(Serializer.toBytes(Map(deleteCondition -> Seq.empty[Assignment])))
       writeParams += (PAYLOAD_DELETE_CONDITION -> serializedDeleteCondition)
     }
 
@@ -414,7 +414,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
         rewriteCondition -> formatAssignments
       }).toMap
     Base64.getEncoder.encodeToString(
-      SerDeUtils.toBytes(insertConditionAndAssignments))
+      Serializer.toBytes(insertConditionAndAssignments))
   }
 
   /**
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala
index d874f7bec3a..015ba2e2e8e 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala
@@ -29,18 +29,19 @@ import org.apache.hudi.avro.HoodieAvroUtils.bytesToAvro
 import org.apache.hudi.common.model.BaseAvroPayload.isDeleteRecord
 import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodiePayloadProps, HoodieRecord}
 import org.apache.hudi.common.util.ValidationUtils.checkState
-import org.apache.hudi.common.util.{ValidationUtils, Option => HOption}
+import org.apache.hudi.common.util.{BinaryUtil, ValidationUtils, Option => HOption}
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.exception.HoodieException
-import org.apache.hudi.io.HoodieWriteHandle
 import org.apache.spark.internal.Logging
+import org.apache.spark.serializer.{KryoSerializer, SerializerInstance}
 import org.apache.spark.sql.avro.{HoodieAvroDeserializer, HoodieAvroSerializer}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Expression, Projection, SafeProjection}
-import org.apache.spark.sql.hudi.SerDeUtils
 import org.apache.spark.sql.hudi.command.payload.ExpressionPayload._
 import org.apache.spark.sql.types.BooleanType
+import org.apache.spark.{SparkConf, SparkEnv}
 
+import java.nio.ByteBuffer
 import java.util.function.{Function, Supplier}
 import java.util.{Base64, Properties}
 import scala.collection.JavaConverters._
@@ -420,7 +421,7 @@ object ExpressionPayload {
           override def apply(key: (String, Schema)): Seq[(Projection, Projection)] = {
             val (encodedConditionalAssignments, _) = key
             val serializedBytes = Base64.getDecoder.decode(encodedConditionalAssignments)
-            val conditionAssignments = SerDeUtils.toObject(serializedBytes)
+            val conditionAssignments = Serializer.toObject(serializedBytes)
               .asInstanceOf[Map[Expression, Seq[Expression]]]
             conditionAssignments.toSeq.map {
               case (condition, assignments) =>
@@ -455,5 +456,50 @@ object ExpressionPayload {
             field.schema, field.doc, field.defaultVal, field.order))
     Schema.createRecord(a.getName, a.getDoc, a.getNamespace, a.isError, mergedFields.asJava)
   }
+
+
+  /**
+   * This object differs from Hudi's generic [[SerializationUtils]] in its ability to serialize
+   * Spark's internal structures (various [[Expression]]s)
+   *
+   * For that purpose we re-use Spark's [[KryoSerializer]] instance sharing configuration
+   * with enclosing [[SparkEnv]]. This is necessary to make sure that this particular instance of Kryo
+   * used for serialization of Spark's internal structures (like [[Expression]]s) is configured
+   * appropriately (class-loading, custom serializers, etc)
+   *
+   * TODO rebase on Spark's SerializerSupport
+   */
+  private[hudi] object Serializer {
+
+    // NOTE: This is only Spark >= 3.0
+    private val KRYO_USE_POOL_CONFIG_KEY = "spark.kryo.pool"
+
+    private lazy val conf = {
+      val conf = Option(SparkEnv.get)
+        // To make sure we're not modifying existing environment's [[SparkConf]]
+        // we're cloning it here
+        .map(_.conf.clone)
+        .getOrElse(new SparkConf)
+      // This serializer is configured as thread-local, hence there's no need for
+      // pooling
+      conf.set(KRYO_USE_POOL_CONFIG_KEY, "false")
+      conf
+    }
+
+    private val SERIALIZER_THREAD_LOCAL = new ThreadLocal[SerializerInstance] {
+      override protected def initialValue: SerializerInstance = {
+        new KryoSerializer(conf).newInstance()
+      }
+    }
+
+    def toBytes(o: Any): Array[Byte] = {
+      val buf = SERIALIZER_THREAD_LOCAL.get.serialize(o)
+      BinaryUtil.toBytes(buf)
+    }
+
+    def toObject(bytes: Array[Byte]): Any = {
+      SERIALIZER_THREAD_LOCAL.get.deserialize(ByteBuffer.wrap(bytes))
+    }
+  }
 }
 


[hudi] 02/08: [HUDI-5540] Close write client after usage of DeleteMarker/RollbackToInstantTime/RunClean/RunCompactionProcedure (#7655)

Posted by yi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-0.13.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 1ccf37f2287484c83f078a6d2dbb7e61f263640a
Author: StreamingFlames <18...@163.com>
AuthorDate: Wed Feb 1 13:53:19 2023 +0800

    [HUDI-5540] Close write client after usage of DeleteMarker/RollbackToInstantTime/RunClean/RunCompactionProcedure (#7655)
---
 .../command/procedures/DeleteMarkerProcedure.scala |   8 +-
 .../RollbackToInstantTimeProcedure.scala           |  50 ++++----
 .../command/procedures/RunCleanProcedure.scala     |  27 +++--
 .../procedures/RunCompactionProcedure.scala        | 126 +++++++++++----------
 4 files changed, 121 insertions(+), 90 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DeleteMarkerProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DeleteMarkerProcedure.scala
index bfbab32599b..d99a5489799 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DeleteMarkerProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DeleteMarkerProcedure.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.hudi.command.procedures
 
+import org.apache.hudi.client.SparkRDDWriteClient
 import org.apache.hudi.table.HoodieSparkTable
 import org.apache.hudi.table.marker.WriteMarkersFactory
 import org.apache.spark.internal.Logging
@@ -47,8 +48,9 @@ class DeleteMarkerProcedure extends BaseProcedure with ProcedureBuilder with Log
     val instantTime = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[String]
     val basePath = getBasePath(tableName)
 
+    var client: SparkRDDWriteClient[_] = null
     val result = Try {
-      val client = createHoodieClient(jsc, basePath)
+      client = createHoodieClient(jsc, basePath)
       val config = client.getConfig
       val context = client.getEngineContext
       val table = HoodieSparkTable.create(config, context)
@@ -63,6 +65,10 @@ class DeleteMarkerProcedure extends BaseProcedure with ProcedureBuilder with Log
         false
     }
 
+    if (client != null) {
+      client.close()
+    }
+
     Seq(Row(result))
   }
 
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RollbackToInstantTimeProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RollbackToInstantTimeProcedure.scala
index 1fcc665d611..c8109bd56e2 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RollbackToInstantTimeProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RollbackToInstantTimeProcedure.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.hudi.command.procedures
 
 import org.apache.hudi.HoodieCLIUtils
+import org.apache.hudi.client.SparkRDDWriteClient
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.table.timeline.HoodieTimeline
 import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion
@@ -52,28 +53,35 @@ class RollbackToInstantTimeProcedure extends BaseProcedure with ProcedureBuilder
 
     val hoodieCatalogTable = HoodieCLIUtils.getHoodieCatalogTable(sparkSession, table)
     val basePath = hoodieCatalogTable.tableLocation
-    val client = createHoodieClient(jsc, basePath)
-    client.getConfig.setValue(ROLLBACK_USING_MARKERS_ENABLE, "false")
-    val config = getWriteConfig(basePath)
-    val metaClient = HoodieTableMetaClient.builder
-      .setConf(jsc.hadoopConfiguration)
-      .setBasePath(config.getBasePath)
-      .setLoadActiveTimelineOnLoad(false)
-      .setConsistencyGuardConfig(config.getConsistencyGuardConfig)
-      .setLayoutVersion(Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion)))
-      .build
-
-    val activeTimeline = metaClient.getActiveTimeline
-    val completedTimeline: HoodieTimeline = activeTimeline.getCommitsTimeline.filterCompletedInstants
-    val filteredTimeline = completedTimeline.containsInstant(instantTime)
-    if (!filteredTimeline) {
-      throw new HoodieException(s"Commit $instantTime not found in Commits $completedTimeline")
+    var client: SparkRDDWriteClient[_] = null
+    try {
+      client = createHoodieClient(jsc, basePath)
+      client.getConfig.setValue(ROLLBACK_USING_MARKERS_ENABLE, "false")
+      val config = getWriteConfig(basePath)
+      val metaClient = HoodieTableMetaClient.builder
+        .setConf(jsc.hadoopConfiguration)
+        .setBasePath(config.getBasePath)
+        .setLoadActiveTimelineOnLoad(false)
+        .setConsistencyGuardConfig(config.getConsistencyGuardConfig)
+        .setLayoutVersion(Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion)))
+        .build
+
+      val activeTimeline = metaClient.getActiveTimeline
+      val completedTimeline: HoodieTimeline = activeTimeline.getCommitsTimeline.filterCompletedInstants
+      val filteredTimeline = completedTimeline.containsInstant(instantTime)
+      if (!filteredTimeline) {
+        throw new HoodieException(s"Commit $instantTime not found in Commits $completedTimeline")
+      }
+
+      val result = if (client.rollback(instantTime)) true else false
+      val outputRow = Row(result)
+
+      Seq(outputRow)
+    } finally {
+      if (client != null) {
+        client.close()
+      }
     }
-
-    val result = if (client.rollback(instantTime)) true else false
-    val outputRow = Row(result)
-
-    Seq(outputRow)
   }
 
   override def build: Procedure = new RollbackToInstantTimeProcedure()
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCleanProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCleanProcedure.scala
index 36580176d0f..ca8b3fc95bc 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCleanProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCleanProcedure.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.hudi.command.procedures
 
 import org.apache.hudi.HoodieCLIUtils
+import org.apache.hudi.client.SparkRDDWriteClient
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
 import org.apache.hudi.common.util.JsonUtils
 import org.apache.hudi.config.HoodieCleanConfig
@@ -79,16 +80,24 @@ class RunCleanProcedure extends BaseProcedure with ProcedureBuilder with Logging
       HoodieCleanConfig.CLEAN_TRIGGER_STRATEGY.key() -> getArgValueOrDefault(args, PARAMETERS(7)).get.toString,
       HoodieCleanConfig.CLEAN_MAX_COMMITS.key() -> getArgValueOrDefault(args, PARAMETERS(8)).get.toString
     )
-    val client = HoodieCLIUtils.createHoodieClientFromPath(sparkSession, basePath, props)
-    val hoodieCleanMeta = client.clean(cleanInstantTime, scheduleInLine, skipLocking)
 
-    if (hoodieCleanMeta == null) Seq.empty
-    else Seq(Row(hoodieCleanMeta.getStartCleanTime,
-      hoodieCleanMeta.getTimeTakenInMillis,
-      hoodieCleanMeta.getTotalFilesDeleted,
-      hoodieCleanMeta.getEarliestCommitToRetain,
-      JsonUtils.getObjectMapper.writeValueAsString(hoodieCleanMeta.getBootstrapPartitionMetadata),
-      hoodieCleanMeta.getVersion))
+    var client: SparkRDDWriteClient[_] = null
+    try {
+      client = HoodieCLIUtils.createHoodieClientFromPath(sparkSession, basePath, props)
+      val hoodieCleanMeta = client.clean(cleanInstantTime, scheduleInLine, skipLocking)
+
+      if (hoodieCleanMeta == null) Seq.empty
+      else Seq(Row(hoodieCleanMeta.getStartCleanTime,
+        hoodieCleanMeta.getTimeTakenInMillis,
+        hoodieCleanMeta.getTotalFilesDeleted,
+        hoodieCleanMeta.getEarliestCommitToRetain,
+        JsonUtils.getObjectMapper.writeValueAsString(hoodieCleanMeta.getBootstrapPartitionMetadata),
+        hoodieCleanMeta.getVersion))
+    } finally {
+      if (client != null) {
+        client.close()
+      }
+    }
   }
 }
 
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCompactionProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCompactionProcedure.scala
index 3c51d7d8b29..d79cf8c302f 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCompactionProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCompactionProcedure.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.hudi.command.procedures
 
+import org.apache.hudi.client.SparkRDDWriteClient
 import org.apache.hudi.common.model.HoodieCommitMetadata
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieTimeline}
@@ -64,70 +65,77 @@ class RunCompactionProcedure extends BaseProcedure with ProcedureBuilder with Sp
 
     val basePath = getBasePath(tableName, tablePath)
     val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
-    val client = HoodieCLIUtils.createHoodieClientFromPath(sparkSession, basePath, Map.empty)
-
-    var willCompactionInstants: Seq[String] = Seq.empty
-    operation match {
-      case "schedule" =>
-        val instantTime = instantTimestamp.map(_.toString).getOrElse(HoodieActiveTimeline.createNewInstantTime)
-        if (client.scheduleCompactionAtInstant(instantTime, HOption.empty[java.util.Map[String, String]])) {
-          willCompactionInstants = Seq(instantTime)
-        }
-      case "run" =>
-        // Do compaction
-        val timeLine = metaClient.getActiveTimeline
-        val pendingCompactionInstants = timeLine.getWriteTimeline.getInstants.iterator().asScala
-          .filter(p => p.getAction == HoodieTimeline.COMPACTION_ACTION)
-          .map(_.getTimestamp)
-          .toSeq.sortBy(f => f)
-        willCompactionInstants = if (instantTimestamp.isEmpty) {
-          if (pendingCompactionInstants.nonEmpty) {
-            pendingCompactionInstants
-          } else { // If there are no pending compaction, schedule to generate one.
-            // CompactionHoodiePathCommand will return instanceTime for SCHEDULE.
-            val instantTime = HoodieActiveTimeline.createNewInstantTime()
-            if (client.scheduleCompactionAtInstant(instantTime, HOption.empty[java.util.Map[String, String]])) {
-              Seq(instantTime)
+
+    var client: SparkRDDWriteClient[_] = null
+    try {
+      client = HoodieCLIUtils.createHoodieClientFromPath(sparkSession, basePath, Map.empty)
+      var willCompactionInstants: Seq[String] = Seq.empty
+      operation match {
+        case "schedule" =>
+          val instantTime = instantTimestamp.map(_.toString).getOrElse(HoodieActiveTimeline.createNewInstantTime)
+          if (client.scheduleCompactionAtInstant(instantTime, HOption.empty[java.util.Map[String, String]])) {
+            willCompactionInstants = Seq(instantTime)
+          }
+        case "run" =>
+          // Do compaction
+          val timeLine = metaClient.getActiveTimeline
+          val pendingCompactionInstants = timeLine.getWriteTimeline.getInstants.iterator().asScala
+            .filter(p => p.getAction == HoodieTimeline.COMPACTION_ACTION)
+            .map(_.getTimestamp)
+            .toSeq.sortBy(f => f)
+          willCompactionInstants = if (instantTimestamp.isEmpty) {
+            if (pendingCompactionInstants.nonEmpty) {
+              pendingCompactionInstants
+            } else { // If there are no pending compaction, schedule to generate one.
+              // CompactionHoodiePathCommand will return instanceTime for SCHEDULE.
+              val instantTime = HoodieActiveTimeline.createNewInstantTime()
+              if (client.scheduleCompactionAtInstant(instantTime, HOption.empty[java.util.Map[String, String]])) {
+                Seq(instantTime)
+              } else {
+                Seq.empty
+              }
+            }
+          } else {
+            // Check if the compaction timestamp exists among the pending compactions
+            if (pendingCompactionInstants.contains(instantTimestamp.get.toString)) {
+              Seq(instantTimestamp.get.toString)
             } else {
-              Seq.empty
+              throw new IllegalArgumentException(s"Compaction instant: ${instantTimestamp.get} is not found in " +
+                s"$basePath, Available pending compaction instants are: ${pendingCompactionInstants.mkString(",")} ")
             }
           }
-        } else {
-          // Check if the compaction timestamp has exists in the pending compaction
-          if (pendingCompactionInstants.contains(instantTimestamp.get.toString)) {
-            Seq(instantTimestamp.get.toString)
+
+          if (willCompactionInstants.isEmpty) {
+            logInfo(s"No need to compaction on $basePath")
           } else {
-            throw new IllegalArgumentException(s"Compaction instant: ${instantTimestamp.get} is not found in " +
-              s"$basePath, Available pending compaction instants are: ${pendingCompactionInstants.mkString(",")} ")
-          }
-        }
-
-        if (willCompactionInstants.isEmpty) {
-          logInfo(s"No need to compaction on $basePath")
-        } else {
-          logInfo(s"Run compaction at instants: [${willCompactionInstants.mkString(",")}] on $basePath")
-          val timer = HoodieTimer.start
-          willCompactionInstants.foreach { compactionInstant =>
-            val writeResponse = client.compact(compactionInstant)
-            handleResponse(writeResponse.getCommitMetadata.get())
-            client.commitCompaction(compactionInstant, writeResponse.getCommitMetadata.get(), HOption.empty())
+            logInfo(s"Run compaction at instants: [${willCompactionInstants.mkString(",")}] on $basePath")
+            val timer = HoodieTimer.start
+            willCompactionInstants.foreach { compactionInstant =>
+              val writeResponse = client.compact(compactionInstant)
+              handleResponse(writeResponse.getCommitMetadata.get())
+              client.commitCompaction(compactionInstant, writeResponse.getCommitMetadata.get(), HOption.empty())
+            }
+            logInfo(s"Finish Run compaction at instants: [${willCompactionInstants.mkString(",")}]," +
+              s" spend: ${timer.endTimer()}ms")
           }
-          logInfo(s"Finish Run compaction at instants: [${willCompactionInstants.mkString(",")}]," +
-            s" spend: ${timer.endTimer()}ms")
-        }
-      case _ => throw new UnsupportedOperationException(s"Unsupported compaction operation: $operation")
-    }
-
-    val compactionInstants = metaClient.reloadActiveTimeline().getInstantsAsStream.iterator().asScala
-      .filter(instant => willCompactionInstants.contains(instant.getTimestamp))
-      .toSeq
-      .sortBy(p => p.getTimestamp)
-      .reverse
-
-    compactionInstants.map(instant =>
-      (instant, CompactionUtils.getCompactionPlan(metaClient, instant.getTimestamp))
-    ).map { case (instant, plan) =>
-      Row(instant.getTimestamp, plan.getOperations.size(), instant.getState.name())
+        case _ => throw new UnsupportedOperationException(s"Unsupported compaction operation: $operation")
+      }
+
+      val compactionInstants = metaClient.reloadActiveTimeline().getInstantsAsStream.iterator().asScala
+        .filter(instant => willCompactionInstants.contains(instant.getTimestamp))
+        .toSeq
+        .sortBy(p => p.getTimestamp)
+        .reverse
+
+      compactionInstants.map(instant =>
+        (instant, CompactionUtils.getCompactionPlan(metaClient, instant.getTimestamp))
+      ).map { case (instant, plan) =>
+        Row(instant.getTimestamp, plan.getOperations.size(), instant.getState.name())
+      }
+    } finally {
+      if (client != null) {
+        client.close()
+      }
     }
   }
 


[hudi] 05/08: [MINOR] Restoring existing behavior for `DeltaStreamer` Incremental Source (#7810)

Posted by yi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-0.13.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 6ef2135b2fa88d5f559d03b15660935cf7efd410
Author: Alexey Kudinkin <al...@gmail.com>
AuthorDate: Wed Feb 1 11:19:45 2023 -0800

    [MINOR] Restoring existing behavior for `DeltaStreamer` Incremental Source (#7810)
    
    This restores the existing behavior of the DeltaStreamer Incremental Source: the change in #7769 removed the _hoodie_partition_path field from the dataset, making it impossible to access from, for example, the DS Transformers.
---
 .../org/apache/hudi/config/HoodieWriteConfig.java  |  2 +-
 .../apache/hudi/common/config/HoodieConfig.java    |  8 --------
 .../org/apache/hudi/utilities/UtilHelpers.java     | 13 ++++++++++++
 .../hudi/utilities/deltastreamer/DeltaSync.java    |  8 ++------
 .../hudi/utilities/sources/HoodieIncrSource.java   | 23 ++++++++++++++++++++--
 5 files changed, 37 insertions(+), 17 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index e6525a2b1dc..f56defe7eac 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -1034,7 +1034,7 @@ public class HoodieWriteConfig extends HoodieConfig {
   }
 
   public HoodieRecordMerger getRecordMerger() {
-    List<String> mergers = getSplitStringsOrDefault(RECORD_MERGER_IMPLS).stream()
+    List<String> mergers = StringUtils.split(getStringOrDefault(RECORD_MERGER_IMPLS), ",").stream()
         .map(String::trim)
         .distinct()
         .collect(Collectors.toList());
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieConfig.java
index a48e4202bf9..223b93e5744 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieConfig.java
@@ -142,14 +142,6 @@ public class HoodieConfig implements Serializable {
     return StringUtils.split(getString(configProperty), delimiter);
   }
 
-  public <T> List<String> getSplitStringsOrDefault(ConfigProperty<T> configProperty) {
-    return getSplitStringsOrDefault(configProperty, ",");
-  }
-
-  public <T> List<String> getSplitStringsOrDefault(ConfigProperty<T> configProperty, String delimiter) {
-    return StringUtils.split(getStringOrDefault(configProperty), delimiter);
-  }
-
   public String getString(String key) {
     return props.getProperty(key);
   }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
index d159fee0be4..45a9750c3b3 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
@@ -29,12 +29,16 @@ import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.config.DFSPropertiesConfiguration;
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.engine.EngineType;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.common.util.ConfigUtils;
+import org.apache.hudi.common.util.HoodieRecordUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.StringUtils;
@@ -109,6 +113,15 @@ public class UtilHelpers {
 
   private static final Logger LOG = LogManager.getLogger(UtilHelpers.class);
 
+  public static HoodieRecordMerger createRecordMerger(Properties props) {
+    List<String> recordMergerImplClasses = ConfigUtils.split2List(props.getProperty(HoodieWriteConfig.RECORD_MERGER_IMPLS.key(),
+        HoodieWriteConfig.RECORD_MERGER_IMPLS.defaultValue()));
+    HoodieRecordMerger recordMerger = HoodieRecordUtils.createRecordMerger(null, EngineType.SPARK, recordMergerImplClasses,
+        props.getProperty(HoodieWriteConfig.RECORD_MERGER_STRATEGY.key(), HoodieWriteConfig.RECORD_MERGER_STRATEGY.defaultValue()));
+
+    return recordMerger;
+  }
+
   public static Source createSource(String sourceClass, TypedProperties cfg, JavaSparkContext jssc,
       SparkSession sparkSession, SchemaProvider schemaProvider,
       HoodieDeltaStreamerMetrics metrics) throws IOException {
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index 63cf706c174..77079fe0f4f 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -42,7 +42,6 @@ import org.apache.hudi.common.model.HoodieSparkRecord;
 import org.apache.hudi.common.config.HoodieStorageConfig;
 import org.apache.hudi.common.config.SerializableSchema;
 import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.engine.EngineType;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieAvroRecord;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
@@ -60,8 +59,6 @@ import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.CommitUtils;
-import org.apache.hudi.common.util.ConfigUtils;
-import org.apache.hudi.common.util.HoodieRecordUtils;
 import org.apache.hudi.common.util.IdentityIterator;
 import org.apache.hudi.common.util.MappingIterator;
 import org.apache.hudi.common.util.Option;
@@ -139,6 +136,7 @@ import static org.apache.hudi.config.HoodieWriteConfig.COMBINE_BEFORE_INSERT;
 import static org.apache.hudi.config.HoodieWriteConfig.COMBINE_BEFORE_UPSERT;
 import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_BUCKET_SYNC;
 import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_BUCKET_SYNC_SPEC;
+import static org.apache.hudi.utilities.UtilHelpers.createRecordMerger;
 import static org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.CHECKPOINT_KEY;
 import static org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.CHECKPOINT_RESET_KEY;
 import static org.apache.hudi.utilities.schema.RowBasedSchemaProvider.HOODIE_RECORD_NAMESPACE;
@@ -473,9 +471,7 @@ public class DeltaSync implements Serializable, Closeable {
   }
 
   private Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> fetchFromSource(Option<String> resumeCheckpointStr) {
-    HoodieRecordType recordType = HoodieRecordUtils.createRecordMerger(null, EngineType.SPARK,
-        ConfigUtils.split2List(props.getProperty(HoodieWriteConfig.RECORD_MERGER_IMPLS.key(), HoodieWriteConfig.RECORD_MERGER_IMPLS.defaultValue())),
-        props.getProperty(HoodieWriteConfig.RECORD_MERGER_STRATEGY.key(), HoodieWriteConfig.RECORD_MERGER_STRATEGY.defaultValue())).getRecordType();
+    HoodieRecordType recordType = createRecordMerger(props).getRecordType();
     if (recordType == HoodieRecordType.SPARK && HoodieTableType.valueOf(cfg.tableType) == HoodieTableType.MERGE_ON_READ
         && HoodieLogBlockType.fromId(props.getProperty(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "avro"))
         != HoodieLogBlockType.PARQUET_DATA_BLOCK) {
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
index 7134e8ff7cc..c484038b04b 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
@@ -37,6 +37,8 @@ import org.apache.spark.sql.SparkSession;
 
 import java.util.Collections;
 
+import static org.apache.hudi.utilities.UtilHelpers.createRecordMerger;
+
 public class HoodieIncrSource extends RowSource {
 
   private static final Logger LOG = LogManager.getLogger(HoodieIncrSource.class);
@@ -89,6 +91,12 @@ public class HoodieIncrSource extends RowSource {
      */
     static final String SOURCE_FILE_FORMAT = "hoodie.deltastreamer.source.hoodieincr.file.format";
     static final String DEFAULT_SOURCE_FILE_FORMAT = "parquet";
+
+    /**
+     * Drops all meta fields from the source hudi table while ingesting into sink hudi table.
+     */
+    static final String HOODIE_DROP_ALL_META_FIELDS_FROM_SOURCE = "hoodie.deltastreamer.source.hoodieincr.drop.all.meta.fields.from.source";
+    public static final Boolean DEFAULT_HOODIE_DROP_ALL_META_FIELDS_FROM_SOURCE = false;
   }
 
   public HoodieIncrSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
@@ -153,8 +161,19 @@ public class HoodieIncrSource extends RowSource {
               queryTypeAndInstantEndpts.getRight().getRight()));
     }
 
-    // Remove Hoodie meta columns
-    final Dataset<Row> src = source.drop(HoodieRecord.HOODIE_META_COLUMNS.stream().toArray(String[]::new));
+    HoodieRecord.HoodieRecordType recordType = createRecordMerger(props).getRecordType();
+
+    boolean shouldDropMetaFields = props.getBoolean(Config.HOODIE_DROP_ALL_META_FIELDS_FROM_SOURCE,
+        Config.DEFAULT_HOODIE_DROP_ALL_META_FIELDS_FROM_SOURCE)
+        // NOTE: In case when Spark native [[RecordMerger]] is used, we have to make sure
+        //       all meta-fields have been properly cleaned up from the incoming dataset
+        //
+        || recordType == HoodieRecord.HoodieRecordType.SPARK;
+
+    // Remove Hoodie meta columns except partition path from input source
+    String[] colsToDrop = shouldDropMetaFields ? HoodieRecord.HOODIE_META_COLUMNS.stream().toArray(String[]::new) :
+        HoodieRecord.HOODIE_META_COLUMNS.stream().filter(x -> !x.equals(HoodieRecord.PARTITION_PATH_METADATA_FIELD)).toArray(String[]::new);
+    final Dataset<Row> src = source.drop(colsToDrop);
     return Pair.of(Option.of(src), queryTypeAndInstantEndpts.getRight().getRight());
   }
 }