Posted to commits@hudi.apache.org by da...@apache.org on 2023/03/04 04:00:36 UTC

[hudi] branch master updated: [HUDI-5736] Common de-coupling column drop flag and schema validation flag (#7895)

This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 18d528f33d8 [HUDI-5736] Common de-coupling column drop flag and schema validation flag (#7895)
18d528f33d8 is described below

commit 18d528f33d8b1dd7a836e5543ddf36e0a9c95ad1
Author: Alexander Trushev <tr...@gmail.com>
AuthorDate: Sat Mar 4 11:00:25 2023 +0700

    [HUDI-5736] Common de-coupling column drop flag and schema validation flag (#7895)
    
    * [HUDI-5736] Common de-coupling column drop flag and schema validation flag
---
 .../java/org/apache/hudi/table/HoodieTable.java    | 40 ++++++-------
 .../hudi/table/TestHoodieMergeOnReadTable.java     |  1 +
 .../java/org/apache/hudi/avro/AvroSchemaUtils.java | 65 ++++++++++++++++++++++
 .../org/apache/hudi/avro/TestAvroSchemaUtils.java  | 57 +++++++++++++++++++
 .../apache/hudi/sink/ITTestDataStreamWrite.java    | 52 +++++++++++++++++
 .../resources/test_read_schema_dropped_age.avsc    | 41 ++++++++++++++
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     |  4 +-
 .../AlterHoodieTableChangeColumnCommand.scala      |  2 +-
 .../hudi/command/MergeIntoHoodieTableCommand.scala |  3 +-
 9 files changed, 242 insertions(+), 23 deletions(-)
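
The key change: schema validation (AVRO_SCHEMA_VALIDATE_ENABLE) and column dropping (SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP) are now independent switches, so disabling the Avro compatibility check no longer implicitly allows writers to drop columns. A minimal sketch of a write config exercising the decoupled flags, using the builder methods that appear in the test change below (the base path and schema string are placeholders):

    import org.apache.hudi.config.HoodieWriteConfig;

    String basePath = "/tmp/hudi_table";   // placeholder base path
    String avroSchemaJson = "...";         // placeholder: writer schema as an Avro JSON string
    // Skip the full Avro compatibility check but still forbid dropping columns --
    // a combination that was not expressible before this commit.
    HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder()
        .withPath(basePath)
        .withSchema(avroSchemaJson)
        .withAvroSchemaValidate(false)            // toggles the Avro schema compatibility check
        .withAllowAutoEvolutionColumnDrop(false)  // toggles whether column drops are tolerated
        .build();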

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
index 2a71cf4ea46..8b1056bca6c 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.table;
 
+import org.apache.hudi.avro.AvroSchemaUtils;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieCleanerPlan;
@@ -92,6 +93,9 @@ import org.apache.log4j.Logger;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -100,7 +104,6 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import static org.apache.hudi.avro.AvroSchemaUtils.isSchemaCompatible;
 import static org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy.EAGER;
 import static org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy.LAZY;
 import static org.apache.hudi.common.table.HoodieTableConfig.TABLE_METADATA_PARTITIONS;
@@ -803,27 +806,22 @@ public abstract class HoodieTable<T, I, K, O> implements Serializable {
    */
   private void validateSchema() throws HoodieUpsertException, HoodieInsertException {
 
-    if (!shouldValidateAvroSchema() || getActiveTimeline().getCommitsTimeline().filterCompletedInstants().empty()) {
+    boolean shouldValidate = config.shouldValidateAvroSchema();
+    boolean allowProjection = config.shouldAllowAutoEvolutionColumnDrop();
+    if ((!shouldValidate && allowProjection)
+        || getActiveTimeline().getCommitsTimeline().filterCompletedInstants().empty()) {
       // Check not required
       return;
     }
 
-    Schema tableSchema;
-    Schema writerSchema;
-    boolean isValid;
     try {
       TableSchemaResolver schemaResolver = new TableSchemaResolver(getMetaClient());
-      writerSchema = HoodieAvroUtils.createHoodieWriteSchema(config.getSchema());
-      tableSchema = HoodieAvroUtils.createHoodieWriteSchema(schemaResolver.getTableAvroSchema(false));
-      isValid = isSchemaCompatible(tableSchema, writerSchema, config.shouldAllowAutoEvolutionColumnDrop());
+      Schema writerSchema = HoodieAvroUtils.createHoodieWriteSchema(config.getSchema());
+      Schema tableSchema = HoodieAvroUtils.createHoodieWriteSchema(schemaResolver.getTableAvroSchema(false));
+      AvroSchemaUtils.checkSchemaCompatible(tableSchema, writerSchema, shouldValidate, allowProjection, getDropPartitionColNames());
     } catch (Exception e) {
       throw new HoodieException("Failed to read schema/check compatibility for base path " + metaClient.getBasePath(), e);
     }
-
-    if (!isValid) {
-      throw new HoodieException("Failed schema compatibility check for writerSchema :" + writerSchema
-          + ", table schema :" + tableSchema + ", base path :" + metaClient.getBasePath());
-    }
   }
 
   public void validateUpsertSchema() throws HoodieUpsertException {
@@ -1041,11 +1039,15 @@ public abstract class HoodieTable<T, I, K, O> implements Serializable {
     return Functions.noop();
   }
 
-  private boolean shouldValidateAvroSchema() {
-    // TODO(HUDI-4772) re-enable validations in case partition columns
-    //                 being dropped from the data-file after fixing the write schema
-    Boolean shouldDropPartitionColumns = metaClient.getTableConfig().shouldDropPartitionColumns();
-
-    return config.shouldValidateAvroSchema() && !shouldDropPartitionColumns;
+  private Set<String> getDropPartitionColNames() {
+    boolean shouldDropPartitionColumns = metaClient.getTableConfig().shouldDropPartitionColumns();
+    if (!shouldDropPartitionColumns) {
+      return Collections.emptySet();
+    }
+    Option<String[]> partitionFields = metaClient.getTableConfig().getPartitionFields();
+    if (!partitionFields.isPresent()) {
+      return Collections.emptySet();
+    }
+    return new HashSet<>(Arrays.asList(partitionFields.get()));
   }
 }
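
With the guard above, validateSchema() only short-circuits when validation is disabled and column dropping is allowed; otherwise AvroSchemaUtils.checkSchemaCompatible applies the two checks independently, excluding any partition columns the table is configured to drop. A rough summary of the resulting behaviour, assuming no partition columns are being dropped:

    shouldValidateAvroSchema  allowAutoEvolutionColumnDrop  checks performed
    ------------------------  ----------------------------  ------------------------------------
    false                     true                          none (early return, as before)
    false                     false                         column-drop (projection) check only
    true                      true                          Avro compatibility check only
    true                      false                         both checks
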
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
index 37cd23451ee..26f2705a4ab 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
@@ -409,6 +409,7 @@ public class TestHoodieMergeOnReadTable extends SparkClientFunctionalTestHarness
   public void testMetadataStatsOnCommit(Boolean rollbackUsingMarkers) throws Exception {
     HoodieWriteConfig cfg = getConfigBuilder(false, rollbackUsingMarkers, IndexType.INMEMORY)
         .withAvroSchemaValidate(false)
+        .withAllowAutoEvolutionColumnDrop(true)
         .withAutoCommit(false)
         .build();
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java
index 0a1bb747e1c..24adb1d161e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java
@@ -18,12 +18,16 @@
 
 package org.apache.hudi.avro;
 
+import org.apache.hudi.exception.SchemaCompatibilityException;
+
 import org.apache.avro.AvroRuntimeException;
 import org.apache.avro.Schema;
 import org.apache.avro.SchemaCompatibility;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
+import java.util.Set;
 import java.util.function.BiFunction;
 import java.util.stream.Collectors;
 
@@ -36,6 +40,13 @@ public class AvroSchemaUtils {
 
   private AvroSchemaUtils() {}
 
+  /**
+   * See {@link #isSchemaCompatible(Schema, Schema, boolean, boolean)} doc for more details
+   */
+  public static boolean isSchemaCompatible(Schema prevSchema, Schema newSchema) {
+    return isSchemaCompatible(prevSchema, newSchema, true);
+  }
+
   /**
    * See {@link #isSchemaCompatible(Schema, Schema, boolean, boolean)} doc for more details
    */
@@ -76,7 +87,18 @@ public class AvroSchemaUtils {
    * @return true if prev schema is a projection of new schema.
    */
   public static boolean canProject(Schema prevSchema, Schema newSchema) {
+    return canProject(prevSchema, newSchema, Collections.emptySet());
+  }
+
+  /**
+   * Checks that each field in prevSchema, except the columns in {@code exceptCols}, can be populated from newSchema.
+   * @param prevSchema prev schema.
+   * @param newSchema new schema
+   * @return true if prev schema is a projection of new schema.
+   */
+  public static boolean canProject(Schema prevSchema, Schema newSchema, Set<String> exceptCols) {
     return prevSchema.getFields().stream()
+        .filter(f -> !exceptCols.contains(f.name()))
         .map(oldSchemaField -> SchemaCompatibility.lookupWriterField(newSchema, oldSchemaField))
         .noneMatch(Objects::isNull);
   }
@@ -275,4 +297,47 @@ public class AvroSchemaUtils {
       return false;
     }
   }
+
+  /**
+   * Checks whether the writer schema is compatible with the table schema, considering the {@code AVRO_SCHEMA_VALIDATE_ENABLE}
+   * and {@code SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP} options.
+   * To avoid a collision between {@code SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP} and {@code DROP_PARTITION_COLUMNS},
+   * the partition column names should be passed as {@code dropPartitionColNames};
+   * an empty set means {@code DROP_PARTITION_COLUMNS} is disabled.
+   *
+   * @param tableSchema the latest dataset schema
+   * @param writerSchema writer schema
+   * @param shouldValidate whether the {@link AvroSchemaCompatibility} check is performed
+   * @param allowProjection whether dropping columns from the table schema is allowed (skips the column-drop check)
+   * @param dropPartitionColNames partition column names to be excluded from the column-drop check
+   * @throws SchemaCompatibilityException if writer schema is not compatible
+   */
+  public static void checkSchemaCompatible(
+      Schema tableSchema,
+      Schema writerSchema,
+      boolean shouldValidate,
+      boolean allowProjection,
+      Set<String> dropPartitionColNames) throws SchemaCompatibilityException {
+
+    String errorMessage = null;
+
+    if (!allowProjection && !canProject(tableSchema, writerSchema, dropPartitionColNames)) {
+      errorMessage = "Column dropping is not allowed";
+    }
+
+    // TODO(HUDI-4772) re-enable validation for the case where partition columns
+    //                 are dropped from the data file, once the write schema is fixed
+    if (dropPartitionColNames.isEmpty() && shouldValidate && !isSchemaCompatible(tableSchema, writerSchema)) {
+      errorMessage = "Failed schema compatibility check";
+    }
+
+    if (errorMessage != null) {
+      String errorDetails = String.format(
+          "%s\nwriterSchema: %s\ntableSchema: %s",
+          errorMessage,
+          writerSchema,
+          tableSchema);
+      throw new SchemaCompatibilityException(errorDetails);
+    }
+  }
 }
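
A hedged usage sketch of the new helper, mirroring the unit tests that follow: dropping a column from the writer schema raises SchemaCompatibilityException unless projection is allowed or the column is listed in dropPartitionColNames (the schemas here are illustrative, not taken from the codebase):

    import java.util.Collections;

    import org.apache.avro.Schema;
    import org.apache.hudi.avro.AvroSchemaUtils;
    import org.apache.hudi.exception.SchemaCompatibilityException;

    Schema tableSchema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"r\",\"fields\":["
            + "{\"name\":\"a\",\"type\":[\"null\",\"int\"],\"default\":null},"
            + "{\"name\":\"b\",\"type\":[\"null\",\"int\"],\"default\":null}]}");
    Schema writerSchema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"r\",\"fields\":["
            + "{\"name\":\"a\",\"type\":[\"null\",\"int\"],\"default\":null}]}");  // column "b" dropped

    // Throws SchemaCompatibilityException: the writer schema drops "b" and projection is not allowed.
    try {
      AvroSchemaUtils.checkSchemaCompatible(tableSchema, writerSchema, true, false, Collections.emptySet());
    } catch (SchemaCompatibilityException e) {
      // expected
    }

    // Passes: column dropping is explicitly allowed.
    AvroSchemaUtils.checkSchemaCompatible(tableSchema, writerSchema, true, true, Collections.emptySet());

    // Passes: "b" is a partition column slated for dropping, so it is excluded from the check.
    AvroSchemaUtils.checkSchemaCompatible(tableSchema, writerSchema, true, false, Collections.singleton("b"));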
diff --git a/hudi-common/src/test/java/org/apache/hudi/avro/TestAvroSchemaUtils.java b/hudi-common/src/test/java/org/apache/hudi/avro/TestAvroSchemaUtils.java
index 14d65e8359a..c05683e605c 100644
--- a/hudi-common/src/test/java/org/apache/hudi/avro/TestAvroSchemaUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/avro/TestAvroSchemaUtils.java
@@ -18,10 +18,16 @@
 
 package org.apache.hudi.avro;
 
+import org.apache.hudi.exception.SchemaCompatibilityException;
+
 import org.apache.avro.Schema;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import java.util.Collections;
 
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class TestAvroSchemaUtils {
@@ -172,4 +178,55 @@ public class TestAvroSchemaUtils {
             Schema.createUnion(Schema.create(Schema.Type.NULL), sourceSchema),
             Schema.createUnion(Schema.create(Schema.Type.NULL), projectedNestedSchema)));
   }
+
+  private static final Schema FULL_SCHEMA = new Schema.Parser().parse("{\n"
+      + "  \"type\" : \"record\",\n"
+      + "  \"name\" : \"record\",\n"
+      + "  \"fields\" : [ {\n"
+      + "    \"name\" : \"a\",\n"
+      + "    \"type\" : [ \"null\", \"int\" ],\n"
+      + "    \"default\" : null\n"
+      + "  }, {\n"
+      + "    \"name\" : \"b\",\n"
+      + "    \"type\" : [ \"null\", \"int\" ],\n"
+      + "    \"default\" : null\n"
+      + "  }, {\n"
+      + "    \"name\" : \"c\",\n"
+      + "    \"type\" : [ \"null\", \"int\" ],\n"
+      + "    \"default\" : null\n"
+      + "  } ]\n"
+      + "}");
+
+  private static final Schema SHORT_SCHEMA = new Schema.Parser().parse("{\n"
+      + "  \"type\" : \"record\",\n"
+      + "  \"name\" : \"record\",\n"
+      + "  \"fields\" : [ {\n"
+      + "    \"name\" : \"a\",\n"
+      + "    \"type\" : [ \"null\", \"int\" ],\n"
+      + "    \"default\" : null\n"
+      + "  }, {\n"
+      + "    \"name\" : \"b\",\n"
+      + "    \"type\" : [ \"null\", \"int\" ],\n"
+      + "    \"default\" : null\n"
+      + "  } ]\n"
+      + "}\n");
+
+  @ParameterizedTest
+  @ValueSource(booleans = {false, true})
+  public void testIsCompatibleProjectionNotAllowed(boolean shouldValidate) {
+    assertThrows(SchemaCompatibilityException.class,
+        () -> AvroSchemaUtils.checkSchemaCompatible(FULL_SCHEMA, SHORT_SCHEMA, shouldValidate, false, Collections.emptySet()));
+  }
+
+  @ParameterizedTest
+  @ValueSource(booleans = {false, true})
+  public void testIsCompatibleProjectionAllowed(boolean shouldValidate) {
+    AvroSchemaUtils.checkSchemaCompatible(FULL_SCHEMA, SHORT_SCHEMA, shouldValidate, true, Collections.emptySet());
+  }
+
+  @ParameterizedTest
+  @ValueSource(booleans = {false, true})
+  public void testIsCompatiblePartitionDropCols(boolean shouldValidate) {
+    AvroSchemaUtils.checkSchemaCompatible(FULL_SCHEMA, SHORT_SCHEMA, shouldValidate, false, Collections.singleton("c"));
+  }
 }
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
index e9574e61d92..2aec8e5d5fc 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.HadoopConfigurations;
 import org.apache.hudi.configuration.OptionsInference;
+import org.apache.hudi.exception.SchemaCompatibilityException;
 import org.apache.hudi.sink.transform.ChainedTransformer;
 import org.apache.hudi.sink.transform.Transformer;
 import org.apache.hudi.sink.utils.Pipelines;
@@ -41,12 +42,14 @@ import org.apache.hudi.utils.source.ContinuousFileSource;
 
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.io.FilePathFilter;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.io.TextInputFormat;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -68,12 +71,15 @@ import org.junit.jupiter.params.provider.ValueSource;
 import java.io.File;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.hudi.config.HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE;
+import static org.apache.hudi.config.HoodieWriteConfig.SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP;
 import static org.apache.hudi.table.catalog.CatalogOptions.CATALOG_PATH;
 import static org.apache.hudi.table.catalog.CatalogOptions.DEFAULT_DATABASE;
 
@@ -208,6 +214,16 @@ public class ITTestDataStreamWrite extends TestLogger {
       String jobName,
       int checkpoints,
       Map<String, List<String>> expected) throws Exception {
+    testWriteToHoodie(conf, transformer, jobName, checkpoints, true, expected);
+  }
+
+  private void testWriteToHoodie(
+      Configuration conf,
+      Option<Transformer> transformer,
+      String jobName,
+      int checkpoints,
+      boolean restartJob,
+      Map<String, List<String>> expected) throws Exception {
 
     StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
     execEnv.getConfig().disableObjectReuse();
@@ -215,6 +231,9 @@ public class ITTestDataStreamWrite extends TestLogger {
     // set up checkpoint interval
     execEnv.enableCheckpointing(4000, CheckpointingMode.EXACTLY_ONCE);
     execEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
+    if (!restartJob) {
+      execEnv.setRestartStrategy(new RestartStrategies.NoRestartStrategyConfiguration());
+    }
 
     // Read from file source
     RowType rowType =
@@ -513,4 +532,37 @@ public class ITTestDataStreamWrite extends TestLogger {
     execute(execEnv, false, "Api_Sink_Test");
     TestData.checkWrittenDataCOW(tempFile, EXPECTED);
   }
+
+  @Test
+  public void testColumnDroppingIsNotAllowed() throws Exception {
+    // Write cols: uuid, name, age, ts, partition
+    Configuration conf = TestConfigurations.getDefaultConf(tempFile.toURI().toString());
+    testWriteToHoodie(conf, "initial write", 1, EXPECTED);
+
+    // Write cols: uuid, name, ts, partition
+    conf.setBoolean(AVRO_SCHEMA_VALIDATE_ENABLE.key(), false);
+    conf.setBoolean(SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP.key(), false);
+    conf.setString(
+        FlinkOptions.SOURCE_AVRO_SCHEMA_PATH,
+        Objects.requireNonNull(Thread.currentThread()
+            .getContextClassLoader()
+            .getResource("test_read_schema_dropped_age.avsc")
+        ).toString()
+    );
+
+    // assert job failure with schema compatibility exception
+    try {
+      testWriteToHoodie(conf, Option.empty(), "failing job", 1, false, Collections.emptyMap());
+    } catch (JobExecutionException e) {
+      Throwable actualException = e;
+      while (actualException != null) {
+        if (actualException.getClass() == SchemaCompatibilityException.class) {
+          // expected exception found; test passes
+          return;
+        }
+        actualException = actualException.getCause();
+      }
+    }
+    throw new AssertionError(String.format("Expected exception %s was not thrown", SchemaCompatibilityException.class));
+  }
 }
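
The manual cause-chain walk above could also be written with Flink's ExceptionUtils.findThrowable, assuming the Flink version on the classpath provides it (note it matches subclasses as well, while the loop above compares classes exactly); a minimal equivalent sketch using the same test helpers:

    import java.util.Collections;

    import org.apache.flink.runtime.client.JobExecutionException;
    import org.apache.flink.util.ExceptionUtils;
    import org.apache.hudi.common.util.Option;
    import org.apache.hudi.exception.SchemaCompatibilityException;

    try {
      testWriteToHoodie(conf, Option.empty(), "failing job", 1, false, Collections.emptyMap());
      throw new AssertionError("Expected exception " + SchemaCompatibilityException.class + " was not thrown");
    } catch (JobExecutionException e) {
      // findThrowable walks the cause chain, replacing the manual while loop.
      if (!ExceptionUtils.findThrowable(e, SchemaCompatibilityException.class).isPresent()) {
        throw new AssertionError("Expected exception " + SchemaCompatibilityException.class + " was not thrown", e);
      }
    }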
diff --git a/hudi-flink-datasource/hudi-flink/src/test/resources/test_read_schema_dropped_age.avsc b/hudi-flink-datasource/hudi-flink/src/test/resources/test_read_schema_dropped_age.avsc
new file mode 100644
index 00000000000..9bb99016cae
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/test/resources/test_read_schema_dropped_age.avsc
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{
+  "type" : "record",
+  "name" : "record",
+  "fields" : [ {
+    "name" : "uuid",
+    "type" : [ "null", "string" ],
+    "default" : null
+  }, {
+    "name" : "name",
+    "type" : [ "null", "string" ],
+    "default" : null
+  }, {
+    "name" : "ts",
+    "type" : [ "null", {
+      "type" : "long",
+      "logicalType" : "timestamp-millis"
+    } ],
+    "default" : null
+  }, {
+    "name" : "partition",
+    "type" : [ "null", "string" ],
+    "default" : null
+  } ]
+}
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index ceb33875f5b..298639a4f37 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -594,11 +594,11 @@ object HoodieSparkSqlWriter {
     if (isCompatibleProjectionOf(tableSchema, newSchema)) {
       // Picking table schema as a writer schema we need to validate that we'd be able to
       // rewrite incoming batch's data (written in new schema) into it
-      (tableSchema, isSchemaCompatible(newSchema, tableSchema, true))
+      (tableSchema, isSchemaCompatible(newSchema, tableSchema))
     } else {
       // Picking new schema as a writer schema we need to validate that we'd be able to
       // rewrite table's data into it
-      (newSchema, isSchemaCompatible(tableSchema, newSchema, true))
+      (newSchema, isSchemaCompatible(tableSchema, newSchema))
     }
   }
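
The Scala call sites above now use the new two-argument overload of isSchemaCompatible which, per the AvroSchemaUtils change above, simply defaults the projection flag to true, so their behaviour is unchanged. A tiny illustration (tableSchema and newSchema stand for the Avro schemas already in scope at the call site):

    // The two calls are equivalent: the two-arg overload delegates with allowProjection = true.
    boolean viaOverload = AvroSchemaUtils.isSchemaCompatible(tableSchema, newSchema);
    boolean viaExplicit = AvroSchemaUtils.isSchemaCompatible(tableSchema, newSchema, true);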
 
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableChangeColumnCommand.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableChangeColumnCommand.scala
index c4763bbe266..a6cbf1de484 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableChangeColumnCommand.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableChangeColumnCommand.scala
@@ -97,7 +97,7 @@ case class AlterHoodieTableChangeColumnCommand(
   private def validateSchema(newSchema: Schema, metaClient: HoodieTableMetaClient): Unit = {
     val schemaUtil = new TableSchemaResolver(metaClient)
     val tableSchema = HoodieAvroUtils.createHoodieWriteSchema(schemaUtil.getTableAvroSchema(false))
-    if (!AvroSchemaUtils.isSchemaCompatible(tableSchema, newSchema, true)) {
+    if (!AvroSchemaUtils.isSchemaCompatible(tableSchema, newSchema)) {
       throw new HoodieException("Failed schema compatibility check for newSchema :" + newSchema +
         ", origin table schema :" + tableSchema + ", base path :" + metaClient.getBasePath)
     }
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
index 9c39d82c39c..c5a6281cf2d 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
@@ -24,7 +24,7 @@ import org.apache.hudi.HoodieSparkSqlWriter.{CANONICALIZE_NULLABLE, SQL_MERGE_IN
 import org.apache.hudi.common.model.HoodieAvroRecordMerger
 import org.apache.hudi.common.util.StringUtils
 import org.apache.hudi.config.HoodieWriteConfig
-import org.apache.hudi.config.HoodieWriteConfig.{AVRO_SCHEMA_VALIDATE_ENABLE, TBL_NAME}
+import org.apache.hudi.config.HoodieWriteConfig.{AVRO_SCHEMA_VALIDATE_ENABLE, SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP, TBL_NAME}
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.hive.HiveSyncConfigHolder
 import org.apache.hudi.sync.common.HoodieSyncConfig
@@ -593,6 +593,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
       AVRO_SCHEMA_VALIDATE_ENABLE.key -> "false",
       RECONCILE_SCHEMA.key -> "false",
       CANONICALIZE_NULLABLE.key -> "false",
+      SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP.key -> "true",
       SQL_MERGE_INTO_WRITES.key -> "true"
     )