You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/11/18 04:00:12 UTC
[flink-table-store] branch master updated: [FLINK-30033] Add primary key type validation
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new 03cfe956 [FLINK-30033] Add primary key type validation
03cfe956 is described below
commit 03cfe956b63768e6743516e842c4763ecc760c16
Author: shammon <zj...@gmail.com>
AuthorDate: Fri Nov 18 12:00:08 2022 +0800
[FLINK-30033] Add primary key type validation
This closes #383
---
.../table/store/file/schema/SchemaManager.java | 22 ++++++++++++
.../flink/table/store/file/schema/TableSchema.java | 21 +++++++++++
.../table/store/file/schema/SchemaManagerTest.java | 42 ++++++++++++++++++++++
3 files changed, 85 insertions(+)
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java
index 2940f465..7ab38278 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java
@@ -32,6 +32,7 @@ import org.apache.flink.table.store.file.schema.SchemaChange.UpdateColumnType;
import org.apache.flink.table.store.file.utils.AtomicFileWriter;
import org.apache.flink.table.store.file.utils.FileUtils;
import org.apache.flink.table.store.file.utils.JsonSerdeUtil;
+import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.utils.LogicalTypeCasts;
import org.apache.flink.util.Preconditions;
@@ -108,6 +109,7 @@ public class SchemaManager implements Serializable {
List<String> primaryKeys = updateSchema.primaryKeys();
Map<String, String> options = updateSchema.options();
+ validatePrimaryKeysType(updateSchema, primaryKeys);
while (true) {
long id;
int highestFieldId;
@@ -157,6 +159,26 @@ public class SchemaManager implements Serializable {
}
}
+    /**
+     * Validates that no primary key field of the given schema uses a logical type listed in
+     * {@link TableSchema#PRIMARY_KEY_UNSUPPORTED_LOGICAL_TYPES}.
+     *
+     * @param updateSchema schema whose row type supplies the field definitions
+     * @param primaryKeys names of the primary key fields to check; nothing is checked when empty
+     * @throws IllegalArgumentException if a primary key name matches no field of the row type
+     * @throws UnsupportedOperationException if a primary key field has an unsupported type
+     */
+    private void validatePrimaryKeysType(UpdateSchema updateSchema, List<String> primaryKeys) {
+        if (primaryKeys.isEmpty()) {
+            return;
+        }
+
+        // Index the row fields by name so each primary key is resolved with a single lookup.
+        Map<String, RowType.RowField> rowFields = new HashMap<>();
+        for (RowType.RowField rowField : updateSchema.rowType().getFields()) {
+            rowFields.put(rowField.getName(), rowField);
+        }
+
+        for (String primaryKeyName : primaryKeys) {
+            RowType.RowField rowField = rowFields.get(primaryKeyName);
+            if (rowField == null) {
+                // Without this check the lookup below would fail with a context-free
+                // NullPointerException when the key does not name an existing field.
+                throw new IllegalArgumentException(
+                        String.format(
+                                "The primary key field %s does not exist in the row type fields %s",
+                                primaryKeyName, rowFields.keySet()));
+            }
+
+            LogicalType logicalType = rowField.getType();
+            boolean unsupported =
+                    TableSchema.PRIMARY_KEY_UNSUPPORTED_LOGICAL_TYPES.stream()
+                            .anyMatch(c -> c.isInstance(logicalType));
+            if (unsupported) {
+                throw new UnsupportedOperationException(
+                        String.format(
+                                "The type %s in primary key field %s is unsupported",
+                                logicalType.getClass().getSimpleName(), primaryKeyName));
+            }
+        }
+    }
+
/** Create {@link SchemaChange}s. */
public TableSchema commitChanges(List<SchemaChange> changes) throws Exception {
while (true) {
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/TableSchema.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/TableSchema.java
index 9ef955ff..81f88271 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/TableSchema.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/TableSchema.java
@@ -20,10 +20,17 @@ package org.apache.flink.table.store.file.schema;
import org.apache.flink.table.store.file.utils.JsonSerdeUtil;
import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.DistinctType;
+import org.apache.flink.table.types.logical.LegacyTypeInformationType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.MapType;
import org.apache.flink.table.types.logical.MultisetType;
+import org.apache.flink.table.types.logical.NullType;
import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.StructuredType;
+import org.apache.flink.table.types.logical.SymbolType;
+import org.apache.flink.table.types.logical.UnresolvedUserDefinedType;
+import org.apache.flink.table.types.logical.UserDefinedType;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
@@ -311,4 +318,18 @@ public class TableSchema implements Serializable {
}
}
}
+
+ /**
+  * Logical type classes that are not allowed as the type of a primary key field.
+  *
+  * <p>{@code SchemaManager} rejects a primary key field whose type is an instance of any of
+  * these classes with an {@link UnsupportedOperationException}.
+  *
+  * <p>NOTE(review): {@code Arrays.asList} returns a fixed-size list whose elements can still be
+  * replaced via {@code set}; consider wrapping it in an unmodifiable view since the field is
+  * public.
+  */
+ public static final List<Class<? extends LogicalType>> PRIMARY_KEY_UNSUPPORTED_LOGICAL_TYPES =
+ Arrays.asList(
+ MapType.class,
+ ArrayType.class,
+ RowType.class,
+ UserDefinedType.class,
+ DistinctType.class,
+ StructuredType.class,
+ MultisetType.class,
+ NullType.class,
+ LegacyTypeInformationType.class,
+ SymbolType.class,
+ UnresolvedUserDefinedType.class);
}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/schema/SchemaManagerTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/schema/SchemaManagerTest.java
index 20cd4acd..b6c8b910 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/schema/SchemaManagerTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/schema/SchemaManagerTest.java
@@ -21,7 +21,9 @@ package org.apache.flink.table.store.file.schema;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.store.file.utils.FailingAtomicRenameFileSystem;
import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.DoubleType;
import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.MapType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
@@ -47,6 +49,7 @@ import java.util.stream.IntStream;
import static org.apache.flink.table.store.file.utils.FailingAtomicRenameFileSystem.retryArtificialException;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.jupiter.api.Assertions.assertThrows;
/** Test for {@link SchemaManager}. */
@@ -182,4 +185,43 @@ public class SchemaManagerTest {
.mapToObj(String::valueOf)
.toArray(String[]::new));
}
+
+    /**
+     * Checks that committing a schema whose primary key field is a MAP is rejected, while the
+     * same layout with a DOUBLE primary key field is committed and becomes the latest schema.
+     */
+    @Test
+    public void testPrimaryKeyType() throws Exception {
+        // Case 1: the first column ("f0") is a MAP, which must be refused as a primary key type.
+        RowType rowTypeWithMapKey =
+                RowType.of(
+                        new MapType(new IntType(), new BigIntType()),
+                        new BigIntType(),
+                        new VarCharType());
+        UpdateSchema rejectedSchema =
+                new UpdateSchema(rowTypeWithMapKey, partitionKeys, primaryKeys, options, "");
+
+        assertThatThrownBy(() -> manager.commitNewVersion(rejectedSchema))
+                .isInstanceOf(UnsupportedOperationException.class)
+                .hasMessage(
+                        "The type %s in primary key field %s is unsupported",
+                        MapType.class.getSimpleName(), "f0");
+
+        // Case 2: replacing the MAP column with a DOUBLE column must be accepted.
+        RowType rowTypeWithDoubleKey =
+                RowType.of(new DoubleType(), new BigIntType(), new VarCharType());
+        UpdateSchema acceptedSchema =
+                new UpdateSchema(rowTypeWithDoubleKey, partitionKeys, primaryKeys, options, "");
+
+        // Fields the committed schema is expected to expose.
+        List<DataField> expectedFields =
+                Arrays.asList(
+                        new DataField(0, "f0", new AtomicDataType(new DoubleType(false))),
+                        new DataField(1, "f1", new AtomicDataType(new BigIntType(false))),
+                        new DataField(2, "f2", new AtomicDataType(new VarCharType())));
+
+        TableSchema committed =
+                retryArtificialException(() -> manager.commitNewVersion(acceptedSchema));
+        Optional<TableSchema> latestSchema = retryArtificialException(() -> manager.latest());
+
+        // The committed schema must be the one reported as latest.
+        assertThat(latestSchema.isPresent()).isTrue();
+        assertThat(committed).isEqualTo(latestSchema.get());
+
+        // It must also carry the fields, keys and options that were submitted.
+        assertThat(committed.fields()).isEqualTo(expectedFields);
+        assertThat(committed.partitionKeys()).isEqualTo(partitionKeys);
+        assertThat(committed.primaryKeys()).isEqualTo(primaryKeys);
+        assertThat(committed.options()).isEqualTo(options);
+    }
}