You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/09/22 06:50:53 UTC

[flink-table-store] branch master updated: [FLINK-29385] Verify duplicate column names for AddColumn

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 98774161 [FLINK-29385] Verify duplicate column names for AddColumn
98774161 is described below

commit 98774161860055aef2113a5442ad63dcfe3ea9eb
Author: shammon <zj...@gmail.com>
AuthorDate: Thu Sep 22 14:50:48 2022 +0800

    [FLINK-29385] Verify duplicate column names for AddColumn
    
    This closes #302
---
 .../table/store/file/schema/SchemaManager.java     | 12 ++++++++++
 .../table/store/table/SchemaEvolutionTest.java     | 26 ++++++++++++++++++++++
 2 files changed, 38 insertions(+)

diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java
index 0b387441..7cf5ba75 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java
@@ -35,6 +35,8 @@ import org.apache.flink.table.store.file.utils.JsonSerdeUtil;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.util.Preconditions;
 
+import org.apache.commons.lang3.StringUtils;
+
 import javax.annotation.Nullable;
 
 import java.io.IOException;
@@ -173,6 +175,16 @@ public class SchemaManager implements Serializable {
                     newOptions.remove(removeOption.key());
                 } else if (change instanceof AddColumn) {
                     AddColumn addColumn = (AddColumn) change;
+                    if (newFields.stream()
+                            .anyMatch(
+                                    f ->
+                                            StringUtils.equalsIgnoreCase(
+                                                    f.name(), addColumn.fieldName()))) {
+                        throw new IllegalArgumentException(
+                                String.format(
+                                        "The column [%s] exists in the table[%s].",
+                                        addColumn.fieldName(), tableRoot));
+                    }
                     int id = highestFieldId.incrementAndGet();
                     DataType dataType =
                             TableSchema.toDataType(addColumn.logicalType(), highestFieldId);
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/SchemaEvolutionTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/SchemaEvolutionTest.java
index 4bd070f5..d52df819 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/SchemaEvolutionTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/SchemaEvolutionTest.java
@@ -34,6 +34,7 @@ import org.apache.flink.table.store.table.source.Split;
 import org.apache.flink.table.store.table.source.TableRead;
 import org.apache.flink.table.store.table.source.TableScan;
 import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.FloatType;
 import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.utils.TypeConversions;
@@ -51,6 +52,8 @@ import java.util.List;
 import java.util.function.Consumer;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Assertions.filter;
 
 /** Test for schema evolution. */
 public class SchemaEvolutionTest {
@@ -120,6 +123,29 @@ public class SchemaEvolutionTest {
         assertThat(rows).containsExactlyInAnyOrder(Row.of(3, 3L, 3L), Row.of(4, 4L, 4L));
     }
 
+    @Test
+    public void testAddDuplicateField() throws Exception {
+        final String columnName = "f3";
+        UpdateSchema updateSchema =
+                new UpdateSchema(
+                        RowType.of(new IntType(), new BigIntType()),
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        new HashMap<>(),
+                        "");
+        schemaManager.commitNewVersion(updateSchema);
+        schemaManager.commitChanges(
+                Collections.singletonList(SchemaChange.addColumn(columnName, new BigIntType())));
+        assertThatThrownBy(
+                        () -> {
+                            schemaManager.commitChanges(
+                                    Collections.singletonList(
+                                            SchemaChange.addColumn(columnName, new FloatType())));
+                        })
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessage("The column [%s] exists in the table[%s].", columnName, tablePath);
+    }
+
     private List<Row> readRecords(FileStoreTable table, Predicate filter) throws IOException {
         RowRowConverter converter =
                 RowRowConverter.create(