Posted to commits@hudi.apache.org by da...@apache.org on 2023/03/04 04:00:36 UTC
[hudi] branch master updated: [HUDI-5736] Common de-coupling column drop flag and schema validation flag (#7895)
This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 18d528f33d8 [HUDI-5736] Common de-coupling column drop flag and schema validation flag (#7895)
18d528f33d8 is described below
commit 18d528f33d8b1dd7a836e5543ddf36e0a9c95ad1
Author: Alexander Trushev <tr...@gmail.com>
AuthorDate: Sat Mar 4 11:00:25 2023 +0700
[HUDI-5736] Common de-coupling column drop flag and schema validation flag (#7895)
* [HUDI-5736] Common de-coupling column drop flag and schema validation flag
---
.../java/org/apache/hudi/table/HoodieTable.java | 40 ++++++-------
.../hudi/table/TestHoodieMergeOnReadTable.java | 1 +
.../java/org/apache/hudi/avro/AvroSchemaUtils.java | 65 ++++++++++++++++++++++
.../org/apache/hudi/avro/TestAvroSchemaUtils.java | 57 +++++++++++++++++++
.../apache/hudi/sink/ITTestDataStreamWrite.java | 52 +++++++++++++++++
.../resources/test_read_schema_dropped_age.avsc | 41 ++++++++++++++
.../org/apache/hudi/HoodieSparkSqlWriter.scala | 4 +-
.../AlterHoodieTableChangeColumnCommand.scala | 2 +-
.../hudi/command/MergeIntoHoodieTableCommand.scala | 3 +-
9 files changed, 242 insertions(+), 23 deletions(-)
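For a quick sense of what the change does, here is a minimal sketch (not part of the commit; the class name SchemaCheckSketch and the toy schemas are made up, but the signature matches the checkSchemaCompatible method added to AvroSchemaUtils below):

    import java.util.Collections;

    import org.apache.avro.Schema;
    import org.apache.hudi.avro.AvroSchemaUtils;
    import org.apache.hudi.exception.SchemaCompatibilityException;

    public class SchemaCheckSketch {
      public static void main(String[] args) {
        Schema tableSchema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"r\",\"fields\":["
                + "{\"name\":\"a\",\"type\":\"int\"},{\"name\":\"b\",\"type\":\"int\"}]}");
        // Writer schema drops column "b".
        Schema writerSchema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"r\",\"fields\":[{\"name\":\"a\",\"type\":\"int\"}]}");
        try {
          // Validation off, column drop disallowed: the Avro compatibility
          // check is skipped, yet the projection (column-drop) check still
          // runs and rejects the dropped column, since the flags now act
          // independently.
          AvroSchemaUtils.checkSchemaCompatible(
              tableSchema, writerSchema,
              false /* shouldValidate */,
              false /* allowProjection */,
              Collections.emptySet() /* dropPartitionColNames */);
        } catch (SchemaCompatibilityException e) {
          System.out.println(e.getMessage()); // starts with "Column dropping is not allowed"
        }
      }
    }

Previously a single effective switch gated both concerns; after this patch the validation flag and the column-drop flag are consulted separately, which is the point of the de-coupling.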
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
index 2a71cf4ea46..8b1056bca6c 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -18,6 +18,7 @@
package org.apache.hudi.table;
+import org.apache.hudi.avro.AvroSchemaUtils;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCleanerPlan;
@@ -92,6 +93,9 @@ import org.apache.log4j.Logger;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -100,7 +104,6 @@ import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
-import static org.apache.hudi.avro.AvroSchemaUtils.isSchemaCompatible;
import static org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy.EAGER;
import static org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy.LAZY;
import static org.apache.hudi.common.table.HoodieTableConfig.TABLE_METADATA_PARTITIONS;
@@ -803,27 +806,22 @@ public abstract class HoodieTable<T, I, K, O> implements Serializable {
*/
private void validateSchema() throws HoodieUpsertException, HoodieInsertException {
- if (!shouldValidateAvroSchema() || getActiveTimeline().getCommitsTimeline().filterCompletedInstants().empty()) {
+ boolean shouldValidate = config.shouldValidateAvroSchema();
+ boolean allowProjection = config.shouldAllowAutoEvolutionColumnDrop();
+ if ((!shouldValidate && allowProjection)
+ || getActiveTimeline().getCommitsTimeline().filterCompletedInstants().empty()) {
// Check not required
return;
}
- Schema tableSchema;
- Schema writerSchema;
- boolean isValid;
try {
TableSchemaResolver schemaResolver = new TableSchemaResolver(getMetaClient());
- writerSchema = HoodieAvroUtils.createHoodieWriteSchema(config.getSchema());
- tableSchema = HoodieAvroUtils.createHoodieWriteSchema(schemaResolver.getTableAvroSchema(false));
- isValid = isSchemaCompatible(tableSchema, writerSchema, config.shouldAllowAutoEvolutionColumnDrop());
+ Schema writerSchema = HoodieAvroUtils.createHoodieWriteSchema(config.getSchema());
+ Schema tableSchema = HoodieAvroUtils.createHoodieWriteSchema(schemaResolver.getTableAvroSchema(false));
+ AvroSchemaUtils.checkSchemaCompatible(tableSchema, writerSchema, shouldValidate, allowProjection, getDropPartitionColNames());
} catch (Exception e) {
throw new HoodieException("Failed to read schema/check compatibility for base path " + metaClient.getBasePath(), e);
}
-
- if (!isValid) {
- throw new HoodieException("Failed schema compatibility check for writerSchema :" + writerSchema
- + ", table schema :" + tableSchema + ", base path :" + metaClient.getBasePath());
- }
}
public void validateUpsertSchema() throws HoodieUpsertException {
@@ -1041,11 +1039,15 @@ public abstract class HoodieTable<T, I, K, O> implements Serializable {
return Functions.noop();
}
- private boolean shouldValidateAvroSchema() {
- // TODO(HUDI-4772) re-enable validations in case partition columns
- // being dropped from the data-file after fixing the write schema
- Boolean shouldDropPartitionColumns = metaClient.getTableConfig().shouldDropPartitionColumns();
-
- return config.shouldValidateAvroSchema() && !shouldDropPartitionColumns;
+ private Set<String> getDropPartitionColNames() {
+ boolean shouldDropPartitionColumns = metaClient.getTableConfig().shouldDropPartitionColumns();
+ if (!shouldDropPartitionColumns) {
+ return Collections.emptySet();
+ }
+ Option<String[]> partitionFields = metaClient.getTableConfig().getPartitionFields();
+ if (!partitionFields.isPresent()) {
+ return Collections.emptySet();
+ }
+ return new HashSet<>(Arrays.asList(partitionFields.get()));
}
}
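In effect, once there are completed commits on the timeline, validateSchema() now behaves as follows (a summary of the logic above):

    shouldValidate  allowProjection  effect
    --------------  ---------------  --------------------------------------
    false           true             check skipped entirely (early return)
    false           false            projection (column-drop) check only
    true            true             Avro compatibility check only
    true            false            both checks

with two partition-column caveats: the projection check ignores the table's partition columns when DROP_PARTITION_COLUMNS is enabled (via getDropPartitionColNames()), and the Avro compatibility check is still skipped in that case per the HUDI-4772 TODO.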
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
index 37cd23451ee..26f2705a4ab 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
@@ -409,6 +409,7 @@ public class TestHoodieMergeOnReadTable extends SparkClientFunctionalTestHarness
public void testMetadataStatsOnCommit(Boolean rollbackUsingMarkers) throws Exception {
HoodieWriteConfig cfg = getConfigBuilder(false, rollbackUsingMarkers, IndexType.INMEMORY)
.withAvroSchemaValidate(false)
+ .withAllowAutoEvolutionColumnDrop(true)
.withAutoCommit(false)
.build();
diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java
index 0a1bb747e1c..24adb1d161e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java
@@ -18,12 +18,16 @@
package org.apache.hudi.avro;
+import org.apache.hudi.exception.SchemaCompatibilityException;
+
import org.apache.avro.AvroRuntimeException;
import org.apache.avro.Schema;
import org.apache.avro.SchemaCompatibility;
+import java.util.Collections;
import java.util.List;
import java.util.Objects;
+import java.util.Set;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
@@ -36,6 +40,13 @@ public class AvroSchemaUtils {
private AvroSchemaUtils() {}
+ /**
+ * See {@link #isSchemaCompatible(Schema, Schema, boolean, boolean)} doc for more details
+ */
+ public static boolean isSchemaCompatible(Schema prevSchema, Schema newSchema) {
+ return isSchemaCompatible(prevSchema, newSchema, true);
+ }
+
/**
* See {@link #isSchemaCompatible(Schema, Schema, boolean, boolean)} doc for more details
*/
@@ -76,7 +87,18 @@ public class AvroSchemaUtils {
* @return true if prev schema is a projection of new schema.
*/
public static boolean canProject(Schema prevSchema, Schema newSchema) {
+ return canProject(prevSchema, newSchema, Collections.emptySet());
+ }
+
+ /**
+ * Checks that each field in the prevSchema can be populated in the newSchema, except for the specified columns
+ * @param prevSchema prev schema
+ * @param newSchema new schema
+ * @param exceptCols column names to exclude from the check
+ * @return true if prev schema is a projection of new schema
+ */
+ public static boolean canProject(Schema prevSchema, Schema newSchema, Set<String> exceptCols) {
return prevSchema.getFields().stream()
+ .filter(f -> !exceptCols.contains(f.name()))
.map(oldSchemaField -> SchemaCompatibility.lookupWriterField(newSchema, oldSchemaField))
.noneMatch(Objects::isNull);
}
@@ -275,4 +297,47 @@ public class AvroSchemaUtils {
return false;
}
}
+
+ /**
+ * Checks whether writer schema is compatible with table schema considering {@code AVRO_SCHEMA_VALIDATE_ENABLE}
+ * and {@code SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP} options.
+ * To avoid a collision between {@code SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP} and {@code DROP_PARTITION_COLUMNS},
+ * partition column names should be passed as {@code dropPartitionColNames}.
+ * Passing an empty set means {@code DROP_PARTITION_COLUMNS} is disabled.
+ *
+ * @param tableSchema the latest dataset schema
+ * @param writerSchema writer schema
+ * @param shouldValidate whether the {@link AvroSchemaCompatibility} check should be performed
+ * @param allowProjection whether dropping columns is allowed (if true, the projection check is skipped)
+ * @param dropPartitionColNames partition column names to be excluded from the column dropping check
+ * @throws SchemaCompatibilityException if writer schema is not compatible
+ */
+ public static void checkSchemaCompatible(
+ Schema tableSchema,
+ Schema writerSchema,
+ boolean shouldValidate,
+ boolean allowProjection,
+ Set<String> dropPartitionColNames) throws SchemaCompatibilityException {
+
+ String errorMessage = null;
+
+ if (!allowProjection && !canProject(tableSchema, writerSchema, dropPartitionColNames)) {
+ errorMessage = "Column dropping is not allowed";
+ }
+
+ // TODO(HUDI-4772) re-enable validation for the case where partition columns
+ // are dropped from the data file, once the write schema is fixed
+ if (dropPartitionColNames.isEmpty() && shouldValidate && !isSchemaCompatible(tableSchema, writerSchema)) {
+ errorMessage = "Failed schema compatibility check";
+ }
+
+ if (errorMessage != null) {
+ String errorDetails = String.format(
+ "%s\nwriterSchema: %s\ntableSchema: %s",
+ errorMessage,
+ writerSchema,
+ tableSchema);
+ throw new SchemaCompatibilityException(errorDetails);
+ }
+ }
}
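A quick illustration of the new canProject overload (again a sketch; CanProjectSketch and the schemas are invented for the example): excluding a column name lets a writer schema that lacks that column still count as a valid projection target.

    import java.util.Collections;

    import org.apache.avro.Schema;
    import org.apache.hudi.avro.AvroSchemaUtils;

    public class CanProjectSketch {
      public static void main(String[] args) {
        Schema prev = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"r\",\"fields\":["
                + "{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"dt\",\"type\":\"string\"}]}");
        Schema next = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"r\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"}]}");

        // "dt" exists in prev but not in next, so a plain projection fails ...
        System.out.println(AvroSchemaUtils.canProject(prev, next)); // false
        // ... but listing it as an excluded column (e.g. a dropped partition
        // column) lets the remaining fields project cleanly.
        System.out.println(AvroSchemaUtils.canProject(prev, next, Collections.singleton("dt"))); // true
      }
    }

This is what checkSchemaCompatible relies on to keep SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP from colliding with DROP_PARTITION_COLUMNS.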
diff --git a/hudi-common/src/test/java/org/apache/hudi/avro/TestAvroSchemaUtils.java b/hudi-common/src/test/java/org/apache/hudi/avro/TestAvroSchemaUtils.java
index 14d65e8359a..c05683e605c 100644
--- a/hudi-common/src/test/java/org/apache/hudi/avro/TestAvroSchemaUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/avro/TestAvroSchemaUtils.java
@@ -18,10 +18,16 @@
package org.apache.hudi.avro;
+import org.apache.hudi.exception.SchemaCompatibilityException;
+
import org.apache.avro.Schema;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import java.util.Collections;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class TestAvroSchemaUtils {
@@ -172,4 +178,55 @@ public class TestAvroSchemaUtils {
Schema.createUnion(Schema.create(Schema.Type.NULL), sourceSchema),
Schema.createUnion(Schema.create(Schema.Type.NULL), projectedNestedSchema)));
}
+
+ private static final Schema FULL_SCHEMA = new Schema.Parser().parse("{\n"
+ + " \"type\" : \"record\",\n"
+ + " \"name\" : \"record\",\n"
+ + " \"fields\" : [ {\n"
+ + " \"name\" : \"a\",\n"
+ + " \"type\" : [ \"null\", \"int\" ],\n"
+ + " \"default\" : null\n"
+ + " }, {\n"
+ + " \"name\" : \"b\",\n"
+ + " \"type\" : [ \"null\", \"int\" ],\n"
+ + " \"default\" : null\n"
+ + " }, {\n"
+ + " \"name\" : \"c\",\n"
+ + " \"type\" : [ \"null\", \"int\" ],\n"
+ + " \"default\" : null\n"
+ + " } ]\n"
+ + "}");
+
+ private static final Schema SHORT_SCHEMA = new Schema.Parser().parse("{\n"
+ + " \"type\" : \"record\",\n"
+ + " \"name\" : \"record\",\n"
+ + " \"fields\" : [ {\n"
+ + " \"name\" : \"a\",\n"
+ + " \"type\" : [ \"null\", \"int\" ],\n"
+ + " \"default\" : null\n"
+ + " }, {\n"
+ + " \"name\" : \"b\",\n"
+ + " \"type\" : [ \"null\", \"int\" ],\n"
+ + " \"default\" : null\n"
+ + " } ]\n"
+ + "}\n");
+
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void testIsCompatibleProjectionNotAllowed(boolean shouldValidate) {
+ assertThrows(SchemaCompatibilityException.class,
+ () -> AvroSchemaUtils.checkSchemaCompatible(FULL_SCHEMA, SHORT_SCHEMA, shouldValidate, false, Collections.emptySet()));
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void testIsCompatibleProjectionAllowed(boolean shouldValidate) {
+ AvroSchemaUtils.checkSchemaCompatible(FULL_SCHEMA, SHORT_SCHEMA, shouldValidate, true, Collections.emptySet());
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void testIsCompatiblePartitionDropCols(boolean shouldValidate) {
+ AvroSchemaUtils.checkSchemaCompatible(FULL_SCHEMA, SHORT_SCHEMA, shouldValidate, false, Collections.singleton("c"));
+ }
}
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
index e9574e61d92..2aec8e5d5fc 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.configuration.OptionsInference;
+import org.apache.hudi.exception.SchemaCompatibilityException;
import org.apache.hudi.sink.transform.ChainedTransformer;
import org.apache.hudi.sink.transform.Transformer;
import org.apache.hudi.sink.utils.Pipelines;
@@ -41,12 +42,14 @@ import org.apache.hudi.utils.source.ContinuousFileSource;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.io.FilePathFilter;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -68,12 +71,15 @@ import org.junit.jupiter.params.provider.ValueSource;
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
+import static org.apache.hudi.config.HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE;
+import static org.apache.hudi.config.HoodieWriteConfig.SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP;
import static org.apache.hudi.table.catalog.CatalogOptions.CATALOG_PATH;
import static org.apache.hudi.table.catalog.CatalogOptions.DEFAULT_DATABASE;
@@ -208,6 +214,16 @@ public class ITTestDataStreamWrite extends TestLogger {
String jobName,
int checkpoints,
Map<String, List<String>> expected) throws Exception {
+ testWriteToHoodie(conf, transformer, jobName, checkpoints, true, expected);
+ }
+
+ private void testWriteToHoodie(
+ Configuration conf,
+ Option<Transformer> transformer,
+ String jobName,
+ int checkpoints,
+ boolean restartJob,
+ Map<String, List<String>> expected) throws Exception {
StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
execEnv.getConfig().disableObjectReuse();
@@ -215,6 +231,9 @@ public class ITTestDataStreamWrite extends TestLogger {
// set up checkpoint interval
execEnv.enableCheckpointing(4000, CheckpointingMode.EXACTLY_ONCE);
execEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
+ if (!restartJob) {
+ execEnv.setRestartStrategy(new RestartStrategies.NoRestartStrategyConfiguration());
+ }
// Read from file source
RowType rowType =
@@ -513,4 +532,37 @@ public class ITTestDataStreamWrite extends TestLogger {
execute(execEnv, false, "Api_Sink_Test");
TestData.checkWrittenDataCOW(tempFile, EXPECTED);
}
+
+ @Test
+ public void testColumnDroppingIsNotAllowed() throws Exception {
+ // Write cols: uuid, name, age, ts, partition
+ Configuration conf = TestConfigurations.getDefaultConf(tempFile.toURI().toString());
+ testWriteToHoodie(conf, "initial write", 1, EXPECTED);
+
+ // Write cols: uuid, name, ts, partition
+ conf.setBoolean(AVRO_SCHEMA_VALIDATE_ENABLE.key(), false);
+ conf.setBoolean(SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP.key(), false);
+ conf.setString(
+ FlinkOptions.SOURCE_AVRO_SCHEMA_PATH,
+ Objects.requireNonNull(Thread.currentThread()
+ .getContextClassLoader()
+ .getResource("test_read_schema_dropped_age.avsc")
+ ).toString()
+ );
+
+ // assert job failure with schema compatibility exception
+ try {
+ testWriteToHoodie(conf, Option.empty(), "failing job", 1, false, Collections.emptyMap());
+ } catch (JobExecutionException e) {
+ Throwable actualException = e;
+ while (actualException != null) {
+ if (actualException.getClass() == SchemaCompatibilityException.class) {
+ // test passed
+ return;
+ }
+ actualException = actualException.getCause();
+ }
+ }
+ throw new AssertionError(String.format("Expected exception %s was not found", SchemaCompatibilityException.class));
+ }
}
diff --git a/hudi-flink-datasource/hudi-flink/src/test/resources/test_read_schema_dropped_age.avsc b/hudi-flink-datasource/hudi-flink/src/test/resources/test_read_schema_dropped_age.avsc
new file mode 100644
index 00000000000..9bb99016cae
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/test/resources/test_read_schema_dropped_age.avsc
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{
+ "type" : "record",
+ "name" : "record",
+ "fields" : [ {
+ "name" : "uuid",
+ "type" : [ "null", "string" ],
+ "default" : null
+ }, {
+ "name" : "name",
+ "type" : [ "null", "string" ],
+ "default" : null
+ }, {
+ "name" : "ts",
+ "type" : [ "null", {
+ "type" : "long",
+ "logicalType" : "timestamp-millis"
+ } ],
+ "default" : null
+ }, {
+ "name" : "partition",
+ "type" : [ "null", "string" ],
+ "default" : null
+ } ]
+}
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index ceb33875f5b..298639a4f37 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -594,11 +594,11 @@ object HoodieSparkSqlWriter {
if (isCompatibleProjectionOf(tableSchema, newSchema)) {
// Picking table schema as a writer schema we need to validate that we'd be able to
// rewrite incoming batch's data (written in new schema) into it
- (tableSchema, isSchemaCompatible(newSchema, tableSchema, true))
+ (tableSchema, isSchemaCompatible(newSchema, tableSchema))
} else {
// Picking new schema as a writer schema we need to validate that we'd be able to
// rewrite table's data into it
- (newSchema, isSchemaCompatible(tableSchema, newSchema, true))
+ (newSchema, isSchemaCompatible(tableSchema, newSchema))
}
}
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableChangeColumnCommand.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableChangeColumnCommand.scala
index c4763bbe266..a6cbf1de484 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableChangeColumnCommand.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableChangeColumnCommand.scala
@@ -97,7 +97,7 @@ case class AlterHoodieTableChangeColumnCommand(
private def validateSchema(newSchema: Schema, metaClient: HoodieTableMetaClient): Unit = {
val schemaUtil = new TableSchemaResolver(metaClient)
val tableSchema = HoodieAvroUtils.createHoodieWriteSchema(schemaUtil.getTableAvroSchema(false))
- if (!AvroSchemaUtils.isSchemaCompatible(tableSchema, newSchema, true)) {
+ if (!AvroSchemaUtils.isSchemaCompatible(tableSchema, newSchema)) {
throw new HoodieException("Failed schema compatibility check for newSchema :" + newSchema +
", origin table schema :" + tableSchema + ", base path :" + metaClient.getBasePath)
}
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
index 9c39d82c39c..c5a6281cf2d 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
@@ -24,7 +24,7 @@ import org.apache.hudi.HoodieSparkSqlWriter.{CANONICALIZE_NULLABLE, SQL_MERGE_IN
import org.apache.hudi.common.model.HoodieAvroRecordMerger
import org.apache.hudi.common.util.StringUtils
import org.apache.hudi.config.HoodieWriteConfig
-import org.apache.hudi.config.HoodieWriteConfig.{AVRO_SCHEMA_VALIDATE_ENABLE, TBL_NAME}
+import org.apache.hudi.config.HoodieWriteConfig.{AVRO_SCHEMA_VALIDATE_ENABLE, SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP, TBL_NAME}
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.hive.HiveSyncConfigHolder
import org.apache.hudi.sync.common.HoodieSyncConfig
@@ -593,6 +593,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
AVRO_SCHEMA_VALIDATE_ENABLE.key -> "false",
RECONCILE_SCHEMA.key -> "false",
CANONICALIZE_NULLABLE.key -> "false",
+ SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP.key -> "true",
SQL_MERGE_INTO_WRITES.key -> "true"
)