Posted to commits@flink.apache.org by lz...@apache.org on 2023/01/05 10:01:04 UTC
[flink-table-store] branch master updated: [FLINK-27844] Support column type evolution in file meta
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new d031fd8e [FLINK-27844] Support column type evolution in file meta
d031fd8e is described below
commit d031fd8e977ac754d78d256dd276b6dae595e723
Author: shammon <zj...@gmail.com>
AuthorDate: Thu Jan 5 18:00:58 2023 +0800
[FLINK-27844] Support column type evolution in file meta
This closes #423
---
.../table/store/file/casting/CastExecutors.java | 31 +-
.../file/operation/AppendOnlyFileStoreScan.java | 17 +-
.../file/operation/KeyValueFileStoreScan.java | 18 +-
.../flink/table/store/file/schema/DataType.java | 13 +
.../store/file/schema/SchemaEvolutionUtil.java | 56 ++++
.../table/store/file/schema/SchemaManager.java | 14 +-
.../file/stats/FieldStatsArraySerializer.java | 22 +-
.../table/store/file/casting/CastExecutorTest.java | 12 +
.../file/stats/FieldStatsArraySerializerTest.java | 11 +-
.../AppendOnlyTableColumnTypeFileMetaTest.java | 56 ++++
.../ChangelogValueCountColumnTypeFileMetaTest.java | 63 ++++
...hangelogWithKeyTableColumnTypeFileMetaTest.java | 154 ++++++++++
.../store/table/ColumnTypeFileMetaTestBase.java | 341 +++++++++++++++++++++
.../table/store/table/FileMetaFilterTestBase.java | 13 -
.../store/table/SchemaEvolutionTableTestBase.java | 251 +++++++++++++++
15 files changed, 1036 insertions(+), 36 deletions(-)
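In short, the field stats (min/max/null counts) kept in a data file's meta are stored with the schema the file was written with; once a column's type has been altered, scans now map those stats onto the current table schema and cast the min/max values, so stats-based filtering keeps working across the type change. A rough sketch of the core casting step, assuming a column evolved from BIGINT to FLOAT (types and values here are illustrative, not taken from the patch):

    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.store.file.casting.CastExecutor;
    import org.apache.flink.table.store.file.casting.CastExecutors;

    // Resolve a cast from the type the stats were written with (BIGINT) to the
    // current table type (FLOAT); the scan applies it to each stored min/max value.
    CastExecutor<Object, Object> bigintToFloat =
            (CastExecutor<Object, Object>)
                    CastExecutors.resolve(
                            DataTypes.BIGINT().getLogicalType(),
                            DataTypes.FLOAT().getLogicalType());
    Object evolvedMin = bigintToFloat.cast(205L); // the stored BIGINT min, cast for the FLOAT column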
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/casting/CastExecutors.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/casting/CastExecutors.java
index d46c68fc..28e41b1b 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/casting/CastExecutors.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/casting/CastExecutors.java
@@ -45,6 +45,7 @@ import static org.apache.flink.table.types.logical.LogicalTypeRoot.VARCHAR;
/** Cast executors for input type and output type. */
public class CastExecutors {
+ private static final CastExecutor<?, ?> IDENTITY_CAST_EXECUTOR = value -> value;
/**
* Resolve a {@link CastExecutor} for the provided input type and target type. Returns null if
@@ -176,12 +177,28 @@ public class CastExecutors {
}
return null;
case BINARY:
- if (outputType.getTypeRoot() == BINARY) {
+ if (outputType.getTypeRoot() == BINARY || outputType.getTypeRoot() == VARBINARY) {
+ boolean targetBinaryType = outputType.getTypeRoot() == BINARY;
final int targetLength = getBinaryLength(outputType);
- return value ->
- ((((byte[]) value).length == targetLength)
- ? value
- : Arrays.copyOf((byte[]) value, targetLength));
+ return value -> {
+ byte[] bytes = (byte[]) value;
+ if (((byte[]) value).length == targetLength) {
+ return value;
+ }
+ if (targetBinaryType) {
+ if (bytes.length == targetLength) {
+ return bytes;
+ } else {
+ return Arrays.copyOf(bytes, targetLength);
+ }
+ } else {
+ if (bytes.length <= targetLength) {
+ return bytes;
+ } else {
+ return Arrays.copyOf(bytes, targetLength);
+ }
+ }
+ };
}
return null;
case TIMESTAMP_WITHOUT_TIME_ZONE:
@@ -225,6 +242,10 @@ public class CastExecutors {
}
}
+ public static CastExecutor<?, ?> identityCastExecutor() {
+ return IDENTITY_CAST_EXECUTOR;
+ }
+
private static int getStringLength(LogicalType logicalType) {
if (logicalType instanceof CharType) {
return ((CharType) logicalType).getLength();
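The widened BINARY branch above also covers casts to VARBINARY: a shorter VARBINARY target truncates the bytes, a longer one leaves them unchanged (no padding), while a fixed-length BINARY target still pads or truncates. The new identityCastExecutor() is handed out by the converter mapping (see SchemaEvolutionUtil below) for fields whose type did not change. A small sketch mirroring the new CastExecutorTest cases; the unchecked casts are only for illustration:

    import org.apache.flink.table.store.file.casting.CastExecutor;
    import org.apache.flink.table.store.file.casting.CastExecutors;
    import org.apache.flink.table.types.logical.BinaryType;
    import org.apache.flink.table.types.logical.VarBinaryType;

    CastExecutor<Object, Object> toVarBinary5 =
            (CastExecutor<Object, Object>)
                    CastExecutors.resolve(new BinaryType(10), new VarBinaryType(5));
    byte[] truncated = (byte[]) toVarBinary5.cast("1234567890".getBytes()); // "12345".getBytes()

    CastExecutor<Object, Object> toVarBinary20 =
            (CastExecutor<Object, Object>)
                    CastExecutors.resolve(new BinaryType(10), new VarBinaryType(20));
    byte[] unchanged = (byte[]) toVarBinary20.cast("12345678".getBytes()); // kept as-is, no padding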
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreScan.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreScan.java
index ac65b0ab..a63363fb 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreScan.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreScan.java
@@ -18,10 +18,12 @@
package org.apache.flink.table.store.file.operation;
+import org.apache.flink.table.store.file.casting.CastExecutor;
import org.apache.flink.table.store.file.manifest.ManifestEntry;
import org.apache.flink.table.store.file.manifest.ManifestFile;
import org.apache.flink.table.store.file.manifest.ManifestList;
import org.apache.flink.table.store.file.predicate.Predicate;
+import org.apache.flink.table.store.file.schema.DataField;
import org.apache.flink.table.store.file.schema.SchemaEvolutionUtil;
import org.apache.flink.table.store.file.schema.SchemaManager;
import org.apache.flink.table.store.file.schema.TableSchema;
@@ -103,13 +105,22 @@ public class AppendOnlyFileStoreScan extends AbstractFileStoreScan {
schemaId,
id -> {
TableSchema tableSchema = scanTableSchema();
+ List<DataField> tableFields = tableSchema.fields();
TableSchema schema = scanTableSchema(id);
- return new FieldStatsArraySerializer(
- schema.logicalRowType(),
+ List<DataField> dataFields = schema.fields();
+ int[] indexMapping =
tableSchema.id() == id
? null
: SchemaEvolutionUtil.createIndexMapping(
- tableSchema.fields(), schema.fields()));
+ tableFields, dataFields);
+ CastExecutor<Object, Object>[] converterMapping =
+ tableSchema.id() == id
+ ? null
+ : (CastExecutor<Object, Object>[])
+ SchemaEvolutionUtil.createConvertMapping(
+ tableFields, dataFields, indexMapping);
+ return new FieldStatsArraySerializer(
+ schema.logicalRowType(), indexMapping, converterMapping);
});
}
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreScan.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreScan.java
index 25391021..ee828dda 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreScan.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreScan.java
@@ -18,6 +18,7 @@
package org.apache.flink.table.store.file.operation;
+import org.apache.flink.table.store.file.casting.CastExecutor;
import org.apache.flink.table.store.file.manifest.ManifestEntry;
import org.apache.flink.table.store.file.manifest.ManifestFile;
import org.apache.flink.table.store.file.manifest.ManifestList;
@@ -110,14 +111,23 @@ public class KeyValueFileStoreScan extends AbstractFileStoreScan {
key -> {
final TableSchema tableSchema = scanTableSchema();
final TableSchema schema = scanTableSchema(key);
+ final List<DataField> tableKeyFields =
+ keyValueFieldsExtractor.keyFields(tableSchema);
final List<DataField> keyFields = keyValueFieldsExtractor.keyFields(schema);
- return new FieldStatsArraySerializer(
- RowDataType.toRowType(false, keyFields),
+ final int[] indexMapping =
tableSchema.id() == key
? null
: SchemaEvolutionUtil.createIndexMapping(
- keyValueFieldsExtractor.keyFields(tableSchema),
- keyFields));
+ tableKeyFields, keyFields);
+ final CastExecutor<?, ?>[] converterMapping =
+ tableSchema.id() == key
+ ? null
+ : SchemaEvolutionUtil.createConvertMapping(
+ tableKeyFields, keyFields, indexMapping);
+ return new FieldStatsArraySerializer(
+ RowDataType.toRowType(false, keyFields),
+ indexMapping,
+ (CastExecutor<Object, Object>[]) converterMapping);
});
}
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/DataType.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/DataType.java
index 83bfff0b..e3a6f6e4 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/DataType.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/DataType.java
@@ -21,6 +21,8 @@ package org.apache.flink.table.store.file.schema;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.util.Preconditions;
+import javax.annotation.Nonnull;
+
import java.io.Serializable;
import java.util.Objects;
@@ -53,6 +55,17 @@ public abstract class DataType implements Serializable {
*/
public abstract DataType copy(boolean isNullable);
+ /**
+ * Compare two data types, ignoring their nullability.
+ *
+ * @param o the target data type
+ */
+ public boolean equalsIgnoreNullable(@Nonnull DataType o) {
+ LogicalType sourceType = logicalType.copy(true);
+ LogicalType targetType = o.logicalType.copy(true);
+ return Objects.equals(sourceType, targetType);
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
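The new equalsIgnoreNullable copies both types as nullable before comparing, so two types that differ only in nullability are considered equal and need no converter. A minimal sketch (illustrative, not part of the patch):

    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.store.file.schema.AtomicDataType;
    import org.apache.flink.table.store.file.schema.DataType;

    DataType intNotNull = new AtomicDataType(DataTypes.INT().notNull().getLogicalType());
    DataType intNullable = new AtomicDataType(DataTypes.INT().getLogicalType());
    boolean sameIgnoringNullability = intNotNull.equalsIgnoreNullable(intNullable); // true
    boolean strictlyEqual = intNotNull.equals(intNullable);                         // false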
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaEvolutionUtil.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaEvolutionUtil.java
index 820ce6d4..ccb6e9b4 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaEvolutionUtil.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaEvolutionUtil.java
@@ -20,6 +20,8 @@ package org.apache.flink.table.store.file.schema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.table.store.file.casting.CastExecutor;
+import org.apache.flink.table.store.file.casting.CastExecutors;
import org.apache.flink.table.store.file.predicate.LeafPredicate;
import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.store.file.predicate.PredicateReplaceVisitor;
@@ -37,6 +39,7 @@ import java.util.Optional;
import java.util.stream.Collectors;
import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
/** Utils for schema evolution. */
public class SchemaEvolutionUtil {
@@ -304,4 +307,57 @@ public class SchemaEvolutionUtil {
throw new IllegalArgumentException(
String.format("Can't find data field %s", dataField.name()));
}
+
+ /**
+ * Create converter mapping from table fields to underlying data fields. For example, the table
+ * and data fields are as follows
+ *
+ * <ul>
+ * <li>table fields: 1->c INT, 6->b STRING, 3->a BIGINT
+ * <li>data fields: 1->a BIGINT, 3->c DOUBLE
+ * </ul>
+ *
+ * <p>We can get the column types (1->a BIGINT), (3->c DOUBLE) from data fields for (1->c INT)
+ * and (3->a BIGINT) in table fields through the index mapping [0, -1, 1], then compare the data
+ * types and create the converter mapping.
+ *
+ * <p>TODO: support nested index mapping when nested schema evolution is supported.
+ *
+ * @param tableFields the fields of the table
+ * @param dataFields the fields of the underlying data
+ * @param indexMapping the index mapping from table fields to data fields
+ * @return the converter mapping, or null if no field requires casting
+ */
+ @Nullable
+ public static CastExecutor<?, ?>[] createConvertMapping(
+ List<DataField> tableFields, List<DataField> dataFields, int[] indexMapping) {
+ CastExecutor<?, ?>[] converterMapping = new CastExecutor<?, ?>[tableFields.size()];
+ boolean castExist = false;
+ for (int i = 0; i < tableFields.size(); i++) {
+ int dataIndex = indexMapping == null ? i : indexMapping[i];
+ if (dataIndex < 0) {
+ converterMapping[i] = CastExecutors.identityCastExecutor();
+ } else {
+ DataField tableField = tableFields.get(i);
+ DataField dataField = dataFields.get(dataIndex);
+ if (dataField.type().equalsIgnoreNullable(tableField.type())) {
+ converterMapping[i] = CastExecutors.identityCastExecutor();
+ } else {
+ // TODO support column type evolution in nested type
+ checkState(
+ tableField.type() instanceof AtomicDataType
+ && dataField.type() instanceof AtomicDataType,
+ "Only support column type evolution in atomic data type.");
+ converterMapping[i] =
+ checkNotNull(
+ CastExecutors.resolve(
+ dataField.type().logicalType(),
+ tableField.type().logicalType()));
+ castExist = true;
+ }
+ }
+ }
+
+ return castExist ? converterMapping : null;
+ }
}
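A sketch of how the index mapping and converter mapping fit together, assuming a hypothetical two-column table whose column "g" was altered from BIGINT to FLOAT (the same evolution the new tests exercise); the scan classes above build exactly this pair and pass it to FieldStatsArraySerializer:

    import java.util.Arrays;
    import java.util.List;

    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.store.file.casting.CastExecutor;
    import org.apache.flink.table.store.file.schema.AtomicDataType;
    import org.apache.flink.table.store.file.schema.DataField;
    import org.apache.flink.table.store.file.schema.SchemaEvolutionUtil;

    // Current table schema: a INT, g FLOAT; the data file was written while g was still BIGINT.
    List<DataField> tableFields = Arrays.asList(
            new DataField(0, "a", new AtomicDataType(DataTypes.INT().getLogicalType())),
            new DataField(1, "g", new AtomicDataType(DataTypes.FLOAT().getLogicalType())));
    List<DataField> dataFields = Arrays.asList(
            new DataField(0, "a", new AtomicDataType(DataTypes.INT().getLogicalType())),
            new DataField(1, "g", new AtomicDataType(DataTypes.BIGINT().getLogicalType())));

    int[] indexMapping = SchemaEvolutionUtil.createIndexMapping(tableFields, dataFields);
    CastExecutor<?, ?>[] converterMapping =
            SchemaEvolutionUtil.createConvertMapping(tableFields, dataFields, indexMapping);
    // converterMapping[0] is the identity executor ("a" is unchanged); converterMapping[1]
    // casts the BIGINT min/max stats of "g" to FLOAT for the current table schema.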
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java
index 9946e703..750c21fb 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.file.casting.CastExecutors;
import org.apache.flink.table.store.file.operation.Lock;
import org.apache.flink.table.store.file.schema.SchemaChange.AddColumn;
import org.apache.flink.table.store.file.schema.SchemaChange.DropColumn;
@@ -255,13 +256,24 @@ public class SchemaManager implements Serializable {
}
} else if (change instanceof UpdateColumnType) {
UpdateColumnType update = (UpdateColumnType) change;
+ if (schema.partitionKeys().contains(update.fieldName())) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Cannot update partition column [%s] type in the table[%s].",
+ update.fieldName(), tableRoot));
+ }
updateColumn(
newFields,
update.fieldName(),
(field) -> {
checkState(
LogicalTypeCasts.supportsImplicitCast(
- field.type().logicalType, update.newLogicalType()),
+ field.type().logicalType,
+ update.newLogicalType())
+ && CastExecutors.resolve(
+ field.type().logicalType,
+ update.newLogicalType())
+ != null,
String.format(
"Column type %s[%s] cannot be converted to %s without loosing information.",
field.name(),
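With the strengthened check, an UpdateColumnType is accepted only when the change is an implicit cast for Flink's type system and a CastExecutor can be resolved for it, and partition columns can no longer change type at all. Roughly, for the BIGINT to FLOAT evolution used by the new tests (illustrative only):

    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.store.file.casting.CastExecutors;
    import org.apache.flink.table.types.logical.LogicalType;
    import org.apache.flink.table.types.logical.utils.LogicalTypeCasts;

    LogicalType oldType = DataTypes.BIGINT().getLogicalType();
    LogicalType newType = DataTypes.FLOAT().getLogicalType();
    boolean allowed =
            LogicalTypeCasts.supportsImplicitCast(oldType, newType)
                    && CastExecutors.resolve(oldType, newType) != null; // true for this pair
    // An update that fails either condition is rejected with the "cannot be converted
    // to ... without losing information" error shown above.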
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FieldStatsArraySerializer.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FieldStatsArraySerializer.java
index 3ce0bcf5..7e02e5bf 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FieldStatsArraySerializer.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FieldStatsArraySerializer.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.store.file.stats;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+import org.apache.flink.table.store.file.casting.CastExecutor;
import org.apache.flink.table.store.format.FieldStats;
import org.apache.flink.table.store.utils.RowDataUtils;
import org.apache.flink.table.types.logical.ArrayType;
@@ -44,12 +45,14 @@ public class FieldStatsArraySerializer {
private final RowData.FieldGetter[] fieldGetters;
@Nullable private final int[] indexMapping;
+ @Nullable private final CastExecutor<Object, Object>[] converterMapping;
public FieldStatsArraySerializer(RowType type) {
- this(type, null);
+ this(type, null, null);
}
- public FieldStatsArraySerializer(RowType type, int[] indexMapping) {
+ public FieldStatsArraySerializer(
+ RowType type, int[] indexMapping, CastExecutor<Object, Object>[] converterMapping) {
RowType safeType = toAllFieldsNullableRowType(type);
this.serializer = new RowDataSerializer(safeType);
this.fieldGetters =
@@ -60,6 +63,7 @@ public class FieldStatsArraySerializer {
safeType.getTypeAt(i), i))
.toArray(RowData.FieldGetter[]::new);
this.indexMapping = indexMapping;
+ this.converterMapping = converterMapping;
}
public BinaryTableStats toBinary(FieldStats[] stats) {
@@ -95,11 +99,15 @@ public class FieldStatsArraySerializer {
}
stats[i] = new FieldStats(null, null, rowCount);
} else {
- stats[i] =
- new FieldStats(
- fieldGetters[fieldIndex].getFieldOrNull(array.min()),
- fieldGetters[fieldIndex].getFieldOrNull(array.max()),
- array.nullCounts()[fieldIndex]);
+ CastExecutor<Object, Object> converter =
+ converterMapping == null ? null : converterMapping[i];
+ Object min = fieldGetters[fieldIndex].getFieldOrNull(array.min());
+ min = converter == null || min == null ? min : converter.cast(min);
+
+ Object max = fieldGetters[fieldIndex].getFieldOrNull(array.max());
+ max = converter == null || max == null ? max : converter.cast(max);
+
+ stats[i] = new FieldStats(min, max, array.nullCounts()[fieldIndex]);
}
}
return stats;
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/casting/CastExecutorTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/casting/CastExecutorTest.java
index ea5ffeeb..70d06086 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/casting/CastExecutorTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/casting/CastExecutorTest.java
@@ -223,6 +223,18 @@ public class CastExecutorTest {
CastExecutors.resolve(new BinaryType(10), new BinaryType(20)),
"12345678".getBytes(),
new byte[] {49, 50, 51, 52, 53, 54, 55, 56, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0});
+
+ // binary(10) to varbinary(5)
+ compareCastResult(
+ CastExecutors.resolve(new BinaryType(10), new VarBinaryType(5)),
+ "1234567890".getBytes(),
+ "12345".getBytes());
+
+ // binary(10) to varbinary(20)
+ compareCastResult(
+ CastExecutors.resolve(new BinaryType(10), new VarBinaryType(20)),
+ "12345678".getBytes(),
+ "12345678".getBytes());
}
@Test
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/FieldStatsArraySerializerTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/FieldStatsArraySerializerTest.java
index 2e63eea5..3990bc6d 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/FieldStatsArraySerializerTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/FieldStatsArraySerializerTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.store.file.stats;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.casting.CastExecutor;
import org.apache.flink.table.store.file.schema.AtomicDataType;
import org.apache.flink.table.store.file.schema.DataField;
import org.apache.flink.table.store.file.schema.SchemaEvolutionUtil;
@@ -93,11 +94,15 @@ public class FieldStatsArraySerializerTest {
Collections.EMPTY_MAP,
"");
+ int[] indexMapping =
+ SchemaEvolutionUtil.createIndexMapping(tableSchema.fields(), dataSchema.fields());
+ CastExecutor<Object, Object>[] converterMapping =
+ (CastExecutor<Object, Object>[])
+ SchemaEvolutionUtil.createConvertMapping(
+ tableSchema.fields(), dataSchema.fields(), indexMapping);
FieldStatsArraySerializer fieldStatsArraySerializer =
new FieldStatsArraySerializer(
- tableSchema.logicalRowType(),
- SchemaEvolutionUtil.createIndexMapping(
- tableSchema.fields(), dataSchema.fields()));
+ tableSchema.logicalRowType(), indexMapping, converterMapping);
BinaryRowData minRowData = row(1, 2, 3, 4);
BinaryRowData maxRowData = row(100, 99, 98, 97);
long[] nullCounts = new long[] {1, 0, 10, 100};
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyTableColumnTypeFileMetaTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyTableColumnTypeFileMetaTest.java
new file mode 100644
index 00000000..87cf9336
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyTableColumnTypeFileMetaTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table;
+
+import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.file.WriteMode;
+import org.apache.flink.table.store.file.io.DataFileMeta;
+import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.store.file.schema.TableSchema;
+import org.apache.flink.table.store.file.stats.BinaryTableStats;
+
+import org.junit.jupiter.api.BeforeEach;
+
+import java.util.Map;
+
+/** File meta tests for column type evolution in {@link AppendOnlyFileStoreTable}. */
+public class AppendOnlyTableColumnTypeFileMetaTest extends ColumnTypeFileMetaTestBase {
+
+ @BeforeEach
+ public void before() throws Exception {
+ super.before();
+ tableConfig.set(CoreOptions.WRITE_MODE, WriteMode.APPEND_ONLY);
+ }
+
+ @Override
+ protected FileStoreTable createFileStoreTable(Map<Long, TableSchema> tableSchemas) {
+ SchemaManager schemaManager = new TestingSchemaManager(tablePath, tableSchemas);
+ return new AppendOnlyFileStoreTable(tablePath, schemaManager.latest().get()) {
+ @Override
+ protected SchemaManager schemaManager() {
+ return schemaManager;
+ }
+ };
+ }
+
+ @Override
+ protected BinaryTableStats getTableValueStats(DataFileMeta fileMeta) {
+ return fileMeta.valueStats();
+ }
+}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountColumnTypeFileMetaTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountColumnTypeFileMetaTest.java
new file mode 100644
index 00000000..1eec700e
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountColumnTypeFileMetaTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table;
+
+import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.file.WriteMode;
+import org.apache.flink.table.store.file.io.DataFileMeta;
+import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.store.file.schema.TableSchema;
+import org.apache.flink.table.store.file.stats.BinaryTableStats;
+
+import org.junit.jupiter.api.BeforeEach;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/** File meta tests for column type evolution in {@link ChangelogValueCountFileStoreTable}. */
+public class ChangelogValueCountColumnTypeFileMetaTest extends ColumnTypeFileMetaTestBase {
+
+ @BeforeEach
+ public void before() throws Exception {
+ super.before();
+ tableConfig.set(CoreOptions.WRITE_MODE, WriteMode.CHANGE_LOG);
+ }
+
+ @Override
+ protected List<String> getPrimaryKeyNames() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ protected FileStoreTable createFileStoreTable(Map<Long, TableSchema> tableSchemas) {
+ SchemaManager schemaManager = new TestingSchemaManager(tablePath, tableSchemas);
+ return new ChangelogValueCountFileStoreTable(tablePath, schemaManager.latest().get()) {
+ @Override
+ protected SchemaManager schemaManager() {
+ return schemaManager;
+ }
+ };
+ }
+
+ @Override
+ protected BinaryTableStats getTableValueStats(DataFileMeta fileMeta) {
+ return fileMeta.keyStats();
+ }
+}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyTableColumnTypeFileMetaTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyTableColumnTypeFileMetaTest.java
new file mode 100644
index 00000000..d7d7f19e
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyTableColumnTypeFileMetaTest.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table;
+
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.file.WriteMode;
+import org.apache.flink.table.store.file.io.DataFileMeta;
+import org.apache.flink.table.store.file.predicate.Predicate;
+import org.apache.flink.table.store.file.predicate.PredicateBuilder;
+import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.store.file.schema.TableSchema;
+import org.apache.flink.table.store.file.stats.BinaryTableStats;
+import org.apache.flink.table.store.format.FieldStats;
+import org.apache.flink.table.store.table.source.DataTableScan;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** File meta tests for column type evolution in {@link ChangelogWithKeyFileStoreTable}. */
+public class ChangelogWithKeyTableColumnTypeFileMetaTest extends ColumnTypeFileMetaTestBase {
+
+ @BeforeEach
+ public void before() throws Exception {
+ super.before();
+ tableConfig.set(CoreOptions.WRITE_MODE, WriteMode.CHANGE_LOG);
+ }
+
+ @Override
+ protected FileStoreTable createFileStoreTable(Map<Long, TableSchema> tableSchemas) {
+ SchemaManager schemaManager = new TestingSchemaManager(tablePath, tableSchemas);
+ return new ChangelogWithKeyFileStoreTable(tablePath, schemaManager.latest().get()) {
+ @Override
+ protected SchemaManager schemaManager() {
+ return schemaManager;
+ }
+ };
+ }
+
+ @Override
+ protected BinaryTableStats getTableValueStats(DataFileMeta fileMeta) {
+ return fileMeta.keyStats();
+ }
+
+ /** We can only validate the field stats of primary keys in the changelog-with-key table. */
+ @Override
+ protected void validateStatsField(List<DataFileMeta> fileMetaList) {
+ for (DataFileMeta fileMeta : fileMetaList) {
+ FieldStats[] statsArray = getTableValueStats(fileMeta).fields(null);
+ assertThat(statsArray.length).isEqualTo(4);
+ for (int i = 0; i < 4; i++) {
+ assertThat(statsArray[i].minValue()).isNotNull();
+ assertThat(statsArray[i].maxValue()).isNotNull();
+ }
+ }
+ }
+
+ @Override
+ @Test
+ public void testTableScanFilterNormalFields() throws Exception {
+ writeAndCheckFileResultForColumnType(
+ schemas -> {
+ FileStoreTable table = createFileStoreTable(schemas);
+ /**
+                     * The changelog-with-key table doesn't support filtering on value fields, so it
+                     * scans all data. TODO: support value filters in the future.
+ */
+ Predicate predicate =
+ new PredicateBuilder(table.schema().logicalRowType())
+ .between(6, 200L, 500L);
+ DataTableScan.DataFilePlan plan = table.newScan().withFilter(predicate).plan();
+ checkFilterRowCount(plan, 3L);
+ return plan.splits.stream()
+ .flatMap(s -> s.files().stream())
+ .collect(Collectors.toList());
+ },
+ (files, schemas) -> {
+ FileStoreTable table = createFileStoreTable(schemas);
+
+ /**
+                     * The changelog-with-key table doesn't support filtering on value fields, so it
+                     * scans all data. TODO: support value filters in the future.
+ */
+ DataTableScan.DataFilePlan plan =
+ table.newScan()
+ .withFilter(
+ new PredicateBuilder(table.schema().logicalRowType())
+ .between(6, 200F, 500F))
+ .plan();
+ checkFilterRowCount(plan, 6L);
+ },
+ getPrimaryKeyNames(),
+ tableConfig,
+ this::createFileStoreTable);
+ }
+
+ /** We can only validate the primary key values for the changelog-with-key table. */
+ @Override
+ protected void validateValuesWithNewSchema(
+ List<String> filesName, List<DataFileMeta> fileMetaList) {
+ for (DataFileMeta fileMeta : fileMetaList) {
+ FieldStats[] statsArray = getTableValueStats(fileMeta).fields(null);
+ assertThat(statsArray.length).isEqualTo(4);
+ if (filesName.contains(fileMeta.fileName())) {
+ assertThat(statsArray[0].minValue()).isEqualTo(StringData.fromString("200 "));
+ assertThat(statsArray[0].maxValue()).isEqualTo(StringData.fromString("300 "));
+
+ assertThat(statsArray[1].minValue()).isEqualTo(StringData.fromString("201"));
+ assertThat(statsArray[1].maxValue()).isEqualTo(StringData.fromString("301"));
+
+ assertThat((Double) statsArray[2].minValue()).isEqualTo(202D);
+ assertThat((Double) statsArray[2].maxValue()).isEqualTo(302D);
+
+ assertThat((Integer) statsArray[3].minValue()).isEqualTo(203);
+ assertThat((Integer) statsArray[3].maxValue()).isEqualTo(303);
+ } else {
+ assertThat(statsArray[0].minValue())
+ .isEqualTo(statsArray[0].maxValue())
+ .isEqualTo(StringData.fromString("400"));
+ assertThat(statsArray[1].minValue())
+ .isEqualTo(statsArray[1].maxValue())
+ .isEqualTo(StringData.fromString("401"));
+ assertThat((Double) statsArray[2].minValue())
+ .isEqualTo((Double) statsArray[2].maxValue())
+ .isEqualTo(402D);
+ assertThat((Integer) statsArray[3].minValue())
+ .isEqualTo(statsArray[3].maxValue())
+ .isEqualTo(403);
+ }
+ }
+ }
+}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ColumnTypeFileMetaTestBase.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ColumnTypeFileMetaTestBase.java
new file mode 100644
index 00000000..54836847
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ColumnTypeFileMetaTestBase.java
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table;
+
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.file.io.DataFileMeta;
+import org.apache.flink.table.store.file.predicate.Predicate;
+import org.apache.flink.table.store.file.predicate.PredicateBuilder;
+import org.apache.flink.table.store.file.stats.BinaryTableStats;
+import org.apache.flink.table.store.format.FieldStats;
+import org.apache.flink.table.store.table.source.DataTableScan;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Base test class for column type evolution. */
+public abstract class ColumnTypeFileMetaTestBase extends SchemaEvolutionTableTestBase {
+ @BeforeEach
+ public void before() throws Exception {
+ super.before();
+ tableConfig.set(CoreOptions.BUCKET, 1);
+ }
+
+ @Test
+ public void testTableScan() throws Exception {
+ writeAndCheckFileResultForColumnType(
+ schemas -> {
+ FileStoreTable table = createFileStoreTable(schemas);
+ DataTableScan.DataFilePlan plan = table.newScan().plan();
+ checkFilterRowCount(plan, 3L);
+ return plan.splits.stream()
+ .flatMap(s -> s.files().stream())
+ .collect(Collectors.toList());
+ },
+ (files, schemas) -> {
+ FileStoreTable table = createFileStoreTable(schemas);
+ // Scan all data files
+ DataTableScan.DataFilePlan plan =
+ table.newScan()
+ .withFilter(
+ new PredicateBuilder(table.schema().logicalRowType())
+ .greaterOrEqual(1, StringData.fromString("0")))
+ .plan();
+ checkFilterRowCount(plan, 6L);
+
+ List<String> filesName =
+ files.stream().map(DataFileMeta::fileName).collect(Collectors.toList());
+ assertThat(filesName.size()).isGreaterThan(0);
+
+ List<DataFileMeta> fileMetaList =
+ plan.splits.stream()
+ .flatMap(s -> s.files().stream())
+ .collect(Collectors.toList());
+ assertThat(
+ fileMetaList.stream()
+ .map(DataFileMeta::fileName)
+ .collect(Collectors.toList()))
+ .containsAll(filesName);
+
+ validateStatsField(fileMetaList);
+ },
+ getPrimaryKeyNames(),
+ tableConfig,
+ this::createFileStoreTable);
+ }
+
+ protected void validateStatsField(List<DataFileMeta> fileMetaList) {
+ for (DataFileMeta fileMeta : fileMetaList) {
+ FieldStats[] statsArray = getTableValueStats(fileMeta).fields(null);
+ assertThat(statsArray.length).isEqualTo(12);
+ for (int i = 0; i < 11; i++) {
+ assertThat(statsArray[i].minValue()).isNotNull();
+ assertThat(statsArray[i].maxValue()).isNotNull();
+ }
+            // Min and max values of the binary type are null
+ assertThat(statsArray[11].minValue()).isNull();
+ assertThat(statsArray[11].maxValue()).isNull();
+ }
+ }
+
+ @Test
+ public void testTableScanFilterNormalFields() throws Exception {
+ writeAndCheckFileResultForColumnType(
+ schemas -> {
+ FileStoreTable table = createFileStoreTable(schemas);
+ /**
+                     * Filter field "g" of SCHEMA_FIELDS (bigint) on the range [200, 500]; this will
+                     * match one file with two rows as follows:
+ *
+ * <ul>
+ * <li>2,"200","201",toDecimal(202),(short)203,204,205L,206F,207D,208,toTimestamp(209
+ * * millsPerDay),toBytes("210")
+ * <li>2,"300","301",toDecimal(302),(short)303,304,305L,306F,307D,308,toTimestamp(309
+ * * millsPerDay),toBytes("310")
+ * </ul>
+ */
+ Predicate predicate =
+ new PredicateBuilder(table.schema().logicalRowType())
+ .between(6, 200L, 500L);
+ DataTableScan.DataFilePlan plan = table.newScan().withFilter(predicate).plan();
+ checkFilterRowCount(plan, 2L);
+ return plan.splits.stream()
+ .flatMap(s -> s.files().stream())
+ .collect(Collectors.toList());
+ },
+ (files, schemas) -> {
+ FileStoreTable table = createFileStoreTable(schemas);
+
+ /**
+                     * Filter field "g", now updated from bigint to float in SCHEMA_FIELDS, on the
+                     * range [200, 500]; this will match another file with one row as follows:
+ *
+ * <ul>
+ * <li>2,"400","401",402D,403,toDecimal(404),405F,406D,toDecimal(407),408,409,toBytes("410")
+ * </ul>
+ *
+ * <p>Then we can check the results of the two result files.
+ */
+ DataTableScan.DataFilePlan plan =
+ table.newScan()
+ .withFilter(
+ new PredicateBuilder(table.schema().logicalRowType())
+ .between(6, 200F, 500F))
+ .plan();
+ checkFilterRowCount(plan, 3L);
+
+ List<String> filesName =
+ files.stream().map(DataFileMeta::fileName).collect(Collectors.toList());
+ assertThat(filesName.size()).isGreaterThan(0);
+
+ List<DataFileMeta> fileMetaList =
+ plan.splits.stream()
+ .flatMap(s -> s.files().stream())
+ .collect(Collectors.toList());
+ assertThat(
+ fileMetaList.stream()
+ .map(DataFileMeta::fileName)
+ .collect(Collectors.toList()))
+ .containsAll(filesName);
+
+ validateValuesWithNewSchema(filesName, fileMetaList);
+ },
+ getPrimaryKeyNames(),
+ tableConfig,
+ this::createFileStoreTable);
+ }
+
+ @Test
+ public void testTableScanFilterPrimaryKeyFields() throws Exception {
+ writeAndCheckFileResultForColumnType(
+ schemas -> {
+ FileStoreTable table = createFileStoreTable(schemas);
+ // results of field "e" in [200, 500] in SCHEMA_FIELDS which is bigint
+ Predicate predicate =
+ new PredicateBuilder(table.schema().logicalRowType())
+ .between(4, (short) 200, (short) 500);
+ DataTableScan.DataFilePlan plan = table.newScan().withFilter(predicate).plan();
+ checkFilterRowCount(plan, 2L);
+ return plan.splits.stream()
+ .flatMap(s -> s.files().stream())
+ .collect(Collectors.toList());
+ },
+ (files, schemas) -> {
+ FileStoreTable table = createFileStoreTable(schemas);
+ // results of field "e" in [200, 500] in SCHEMA_FIELDS which is updated from
+ // bigint to int
+ DataTableScan.DataFilePlan plan =
+ table.newScan()
+ .withFilter(
+ new PredicateBuilder(table.schema().logicalRowType())
+ .between(4, 200, 500))
+ .plan();
+ checkFilterRowCount(plan, 3L);
+
+ List<String> filesName =
+ files.stream().map(DataFileMeta::fileName).collect(Collectors.toList());
+ assertThat(filesName.size()).isGreaterThan(0);
+
+ List<DataFileMeta> fileMetaList =
+ plan.splits.stream()
+ .flatMap(s -> s.files().stream())
+ .collect(Collectors.toList());
+ assertThat(
+ fileMetaList.stream()
+ .map(DataFileMeta::fileName)
+ .collect(Collectors.toList()))
+ .containsAll(filesName);
+
+ // Compare all columns with table column type
+ validateValuesWithNewSchema(filesName, fileMetaList);
+ },
+ getPrimaryKeyNames(),
+ tableConfig,
+ this::createFileStoreTable);
+ }
+
+ protected void validateValuesWithNewSchema(
+ List<String> filesName, List<DataFileMeta> fileMetaList) {
+ for (DataFileMeta fileMeta : fileMetaList) {
+ FieldStats[] statsArray = getTableValueStats(fileMeta).fields(null);
+ assertThat(statsArray.length).isEqualTo(12);
+ if (filesName.contains(fileMeta.fileName())) {
+ checkTwoValues(statsArray);
+ } else {
+ checkOneValue(statsArray);
+ }
+ }
+ }
+
+ /**
+     * Check the stats of the file that contains a single row.
+ *
+ * <ul>
+ * <li>data:
+ * 2,"400","401",402D,403,toDecimal(404),405F,406D,toDecimal(407),408,409,toBytes("410")
+ * <li>types: a->int, b->varchar[10], c->varchar[10], d->double, e->int, f->decimal,g->float,
+ * h->double, i->decimal, j->date, k->date, l->varbinary
+ * </ul>
+ *
+ * @param statsArray the field stats
+ */
+ private void checkOneValue(FieldStats[] statsArray) {
+ assertThat(statsArray[0].minValue()).isEqualTo((statsArray[0].maxValue())).isEqualTo(2);
+ assertThat(statsArray[1].minValue())
+ .isEqualTo(statsArray[1].maxValue())
+ .isEqualTo(StringData.fromString("400"));
+ assertThat(statsArray[2].minValue())
+ .isEqualTo(statsArray[2].maxValue())
+ .isEqualTo(StringData.fromString("401"));
+ assertThat((Double) statsArray[3].minValue())
+ .isEqualTo((Double) statsArray[3].maxValue())
+ .isEqualTo(402D);
+ assertThat((Integer) statsArray[4].minValue())
+ .isEqualTo(statsArray[4].maxValue())
+ .isEqualTo(403);
+ assertThat(((DecimalData) statsArray[5].minValue()).toBigDecimal().intValue())
+ .isEqualTo(((DecimalData) statsArray[5].maxValue()).toBigDecimal().intValue())
+ .isEqualTo(404);
+ assertThat((Float) statsArray[6].minValue())
+ .isEqualTo(statsArray[6].maxValue())
+ .isEqualTo(405F);
+ assertThat((Double) statsArray[7].minValue())
+ .isEqualTo(statsArray[7].maxValue())
+ .isEqualTo(406D);
+ assertThat(((DecimalData) statsArray[8].minValue()).toBigDecimal().doubleValue())
+ .isEqualTo(((DecimalData) statsArray[8].maxValue()).toBigDecimal().doubleValue())
+ .isEqualTo(407D);
+ assertThat(statsArray[9].minValue()).isEqualTo(statsArray[9].maxValue()).isEqualTo(408);
+ assertThat(statsArray[10].minValue()).isEqualTo(statsArray[10].maxValue()).isEqualTo(409);
+
+        // Min and max values of the binary type are null
+ assertThat(statsArray[11].minValue()).isNull();
+ assertThat(statsArray[11].maxValue()).isNull();
+ }
+
+ /**
+     * Check the stats of the file with two rows, read with the new column types.
+ *
+ * <ul>
+ * <li>data1: 2,"200","201",toDecimal(202),(short)203,204,205L,206F,207D,208,toTimestamp(209 *
+ * millsPerDay),toBytes("210")
+ * <li>data2: 2,"300","301",toDecimal(302),(short)303,304,305L,306F,307D,308,toTimestamp(309 *
+ * millsPerDay),toBytes("310")
+ * <li>old types: a->int, b->char[10], c->varchar[10], d->decimal, e->smallint, f->int,
+ * g->bigint, h->float, i->double, j->date, k->timestamp, l->binary
+ * <li>new types: a->int, b->varchar[10], c->varchar[10], d->double, e->int,
+ * f->decimal,g->float, h->double, i->decimal, j->date, k->date, l->varbinary
+ * </ul>
+ */
+ private void checkTwoValues(FieldStats[] statsArray) {
+ assertThat(statsArray[0].minValue()).isEqualTo(2);
+ assertThat(statsArray[0].maxValue()).isEqualTo(2);
+
+ assertThat(statsArray[1].minValue()).isEqualTo(StringData.fromString("200 "));
+ assertThat(statsArray[1].maxValue()).isEqualTo(StringData.fromString("300 "));
+
+ assertThat(statsArray[2].minValue()).isEqualTo(StringData.fromString("201"));
+ assertThat(statsArray[2].maxValue()).isEqualTo(StringData.fromString("301"));
+
+ assertThat((Double) statsArray[3].minValue()).isEqualTo(202D);
+ assertThat((Double) statsArray[3].maxValue()).isEqualTo(302D);
+
+ assertThat((Integer) statsArray[4].minValue()).isEqualTo(203);
+ assertThat((Integer) statsArray[4].maxValue()).isEqualTo(303);
+
+ assertThat(((DecimalData) statsArray[5].minValue()).toBigDecimal().intValue())
+ .isEqualTo(204);
+ assertThat(((DecimalData) statsArray[5].maxValue()).toBigDecimal().intValue())
+ .isEqualTo(304);
+
+ assertThat((Float) statsArray[6].minValue()).isEqualTo(205F);
+ assertThat((Float) statsArray[6].maxValue()).isEqualTo(305F);
+
+ assertThat((Double) statsArray[7].minValue()).isEqualTo(206D);
+ assertThat((Double) statsArray[7].maxValue()).isEqualTo(306D);
+
+ assertThat(((DecimalData) statsArray[8].minValue()).toBigDecimal().doubleValue())
+ .isEqualTo(207D);
+ assertThat(((DecimalData) statsArray[8].maxValue()).toBigDecimal().doubleValue())
+ .isEqualTo(307D);
+
+ assertThat(statsArray[9].minValue()).isEqualTo(208);
+ assertThat(statsArray[9].maxValue()).isEqualTo(308);
+ assertThat(statsArray[10].minValue()).isEqualTo(209);
+ assertThat(statsArray[10].maxValue()).isEqualTo(309);
+
+        // Min and max values of the binary type are null
+ assertThat(statsArray[11].minValue()).isNull();
+ assertThat(statsArray[11].maxValue()).isNull();
+ }
+
+ @Override
+ protected List<String> getPrimaryKeyNames() {
+ return SCHEMA_PRIMARY_KEYS;
+ }
+
+ protected abstract BinaryTableStats getTableValueStats(DataFileMeta fileMeta);
+}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileMetaFilterTestBase.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileMetaFilterTestBase.java
index b2f7215d..4a839e9e 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileMetaFilterTestBase.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileMetaFilterTestBase.java
@@ -315,17 +315,4 @@ public abstract class FileMetaFilterTestBase extends SchemaEvolutionTableTestBas
table.newScan().withFilter(builder.equal(index, value)).plan();
checkFilterRowCount(plan, expectedRowCount);
}
-
- protected static void checkFilterRowCount(
- DataTableScan.DataFilePlan plan, long expectedRowCount) {
- List<DataFileMeta> fileMetaList =
- plan.splits.stream().flatMap(s -> s.files().stream()).collect(Collectors.toList());
- checkFilterRowCount(fileMetaList, expectedRowCount);
- }
-
- protected static void checkFilterRowCount(
- List<DataFileMeta> fileMetaList, long expectedRowCount) {
- assertThat(fileMetaList.stream().mapToLong(DataFileMeta::rowCount).sum())
- .isEqualTo(expectedRowCount);
- }
}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/SchemaEvolutionTableTestBase.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/SchemaEvolutionTableTestBase.java
index 145b0846..ff3bdf0a 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/SchemaEvolutionTableTestBase.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/SchemaEvolutionTableTestBase.java
@@ -22,10 +22,13 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.file.io.DataFileMeta;
import org.apache.flink.table.store.file.schema.AtomicDataType;
import org.apache.flink.table.store.file.schema.DataField;
import org.apache.flink.table.store.file.schema.SchemaChange;
@@ -36,12 +39,14 @@ import org.apache.flink.table.store.file.utils.TestAtomicRenameFileSystem;
import org.apache.flink.table.store.file.utils.TraceableFileSystem;
import org.apache.flink.table.store.table.sink.TableCommit;
import org.apache.flink.table.store.table.sink.TableWrite;
+import org.apache.flink.table.store.table.source.DataTableScan;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.io.TempDir;
import java.io.IOException;
+import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -52,6 +57,7 @@ import java.util.Optional;
import java.util.UUID;
import java.util.function.BiConsumer;
import java.util.function.Function;
+import java.util.stream.Collectors;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.assertj.core.api.Assertions.assertThat;
@@ -77,6 +83,29 @@ public abstract class SchemaEvolutionTableTestBase {
protected static final List<String> PARTITION_NAMES = Collections.singletonList("pt");
protected static final List<String> PRIMARY_KEY_NAMES = Arrays.asList("pt", "kt");
+ protected static final List<DataField> SCHEMA_FIELDS =
+ Arrays.asList(
+ new DataField(0, "a", new AtomicDataType(DataTypes.INT().getLogicalType())),
+ new DataField(1, "b", new AtomicDataType(DataTypes.CHAR(10).getLogicalType())),
+ new DataField(
+ 2, "c", new AtomicDataType(DataTypes.VARCHAR(10).getLogicalType())),
+ new DataField(
+ 3, "d", new AtomicDataType(DataTypes.DECIMAL(10, 2).getLogicalType())),
+ new DataField(
+ 4, "e", new AtomicDataType(DataTypes.SMALLINT().getLogicalType())),
+ new DataField(5, "f", new AtomicDataType(DataTypes.INT().getLogicalType())),
+ new DataField(6, "g", new AtomicDataType(DataTypes.BIGINT().getLogicalType())),
+ new DataField(7, "h", new AtomicDataType(DataTypes.FLOAT().getLogicalType())),
+ new DataField(8, "i", new AtomicDataType(DataTypes.DOUBLE().getLogicalType())),
+ new DataField(9, "j", new AtomicDataType(DataTypes.DATE().getLogicalType())),
+ new DataField(
+ 10, "k", new AtomicDataType(DataTypes.TIMESTAMP(2).getLogicalType())),
+ new DataField(
+ 11, "l", new AtomicDataType(DataTypes.BINARY(100).getLogicalType())));
+ protected static final List<String> SCHEMA_PARTITION_NAMES = Collections.singletonList("a");
+ protected static final List<String> SCHEMA_PRIMARY_KEYS =
+ Arrays.asList("a", "b", "c", "d", "e");
+
protected Path tablePath;
protected String commitUser;
protected final Configuration tableConfig = new Configuration();
@@ -185,6 +214,215 @@ public abstract class SchemaEvolutionTableTestBase {
secondChecker.accept(result, tableSchemas);
}
+ public static <R> void writeAndCheckFileResultForColumnType(
+ Function<Map<Long, TableSchema>, R> firstChecker,
+ BiConsumer<R, Map<Long, TableSchema>> secondChecker,
+ List<String> primaryKeyNames,
+ Configuration tableConfig,
+ Function<Map<Long, TableSchema>, FileStoreTable> createFileStoreTable)
+ throws Exception {
+ Map<Long, TableSchema> tableSchemas = new HashMap<>();
+ // Create schema with SCHEMA_FIELDS
+ tableSchemas.put(
+ 0L,
+ new TableSchema(
+ 0,
+ SCHEMA_FIELDS,
+ 12,
+ SCHEMA_PARTITION_NAMES,
+ primaryKeyNames,
+ tableConfig.toMap(),
+ ""));
+ FileStoreTable table = createFileStoreTable.apply(tableSchemas);
+ TableWrite write = table.newWrite("user");
+ TableCommit commit = table.newCommit("user");
+
+ /**
+ * Generate two files:
+ *
+ * <ul>
+         *   <li>file1 with one row: 1,"100","101",toDecimal(102),(short)
+         *       103,104,105L,106F,107D,108,toTimestamp(109 * millsPerDay),"110".getBytes()
+         *   <li>file2 with two rows:
+ * <ul>
+ * <li>2,"200","201",toDecimal(202),(short)
+ * 203,204,205L,206F,207D,208,toTimestamp(209 * millsPerDay),toBytes("210")
+ * <li>2,"300","301",toDecimal(302),(short)
+ * 303,304,305L,306F,307D,308,toTimestamp(309 * millsPerDay),toBytes("310")
+ * </ul>
+ * </ul>
+ */
+ final long millsPerDay = 86400000;
+ write.write(
+ rowData(
+ 1,
+ "100",
+ "101",
+ toDecimal(102),
+ (short) 103,
+ 104,
+ 105L,
+ 106F,
+ 107D,
+ 108,
+ toTimestamp(109 * millsPerDay),
+ "110".getBytes()));
+ write.write(
+ rowData(
+ 2,
+ "200",
+ "201",
+ toDecimal(202),
+ (short) 203,
+ 204,
+ 205L,
+ 206F,
+ 207D,
+ 208,
+ toTimestamp(209 * millsPerDay),
+ toBytes("210")));
+ write.write(
+ rowData(
+ 2,
+ "300",
+ "301",
+ toDecimal(302),
+ (short) 303,
+ 304,
+ 305L,
+ 306F,
+ 307D,
+ 308,
+ toTimestamp(309 * millsPerDay),
+ toBytes("310")));
+ commit.commit(0, write.prepareCommit(true, 0));
+ write.close();
+ R result = firstChecker.apply(tableSchemas);
+
+ /**
+ * Fields before and after column type evolution:
+ *
+ * <ul>
+         *   <li>Before: a->int, b->char[10], c->varchar[10], d->decimal, e->smallint, f->int,
+         *       g->bigint, h->float, i->double, j->date, k->timestamp, l->binary
+         *   <li>After: a->int, b->varchar[10], c->varchar[10], d->double, e->int, f->decimal,
+         *       g->float, h->double, i->decimal, j->date, k->date, l->varbinary
+ * </ul>
+ */
+ List<DataField> evolutionFields = new ArrayList<>(SCHEMA_FIELDS);
+ evolutionFields.set(
+ 1,
+ new DataField(1, "b", new AtomicDataType(DataTypes.VARCHAR(10).getLogicalType())));
+ evolutionFields.set(
+ 3, new DataField(3, "d", new AtomicDataType(DataTypes.DOUBLE().getLogicalType())));
+ evolutionFields.set(
+ 4, new DataField(4, "e", new AtomicDataType(DataTypes.INT().getLogicalType())));
+ evolutionFields.set(
+ 5,
+ new DataField(
+ 5, "f", new AtomicDataType(DataTypes.DECIMAL(10, 2).getLogicalType())));
+ evolutionFields.set(
+ 6, new DataField(6, "g", new AtomicDataType(DataTypes.FLOAT().getLogicalType())));
+ evolutionFields.set(
+ 7, new DataField(7, "h", new AtomicDataType(DataTypes.DOUBLE().getLogicalType())));
+ evolutionFields.set(
+ 8,
+ new DataField(
+ 8, "i", new AtomicDataType(DataTypes.DECIMAL(10, 2).getLogicalType())));
+ evolutionFields.set(
+ 10, new DataField(10, "k", new AtomicDataType(DataTypes.DATE().getLogicalType())));
+ evolutionFields.set(
+ 11,
+ new DataField(
+ 11, "l", new AtomicDataType(DataTypes.VARBINARY(100).getLogicalType())));
+ tableSchemas.put(
+ 1L,
+ new TableSchema(
+ 1,
+ evolutionFields,
+ 12,
+ SCHEMA_PARTITION_NAMES,
+ primaryKeyNames,
+ tableConfig.toMap(),
+ ""));
+ table = createFileStoreTable.apply(tableSchemas);
+ write = table.newWrite("user");
+ commit = table.newCommit("user");
+
+ /**
+ * Generate another two files:
+ *
+ * <ul>
+         *   <li>file1 with one row:
+         *       2,"400","401",402D,403,toDecimal(404),405F,406D,toDecimal(407),408,409,toBytes("410")
+         *   <li>file2 with two rows:
+ * <ul>
+ * <li>1,"500","501",502D,503,toDecimal(504),505F,506D,toDecimal(507),508,509,toBytes("510")
+ * <li>1,"600","601",602D,603,toDecimal(604),605F,606D,toDecimal(607),608,609,toBytes("610")
+ * </ul>
+ * </ul>
+ */
+ write.write(
+ rowData(
+ 2,
+ "400",
+ "401",
+ 402D,
+ 403,
+ toDecimal(404),
+ 405F,
+ 406D,
+ toDecimal(407),
+ 408,
+ 409,
+ toBytes("410")));
+ write.write(
+ rowData(
+ 1,
+ "500",
+ "501",
+ 502D,
+ 503,
+ toDecimal(504),
+ 505F,
+ 506D,
+ toDecimal(507),
+ 508,
+ 509,
+ toBytes("510")));
+ write.write(
+ rowData(
+ 1,
+ "600",
+ "601",
+ 602D,
+ 603,
+ toDecimal(604),
+ 605F,
+ 606D,
+ toDecimal(607),
+ 608,
+ 609,
+ toBytes("610")));
+ commit.commit(1, write.prepareCommit(true, 1));
+ write.close();
+
+ secondChecker.accept(result, tableSchemas);
+ }
+
+ private static DecimalData toDecimal(int val) {
+ return DecimalData.fromBigDecimal(new BigDecimal(val), 10, 2);
+ }
+
+ private static TimestampData toTimestamp(long mills) {
+ return TimestampData.fromEpochMillis(mills);
+ }
+
+ private static byte[] toBytes(String val) {
+ return val.getBytes();
+ }
+
protected static RowData rowData(Object... values) {
List<Object> valueList = new ArrayList<>(values.length);
for (Object value : values) {
@@ -197,6 +435,19 @@ public abstract class SchemaEvolutionTableTestBase {
return GenericRowData.of(valueList.toArray(new Object[0]));
}
+ protected static void checkFilterRowCount(
+ DataTableScan.DataFilePlan plan, long expectedRowCount) {
+ List<DataFileMeta> fileMetaList =
+ plan.splits.stream().flatMap(s -> s.files().stream()).collect(Collectors.toList());
+ checkFilterRowCount(fileMetaList, expectedRowCount);
+ }
+
+ protected static void checkFilterRowCount(
+ List<DataFileMeta> fileMetaList, long expectedRowCount) {
+ assertThat(fileMetaList.stream().mapToLong(DataFileMeta::rowCount).sum())
+ .isEqualTo(expectedRowCount);
+ }
+
/** {@link SchemaManager} subclass for testing. */
public static class TestingSchemaManager extends SchemaManager {
private final Map<Long, TableSchema> tableSchemas;