You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/11/15 02:00:37 UTC
[flink-table-store] branch master updated: [FLINK-30013] Add check update column type
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new 13d2ce91 [FLINK-30013] Add check update column type
13d2ce91 is described below
commit 13d2ce91ff2bd81fd474363d16c9cf8f2cbb5fb1
Author: shammon <zj...@gmail.com>
AuthorDate: Tue Nov 15 10:00:32 2022 +0800
[FLINK-30013] Add check update column type
This closes #378
---
.../table/store/file/schema/SchemaManager.java | 10 +++++++++
.../table/store/table/SchemaEvolutionTest.java | 26 ++++++++++++++++++++++
2 files changed, 36 insertions(+)
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java
index 7cf5ba75..2940f465 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java
@@ -33,6 +33,7 @@ import org.apache.flink.table.store.file.utils.AtomicFileWriter;
import org.apache.flink.table.store.file.utils.FileUtils;
import org.apache.flink.table.store.file.utils.JsonSerdeUtil;
import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeCasts;
import org.apache.flink.util.Preconditions;
import org.apache.commons.lang3.StringUtils;
@@ -54,6 +55,7 @@ import java.util.function.Function;
import java.util.stream.Collectors;
import static org.apache.flink.table.store.file.utils.FileUtils.listVersionedFiles;
+import static org.apache.flink.util.Preconditions.checkState;
/** Schema Manager to manage schema versions. */
public class SchemaManager implements Serializable {
@@ -197,6 +199,14 @@ public class SchemaManager implements Serializable {
newFields,
update.fieldName(),
(field) -> {
+ checkState(
+ LogicalTypeCasts.supportsImplicitCast(
+ field.type().logicalType, update.newLogicalType()),
+ String.format(
+ "Column type %s[%s] cannot be converted to %s without loosing information.",
+ field.name(),
+ field.type().logicalType,
+ update.newLogicalType()));
AtomicInteger dummyId = new AtomicInteger(0);
DataType newType =
TableSchema.toDataType(
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/SchemaEvolutionTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/SchemaEvolutionTest.java
index 95b544df..590aff22 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/SchemaEvolutionTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/SchemaEvolutionTest.java
@@ -147,6 +147,32 @@ public class SchemaEvolutionTest {
.hasMessage("The column [%s] exists in the table[%s].", columnName, tablePath);
}
+ @Test
+ public void testUpdateFieldType() throws Exception {
+ UpdateSchema updateSchema =
+ new UpdateSchema(
+ RowType.of(new IntType(), new BigIntType()),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ new HashMap<>(),
+ "");
+ schemaManager.commitNewVersion(updateSchema);
+
+ schemaManager.commitChanges(
+ Collections.singletonList(SchemaChange.updateColumnType("f0", new BigIntType())));
+ assertThatThrownBy(
+ () ->
+ schemaManager.commitChanges(
+ Collections.singletonList(
+ SchemaChange.updateColumnType(
+ "f0", new IntType()))))
+ .isInstanceOf(IllegalStateException.class)
+ .hasMessage(
+ String.format(
+ "Column type %s[%s] cannot be converted to %s without loosing information.",
+ "f0", new BigIntType(), new IntType()));
+ }
+
private List<Row> readRecords(FileStoreTable table, Predicate filter) throws IOException {
RowRowConverter converter =
RowRowConverter.create(