You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/12/08 07:23:40 UTC
[flink-table-store] branch master updated: [FLINK-30319] validate column name in ddl
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new 51d57803 [FLINK-30319] validate column name in ddl
51d57803 is described below
commit 51d57803c3eee1943bc5a4ddf1228aff42aa88d3
Author: shammon <zj...@gmail.com>
AuthorDate: Thu Dec 8 15:23:35 2022 +0800
[FLINK-30319] validate column name in ddl
This closes #424
---
.../org/apache/flink/table/store/CoreOptions.java | 19 +++++++
.../apache/flink/table/store/file/KeyValue.java | 4 +-
.../flink/table/store/file/schema/TableSchema.java | 9 ++++
.../table/ChangelogValueCountFileStoreTable.java | 4 +-
.../table/ChangelogWithKeyFileStoreTable.java | 3 +-
.../table/store/table/SchemaEvolutionTest.java | 58 ++++++++++++++++++++++
6 files changed, 92 insertions(+), 5 deletions(-)
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
index ed5b1474..7f43b0bb 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
@@ -50,6 +50,9 @@ import java.util.Set;
import static org.apache.flink.configuration.ConfigOptions.key;
import static org.apache.flink.configuration.description.TextElement.text;
+import static org.apache.flink.table.store.file.schema.TableSchema.KEY_FIELD_PREFIX;
+import static org.apache.flink.table.store.file.schema.TableSchema.SYSTEM_FIELD_NAMES;
+import static org.apache.flink.util.Preconditions.checkState;
/** Core options for table store. */
public class CoreOptions implements Serializable {
@@ -788,6 +791,22 @@ public class CoreOptions implements Serializable {
throw new UnsupportedOperationException(
"Changelog table with full compaction must have primary keys");
}
+
+ // Check column names in schema
+ schema.fieldNames()
+ .forEach(
+ f -> {
+ checkState(
+ !SYSTEM_FIELD_NAMES.contains(f),
+ String.format(
+ "Field name[%s] in schema cannot be exist in [%s]",
+ f, SYSTEM_FIELD_NAMES.toString()));
+ checkState(
+ !f.startsWith(KEY_FIELD_PREFIX),
+ String.format(
+ "Field name[%s] in schema cannot start with [%s]",
+ f, KEY_FIELD_PREFIX));
+ });
}
@Internal
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValue.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValue.java
index 355b3245..a1dc6c02 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValue.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValue.java
@@ -34,6 +34,8 @@ import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
+import static org.apache.flink.table.store.file.schema.TableSchema.SEQUENCE_NUMBER;
+import static org.apache.flink.table.store.file.schema.TableSchema.VALUE_KIND;
import static org.apache.flink.util.Preconditions.checkState;
/**
@@ -41,8 +43,6 @@ import static org.apache.flink.util.Preconditions.checkState;
* reused.
*/
public class KeyValue {
- private static final String SEQUENCE_NUMBER = "_SEQUENCE_NUMBER";
- private static final String VALUE_KIND = "_VALUE_KIND";
public static final long UNKNOWN_SEQUENCE = -1;
public static final int UNKNOWN_LEVEL = -1;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/TableSchema.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/TableSchema.java
index 81f88271..d0a10c8d 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/TableSchema.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/TableSchema.java
@@ -53,6 +53,15 @@ public class TableSchema implements Serializable {
private static final long serialVersionUID = 1L;
+ /** System field names. */
+ public static final String KEY_FIELD_PREFIX = "_KEY_";
+
+ public static final String VALUE_COUNT = "_VALUE_COUNT";
+ public static final String SEQUENCE_NUMBER = "_SEQUENCE_NUMBER";
+ public static final String VALUE_KIND = "_VALUE_KIND";
+ public static final List<String> SYSTEM_FIELD_NAMES =
+ Arrays.asList(VALUE_COUNT, SEQUENCE_NUMBER, VALUE_KIND);
+
private final long id;
private final List<DataField> fields;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
index 00d8e86d..b6ba6111 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
@@ -51,6 +51,8 @@ import org.apache.flink.types.RowKind;
import java.util.Collections;
import java.util.List;
+import static org.apache.flink.table.store.file.schema.TableSchema.VALUE_COUNT;
+
/** {@link FileStoreTable} for {@link WriteMode#CHANGE_LOG} write mode without primary keys. */
public class ChangelogValueCountFileStoreTable extends AbstractFileStoreTable {
@@ -173,7 +175,7 @@ public class ChangelogValueCountFileStoreTable extends AbstractFileStoreTable {
@Override
public List<DataField> valueFields(TableSchema schema) {
return Collections.singletonList(
- new DataField(0, "_VALUE_COUNT", new AtomicDataType(new BigIntType(false))));
+ new DataField(0, VALUE_COUNT, new AtomicDataType(new BigIntType(false))));
}
}
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
index e7441f74..803535c4 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
@@ -55,12 +55,11 @@ import java.util.stream.Collectors;
import static org.apache.flink.table.store.file.predicate.PredicateBuilder.and;
import static org.apache.flink.table.store.file.predicate.PredicateBuilder.pickTransformFieldMapping;
import static org.apache.flink.table.store.file.predicate.PredicateBuilder.splitAnd;
+import static org.apache.flink.table.store.file.schema.TableSchema.KEY_FIELD_PREFIX;
/** {@link FileStoreTable} for {@link WriteMode#CHANGE_LOG} write mode with primary keys. */
public class ChangelogWithKeyFileStoreTable extends AbstractFileStoreTable {
- private static final String KEY_FIELD_PREFIX = "_KEY_";
-
private static final long serialVersionUID = 1L;
private transient KeyValueFileStore lazyStore;
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/SchemaEvolutionTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/SchemaEvolutionTest.java
index 4525b218..373c1a88 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/SchemaEvolutionTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/SchemaEvolutionTest.java
@@ -36,6 +36,7 @@ import org.apache.flink.table.store.table.source.TableScan;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.FloatType;
import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.types.Row;
@@ -53,6 +54,8 @@ import java.util.List;
import java.util.UUID;
import java.util.function.Consumer;
+import static org.apache.flink.table.store.file.schema.TableSchema.KEY_FIELD_PREFIX;
+import static org.apache.flink.table.store.file.schema.TableSchema.SYSTEM_FIELD_NAMES;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -271,6 +274,61 @@ public class SchemaEvolutionTest {
.hasMessage("Cannot drop all fields in table");
}
+ @Test
+ public void testCreateAlterSystemField() throws Exception {
+ UpdateSchema updateSchema1 =
+ new UpdateSchema(
+ RowType.of(
+ new LogicalType[] {new IntType(), new BigIntType()},
+ new String[] {"f0", "_VALUE_COUNT"}),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ new HashMap<>(),
+ "");
+ assertThatThrownBy(() -> schemaManager.commitNewVersion(updateSchema1))
+ .isInstanceOf(IllegalStateException.class)
+ .hasMessage(
+ String.format(
+ "Field name[%s] in schema cannot be exist in [%s]",
+ "_VALUE_COUNT", SYSTEM_FIELD_NAMES.toString()));
+
+ UpdateSchema updateSchema2 =
+ new UpdateSchema(
+ RowType.of(
+ new LogicalType[] {new IntType(), new BigIntType()},
+ new String[] {"f0", "_KEY_f1"}),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ new HashMap<>(),
+ "");
+ assertThatThrownBy(() -> schemaManager.commitNewVersion(updateSchema2))
+ .isInstanceOf(IllegalStateException.class)
+ .hasMessage(
+ String.format(
+ "Field name[%s] in schema cannot start with [%s]",
+ "_KEY_f1", KEY_FIELD_PREFIX));
+
+ UpdateSchema updateSchema =
+ new UpdateSchema(
+ RowType.of(new IntType(), new BigIntType()),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ new HashMap<>(),
+ "");
+ schemaManager.commitNewVersion(updateSchema);
+
+ assertThatThrownBy(
+ () ->
+ schemaManager.commitChanges(
+ Collections.singletonList(
+ SchemaChange.renameColumn("f0", "_VALUE_KIND"))))
+ .isInstanceOf(IllegalStateException.class)
+ .hasMessage(
+ String.format(
+ "Field name[%s] in schema cannot be exist in [%s]",
+ "_VALUE_KIND", SYSTEM_FIELD_NAMES.toString()));
+ }
+
private List<Row> readRecords(FileStoreTable table, Predicate filter) throws IOException {
RowRowConverter converter =
RowRowConverter.create(