You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/11/15 02:00:37 UTC

[flink-table-store] branch master updated: [FLINK-30013] Add check update column type

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 13d2ce91 [FLINK-30013] Add check update column type
13d2ce91 is described below

commit 13d2ce91ff2bd81fd474363d16c9cf8f2cbb5fb1
Author: shammon <zj...@gmail.com>
AuthorDate: Tue Nov 15 10:00:32 2022 +0800

    [FLINK-30013] Add check update column type
    
    This closes #378
---
 .../table/store/file/schema/SchemaManager.java     | 10 +++++++++
 .../table/store/table/SchemaEvolutionTest.java     | 26 ++++++++++++++++++++++
 2 files changed, 36 insertions(+)

diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java
index 7cf5ba75..2940f465 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java
@@ -33,6 +33,7 @@ import org.apache.flink.table.store.file.utils.AtomicFileWriter;
 import org.apache.flink.table.store.file.utils.FileUtils;
 import org.apache.flink.table.store.file.utils.JsonSerdeUtil;
 import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeCasts;
 import org.apache.flink.util.Preconditions;
 
 import org.apache.commons.lang3.StringUtils;
@@ -54,6 +55,7 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import static org.apache.flink.table.store.file.utils.FileUtils.listVersionedFiles;
+import static org.apache.flink.util.Preconditions.checkState;
 
 /** Schema Manager to manage schema versions. */
 public class SchemaManager implements Serializable {
@@ -197,6 +199,14 @@ public class SchemaManager implements Serializable {
                             newFields,
                             update.fieldName(),
                             (field) -> {
+                                checkState(
+                                        LogicalTypeCasts.supportsImplicitCast(
+                                                field.type().logicalType, update.newLogicalType()),
+                                        String.format(
+                                                "Column type %s[%s] cannot be converted to %s without loosing information.",
+                                                field.name(),
+                                                field.type().logicalType,
+                                                update.newLogicalType()));
                                 AtomicInteger dummyId = new AtomicInteger(0);
                                 DataType newType =
                                         TableSchema.toDataType(
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/SchemaEvolutionTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/SchemaEvolutionTest.java
index 95b544df..590aff22 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/SchemaEvolutionTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/SchemaEvolutionTest.java
@@ -147,6 +147,32 @@ public class SchemaEvolutionTest {
                 .hasMessage("The column [%s] exists in the table[%s].", columnName, tablePath);
     }
 
+    @Test
+    public void testUpdateFieldType() throws Exception {
+        UpdateSchema updateSchema =
+                new UpdateSchema(
+                        RowType.of(new IntType(), new BigIntType()),
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        new HashMap<>(),
+                        "");
+        schemaManager.commitNewVersion(updateSchema);
+
+        schemaManager.commitChanges(
+                Collections.singletonList(SchemaChange.updateColumnType("f0", new BigIntType())));
+        assertThatThrownBy(
+                        () ->
+                                schemaManager.commitChanges(
+                                        Collections.singletonList(
+                                                SchemaChange.updateColumnType(
+                                                        "f0", new IntType()))))
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessage(
+                        String.format(
+                                "Column type %s[%s] cannot be converted to %s without loosing information.",
+                                "f0", new BigIntType(), new IntType()));
+    }
+
     private List<Row> readRecords(FileStoreTable table, Predicate filter) throws IOException {
         RowRowConverter converter =
                 RowRowConverter.create(