You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/09/22 06:50:53 UTC
[flink-table-store] branch master updated: [FLINK-29385] Verify duplicate column names for AddColumn
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new 98774161 [FLINK-29385] Verify duplicate column names for AddColumn
98774161 is described below
commit 98774161860055aef2113a5442ad63dcfe3ea9eb
Author: shammon <zj...@gmail.com>
AuthorDate: Thu Sep 22 14:50:48 2022 +0800
[FLINK-29385] Verify duplicate column names for AddColumn
This closes #302
---
.../table/store/file/schema/SchemaManager.java | 12 ++++++++++
.../table/store/table/SchemaEvolutionTest.java | 26 ++++++++++++++++++++++
2 files changed, 38 insertions(+)
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java
index 0b387441..7cf5ba75 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java
@@ -35,6 +35,8 @@ import org.apache.flink.table.store.file.utils.JsonSerdeUtil;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;
+import org.apache.commons.lang3.StringUtils;
+
import javax.annotation.Nullable;
import java.io.IOException;
@@ -173,6 +175,16 @@ public class SchemaManager implements Serializable {
newOptions.remove(removeOption.key());
} else if (change instanceof AddColumn) {
AddColumn addColumn = (AddColumn) change;
+ if (newFields.stream()
+ .anyMatch(
+ f ->
+ StringUtils.equalsIgnoreCase(
+ f.name(), addColumn.fieldName()))) {
+ throw new IllegalArgumentException(
+ String.format(
+ "The column [%s] exists in the table[%s].",
+ addColumn.fieldName(), tableRoot));
+ }
int id = highestFieldId.incrementAndGet();
DataType dataType =
TableSchema.toDataType(addColumn.logicalType(), highestFieldId);
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/SchemaEvolutionTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/SchemaEvolutionTest.java
index 4bd070f5..d52df819 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/SchemaEvolutionTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/SchemaEvolutionTest.java
@@ -34,6 +34,7 @@ import org.apache.flink.table.store.table.source.Split;
import org.apache.flink.table.store.table.source.TableRead;
import org.apache.flink.table.store.table.source.TableScan;
import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.FloatType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.utils.TypeConversions;
@@ -51,6 +52,8 @@ import java.util.List;
import java.util.function.Consumer;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Assertions.filter;
/** Test for schema evolution. */
public class SchemaEvolutionTest {
@@ -120,6 +123,29 @@ public class SchemaEvolutionTest {
assertThat(rows).containsExactlyInAnyOrder(Row.of(3, 3L, 3L), Row.of(4, 4L, 4L));
}
+ @Test
+ public void testAddDuplicateField() throws Exception {
+ final String columnName = "f3";
+ UpdateSchema updateSchema =
+ new UpdateSchema(
+ RowType.of(new IntType(), new BigIntType()),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ new HashMap<>(),
+ "");
+ schemaManager.commitNewVersion(updateSchema);
+ schemaManager.commitChanges(
+ Collections.singletonList(SchemaChange.addColumn(columnName, new BigIntType())));
+ assertThatThrownBy(
+ () -> {
+ schemaManager.commitChanges(
+ Collections.singletonList(
+ SchemaChange.addColumn(columnName, new FloatType())));
+ })
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("The column [%s] exists in the table[%s].", columnName, tablePath);
+ }
+
private List<Row> readRecords(FileStoreTable table, Predicate filter) throws IOException {
RowRowConverter converter =
RowRowConverter.create(