Posted to commits@flink.apache.org by lz...@apache.org on 2022/12/08 07:23:40 UTC

[flink-table-store] branch master updated: [FLINK-30319] validate column name in ddl

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 51d57803 [FLINK-30319] validate column name in ddl
51d57803 is described below

commit 51d57803c3eee1943bc5a4ddf1228aff42aa88d3
Author: shammon <zj...@gmail.com>
AuthorDate: Thu Dec 8 15:23:35 2022 +0800

    [FLINK-30319] validate column name in ddl
    
    This closes #424
---
 .../org/apache/flink/table/store/CoreOptions.java  | 19 +++++++
 .../apache/flink/table/store/file/KeyValue.java    |  4 +-
 .../flink/table/store/file/schema/TableSchema.java |  9 ++++
 .../table/ChangelogValueCountFileStoreTable.java   |  4 +-
 .../table/ChangelogWithKeyFileStoreTable.java      |  3 +-
 .../table/store/table/SchemaEvolutionTest.java     | 58 ++++++++++++++++++++++
 6 files changed, 92 insertions(+), 5 deletions(-)
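
For readers skimming the patch, here is a minimal sketch of what the new check rejects, modeled on the SchemaEvolutionTest case added below (the schemaManager setup is assumed from the test fixture, imports are as in that test, and the argument-role comments are illustrative rather than part of this commit):

    // Sketch only: assumes a SchemaManager already created for a test table.
    // Declaring a column named after a reserved system field (_VALUE_COUNT,
    // _SEQUENCE_NUMBER, _VALUE_KIND), or one starting with "_KEY_", now fails
    // fast during schema validation.
    UpdateSchema updateSchema =
            new UpdateSchema(
                    RowType.of(
                            new LogicalType[] {new IntType(), new BigIntType()},
                            new String[] {"f0", "_VALUE_COUNT"}),
                    Collections.emptyList(), // partition keys
                    Collections.emptyList(), // primary keys
                    new HashMap<>(),         // table options
                    "");                     // comment
    schemaManager.commitNewVersion(updateSchema);
    // -> IllegalStateException: Field name [_VALUE_COUNT] in schema cannot
    //    exist in [_VALUE_COUNT, _SEQUENCE_NUMBER, _VALUE_KIND]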

diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
index ed5b1474..7f43b0bb 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
@@ -50,6 +50,9 @@ import java.util.Set;
 
 import static org.apache.flink.configuration.ConfigOptions.key;
 import static org.apache.flink.configuration.description.TextElement.text;
+import static org.apache.flink.table.store.file.schema.TableSchema.KEY_FIELD_PREFIX;
+import static org.apache.flink.table.store.file.schema.TableSchema.SYSTEM_FIELD_NAMES;
+import static org.apache.flink.util.Preconditions.checkState;
 
 /** Core options for table store. */
 public class CoreOptions implements Serializable {
@@ -788,6 +791,22 @@ public class CoreOptions implements Serializable {
             throw new UnsupportedOperationException(
                     "Changelog table with full compaction must have primary keys");
         }
+
+        // Check column names in schema
+        schema.fieldNames()
+                .forEach(
+                        f -> {
+                            checkState(
+                                    !SYSTEM_FIELD_NAMES.contains(f),
+                                    String.format(
+                                            "Field name [%s] in schema cannot exist in %s",
+                                            f, SYSTEM_FIELD_NAMES.toString()));
+                            checkState(
+                                    !f.startsWith(KEY_FIELD_PREFIX),
+                                    String.format(
+                                            "Field name [%s] in schema cannot start with [%s]",
+                                            f, KEY_FIELD_PREFIX));
+                        });
     }
 
     @Internal
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValue.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValue.java
index 355b3245..a1dc6c02 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValue.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValue.java
@@ -34,6 +34,8 @@ import java.util.List;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import static org.apache.flink.table.store.file.schema.TableSchema.SEQUENCE_NUMBER;
+import static org.apache.flink.table.store.file.schema.TableSchema.VALUE_KIND;
 import static org.apache.flink.util.Preconditions.checkState;
 
 /**
@@ -41,8 +43,6 @@ import static org.apache.flink.util.Preconditions.checkState;
  * reused.
  */
 public class KeyValue {
-    private static final String SEQUENCE_NUMBER = "_SEQUENCE_NUMBER";
-    private static final String VALUE_KIND = "_VALUE_KIND";
 
     public static final long UNKNOWN_SEQUENCE = -1;
     public static final int UNKNOWN_LEVEL = -1;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/TableSchema.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/TableSchema.java
index 81f88271..d0a10c8d 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/TableSchema.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/TableSchema.java
@@ -53,6 +53,15 @@ public class TableSchema implements Serializable {
 
     private static final long serialVersionUID = 1L;
 
+    /** System field names and the reserved key field prefix. */
+    public static final String KEY_FIELD_PREFIX = "_KEY_";
+
+    public static final String VALUE_COUNT = "_VALUE_COUNT";
+    public static final String SEQUENCE_NUMBER = "_SEQUENCE_NUMBER";
+    public static final String VALUE_KIND = "_VALUE_KIND";
+    public static final List<String> SYSTEM_FIELD_NAMES =
+            Arrays.asList(VALUE_COUNT, SEQUENCE_NUMBER, VALUE_KIND);
+
     private final long id;
 
     private final List<DataField> fields;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
index 00d8e86d..b6ba6111 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
@@ -51,6 +51,8 @@ import org.apache.flink.types.RowKind;
 import java.util.Collections;
 import java.util.List;
 
+import static org.apache.flink.table.store.file.schema.TableSchema.VALUE_COUNT;
+
 /** {@link FileStoreTable} for {@link WriteMode#CHANGE_LOG} write mode without primary keys. */
 public class ChangelogValueCountFileStoreTable extends AbstractFileStoreTable {
 
@@ -173,7 +175,7 @@ public class ChangelogValueCountFileStoreTable extends AbstractFileStoreTable {
         @Override
         public List<DataField> valueFields(TableSchema schema) {
             return Collections.singletonList(
-                    new DataField(0, "_VALUE_COUNT", new AtomicDataType(new BigIntType(false))));
+                    new DataField(0, VALUE_COUNT, new AtomicDataType(new BigIntType(false))));
         }
     }
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
index e7441f74..803535c4 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
@@ -55,12 +55,11 @@ import java.util.stream.Collectors;
 import static org.apache.flink.table.store.file.predicate.PredicateBuilder.and;
 import static org.apache.flink.table.store.file.predicate.PredicateBuilder.pickTransformFieldMapping;
 import static org.apache.flink.table.store.file.predicate.PredicateBuilder.splitAnd;
+import static org.apache.flink.table.store.file.schema.TableSchema.KEY_FIELD_PREFIX;
 
 /** {@link FileStoreTable} for {@link WriteMode#CHANGE_LOG} write mode with primary keys. */
 public class ChangelogWithKeyFileStoreTable extends AbstractFileStoreTable {
 
-    private static final String KEY_FIELD_PREFIX = "_KEY_";
-
     private static final long serialVersionUID = 1L;
 
     private transient KeyValueFileStore lazyStore;
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/SchemaEvolutionTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/SchemaEvolutionTest.java
index 4525b218..373c1a88 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/SchemaEvolutionTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/SchemaEvolutionTest.java
@@ -36,6 +36,7 @@ import org.apache.flink.table.store.table.source.TableScan;
 import org.apache.flink.table.types.logical.BigIntType;
 import org.apache.flink.table.types.logical.FloatType;
 import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.utils.TypeConversions;
 import org.apache.flink.types.Row;
@@ -53,6 +54,8 @@ import java.util.List;
 import java.util.UUID;
 import java.util.function.Consumer;
 
+import static org.apache.flink.table.store.file.schema.TableSchema.KEY_FIELD_PREFIX;
+import static org.apache.flink.table.store.file.schema.TableSchema.SYSTEM_FIELD_NAMES;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
@@ -271,6 +274,61 @@ public class SchemaEvolutionTest {
                 .hasMessage("Cannot drop all fields in table");
     }
 
+    @Test
+    public void testCreateAlterSystemField() throws Exception {
+        UpdateSchema updateSchema1 =
+                new UpdateSchema(
+                        RowType.of(
+                                new LogicalType[] {new IntType(), new BigIntType()},
+                                new String[] {"f0", "_VALUE_COUNT"}),
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        new HashMap<>(),
+                        "");
+        assertThatThrownBy(() -> schemaManager.commitNewVersion(updateSchema1))
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessage(
+                        String.format(
+                                "Field name [%s] in schema cannot exist in %s",
+                                "_VALUE_COUNT", SYSTEM_FIELD_NAMES.toString()));
+
+        UpdateSchema updateSchema2 =
+                new UpdateSchema(
+                        RowType.of(
+                                new LogicalType[] {new IntType(), new BigIntType()},
+                                new String[] {"f0", "_KEY_f1"}),
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        new HashMap<>(),
+                        "");
+        assertThatThrownBy(() -> schemaManager.commitNewVersion(updateSchema2))
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessage(
+                        String.format(
+                                "Field name [%s] in schema cannot start with [%s]",
+                                "_KEY_f1", KEY_FIELD_PREFIX));
+
+        UpdateSchema updateSchema =
+                new UpdateSchema(
+                        RowType.of(new IntType(), new BigIntType()),
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        new HashMap<>(),
+                        "");
+        schemaManager.commitNewVersion(updateSchema);
+
+        assertThatThrownBy(
+                        () ->
+                                schemaManager.commitChanges(
+                                        Collections.singletonList(
+                                                SchemaChange.renameColumn("f0", "_VALUE_KIND"))))
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessage(
+                        String.format(
+                                "Field name [%s] in schema cannot exist in %s",
+                                "_VALUE_KIND", SYSTEM_FIELD_NAMES.toString()));
+    }
+
     private List<Row> readRecords(FileStoreTable table, Predicate filter) throws IOException {
         RowRowConverter converter =
                 RowRowConverter.create(