Posted to commits@flink.apache.org by lz...@apache.org on 2023/01/05 10:01:04 UTC

[flink-table-store] branch master updated: [FLINK-27844] Support column type evolution in file meta

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new d031fd8e [FLINK-27844] Support column type evolution in file meta
d031fd8e is described below

commit d031fd8e977ac754d78d256dd276b6dae595e723
Author: shammon <zj...@gmail.com>
AuthorDate: Thu Jan 5 18:00:58 2023 +0800

    [FLINK-27844] Support column type evolution in file meta
    
    This closes #423
---
 .../table/store/file/casting/CastExecutors.java    |  31 +-
 .../file/operation/AppendOnlyFileStoreScan.java    |  17 +-
 .../file/operation/KeyValueFileStoreScan.java      |  18 +-
 .../flink/table/store/file/schema/DataType.java    |  13 +
 .../store/file/schema/SchemaEvolutionUtil.java     |  56 ++++
 .../table/store/file/schema/SchemaManager.java     |  14 +-
 .../file/stats/FieldStatsArraySerializer.java      |  22 +-
 .../table/store/file/casting/CastExecutorTest.java |  12 +
 .../file/stats/FieldStatsArraySerializerTest.java  |  11 +-
 .../AppendOnlyTableColumnTypeFileMetaTest.java     |  56 ++++
 .../ChangelogValueCountColumnTypeFileMetaTest.java |  63 ++++
 ...hangelogWithKeyTableColumnTypeFileMetaTest.java | 154 ++++++++++
 .../store/table/ColumnTypeFileMetaTestBase.java    | 341 +++++++++++++++++++++
 .../table/store/table/FileMetaFilterTestBase.java  |  13 -
 .../store/table/SchemaEvolutionTableTestBase.java  | 251 +++++++++++++++
 15 files changed, 1036 insertions(+), 36 deletions(-)

diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/casting/CastExecutors.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/casting/CastExecutors.java
index d46c68fc..28e41b1b 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/casting/CastExecutors.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/casting/CastExecutors.java
@@ -45,6 +45,7 @@ import static org.apache.flink.table.types.logical.LogicalTypeRoot.VARCHAR;
 
 /** Cast executors for input type and output type. */
 public class CastExecutors {
+    private static final CastExecutor<?, ?> IDENTITY_CAST_EXECUTOR = value -> value;
 
     /**
      * Resolve a {@link CastExecutor} for the provided input type and target type. Returns null if
@@ -176,12 +177,28 @@ public class CastExecutors {
                 }
                 return null;
             case BINARY:
-                if (outputType.getTypeRoot() == BINARY) {
+                if (outputType.getTypeRoot() == BINARY || outputType.getTypeRoot() == VARBINARY) {
+                    boolean targetBinaryType = outputType.getTypeRoot() == BINARY;
                     final int targetLength = getBinaryLength(outputType);
-                    return value ->
-                            ((((byte[]) value).length == targetLength)
-                                    ? value
-                                    : Arrays.copyOf((byte[]) value, targetLength));
+                    return value -> {
+                        byte[] bytes = (byte[]) value;
+                        if (((byte[]) value).length == targetLength) {
+                            return value;
+                        }
+                        if (targetBinaryType) {
+                            if (bytes.length == targetLength) {
+                                return bytes;
+                            } else {
+                                return Arrays.copyOf(bytes, targetLength);
+                            }
+                        } else {
+                            if (bytes.length <= targetLength) {
+                                return bytes;
+                            } else {
+                                return Arrays.copyOf(bytes, targetLength);
+                            }
+                        }
+                    };
                 }
                 return null;
             case TIMESTAMP_WITHOUT_TIME_ZONE:
@@ -225,6 +242,10 @@ public class CastExecutors {
         }
     }
 
+    public static CastExecutor<?, ?> identityCastExecutor() {
+        return IDENTITY_CAST_EXECUTOR;
+    }
+
     private static int getStringLength(LogicalType logicalType) {
         if (logicalType instanceof CharType) {
             return ((CharType) logicalType).getLength();
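
As an illustration of the BINARY/VARBINARY branch added above: a fixed-length BINARY target is padded with zero bytes or truncated to its length, while a VARBINARY target only truncates values that are too long. The standalone sketch below mirrors that behavior and the new CastExecutorTest cases; the class and helper names are invented for this example and are not part of CastExecutors.

import java.util.Arrays;

public class BinaryCastSketch {

    // Mirrors the new branch: BINARY pads/truncates to the target length,
    // VARBINARY keeps shorter values unchanged and only truncates longer ones.
    static byte[] adjust(byte[] bytes, int targetLength, boolean targetIsBinary) {
        if (bytes.length == targetLength) {
            return bytes;
        }
        if (targetIsBinary) {
            return Arrays.copyOf(bytes, targetLength); // pad with zeros or truncate
        }
        return bytes.length <= targetLength ? bytes : Arrays.copyOf(bytes, targetLength);
    }

    public static void main(String[] args) {
        // BINARY(10) -> VARBINARY(5): "1234567890" is truncated to "12345".
        System.out.println(new String(adjust("1234567890".getBytes(), 5, false)));
        // BINARY(10) -> VARBINARY(20): "12345678" is shorter than 20 and kept as-is.
        System.out.println(new String(adjust("12345678".getBytes(), 20, false)));
        // BINARY(8) -> BINARY(20): padded with zero bytes to length 20.
        System.out.println(adjust("12345678".getBytes(), 20, true).length);
    }
}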
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreScan.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreScan.java
index ac65b0ab..a63363fb 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreScan.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreScan.java
@@ -18,10 +18,12 @@
 
 package org.apache.flink.table.store.file.operation;
 
+import org.apache.flink.table.store.file.casting.CastExecutor;
 import org.apache.flink.table.store.file.manifest.ManifestEntry;
 import org.apache.flink.table.store.file.manifest.ManifestFile;
 import org.apache.flink.table.store.file.manifest.ManifestList;
 import org.apache.flink.table.store.file.predicate.Predicate;
+import org.apache.flink.table.store.file.schema.DataField;
 import org.apache.flink.table.store.file.schema.SchemaEvolutionUtil;
 import org.apache.flink.table.store.file.schema.SchemaManager;
 import org.apache.flink.table.store.file.schema.TableSchema;
@@ -103,13 +105,22 @@ public class AppendOnlyFileStoreScan extends AbstractFileStoreScan {
                 schemaId,
                 id -> {
                     TableSchema tableSchema = scanTableSchema();
+                    List<DataField> tableFields = tableSchema.fields();
                     TableSchema schema = scanTableSchema(id);
-                    return new FieldStatsArraySerializer(
-                            schema.logicalRowType(),
+                    List<DataField> dataFields = schema.fields();
+                    int[] indexMapping =
                             tableSchema.id() == id
                                     ? null
                                     : SchemaEvolutionUtil.createIndexMapping(
-                                            tableSchema.fields(), schema.fields()));
+                                            tableFields, dataFields);
+                    CastExecutor<Object, Object>[] converterMapping =
+                            tableSchema.id() == id
+                                    ? null
+                                    : (CastExecutor<Object, Object>[])
+                                            SchemaEvolutionUtil.createConvertMapping(
+                                                    tableFields, dataFields, indexMapping);
+                    return new FieldStatsArraySerializer(
+                            schema.logicalRowType(), indexMapping, converterMapping);
                 });
     }
 }
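
Both scan classes build the mappings only when the data file was written under a different schema id. Judging from the SchemaEvolutionUtil javadoc further below, the index mapping holds, for each table field, the position of the data field with the same field id, or -1 when no data field with that id exists. A standalone sketch of that idea follows; IndexMappingSketch and its matching-by-id logic are illustrative, not the real createIndexMapping.

import java.util.Arrays;
import java.util.List;

public class IndexMappingSketch {

    // For each table field id, the position of the data field with the same id,
    // or -1 if the data file has no field with that id.
    static int[] indexMapping(List<Integer> tableFieldIds, List<Integer> dataFieldIds) {
        int[] mapping = new int[tableFieldIds.size()];
        for (int i = 0; i < mapping.length; i++) {
            mapping[i] = dataFieldIds.indexOf(tableFieldIds.get(i));
        }
        return mapping;
    }

    public static void main(String[] args) {
        // Table field ids [1, 6, 3] against data field ids [1, 3]
        // (the example used in the SchemaEvolutionUtil javadoc) -> [0, -1, 1].
        System.out.println(Arrays.toString(
                indexMapping(Arrays.asList(1, 6, 3), Arrays.asList(1, 3))));
    }
}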
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreScan.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreScan.java
index 25391021..ee828dda 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreScan.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreScan.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.store.file.operation;
 
+import org.apache.flink.table.store.file.casting.CastExecutor;
 import org.apache.flink.table.store.file.manifest.ManifestEntry;
 import org.apache.flink.table.store.file.manifest.ManifestFile;
 import org.apache.flink.table.store.file.manifest.ManifestList;
@@ -110,14 +111,23 @@ public class KeyValueFileStoreScan extends AbstractFileStoreScan {
                 key -> {
                     final TableSchema tableSchema = scanTableSchema();
                     final TableSchema schema = scanTableSchema(key);
+                    final List<DataField> tableKeyFields =
+                            keyValueFieldsExtractor.keyFields(tableSchema);
                     final List<DataField> keyFields = keyValueFieldsExtractor.keyFields(schema);
-                    return new FieldStatsArraySerializer(
-                            RowDataType.toRowType(false, keyFields),
+                    final int[] indexMapping =
                             tableSchema.id() == key
                                     ? null
                                     : SchemaEvolutionUtil.createIndexMapping(
-                                            keyValueFieldsExtractor.keyFields(tableSchema),
-                                            keyFields));
+                                            tableKeyFields, keyFields);
+                    final CastExecutor<?, ?>[] converterMapping =
+                            tableSchema.id() == key
+                                    ? null
+                                    : SchemaEvolutionUtil.createConvertMapping(
+                                            tableKeyFields, keyFields, indexMapping);
+                    return new FieldStatsArraySerializer(
+                            RowDataType.toRowType(false, keyFields),
+                            indexMapping,
+                            (CastExecutor<Object, Object>[]) converterMapping);
                 });
     }
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/DataType.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/DataType.java
index 83bfff0b..e3a6f6e4 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/DataType.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/DataType.java
@@ -21,6 +21,8 @@ package org.apache.flink.table.store.file.schema;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.util.Preconditions;
 
+import javax.annotation.Nonnull;
+
 import java.io.Serializable;
 import java.util.Objects;
 
@@ -53,6 +55,17 @@ public abstract class DataType implements Serializable {
      */
     public abstract DataType copy(boolean isNullable);
 
+    /**
+     * Compare two data types, ignoring nullability.
+     *
+     * @param o the target data type
+     */
+    public boolean equalsIgnoreNullable(@Nonnull DataType o) {
+        LogicalType sourceType = logicalType.copy(true);
+        LogicalType targetType = o.logicalType.copy(true);
+        return Objects.equals(sourceType, targetType);
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
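
The new equalsIgnoreNullable compares the two wrapped logical types after copying both to nullable, so only the nullability flag is ignored. A minimal example of the Flink LogicalType behavior it relies on (the class name is just for illustration):

import org.apache.flink.table.types.logical.IntType;

public class EqualsIgnoreNullableExample {
    public static void main(String[] args) {
        IntType notNull = new IntType(false); // INT NOT NULL
        IntType nullable = new IntType(true); // INT
        // Plain equals() is false because nullability differs ...
        System.out.println(notNull.equals(nullable)); // false
        // ... but after copy(true) both sides compare equal, which is exactly
        // what equalsIgnoreNullable does for the wrapped logical types.
        System.out.println(notNull.copy(true).equals(nullable.copy(true))); // true
    }
}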
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaEvolutionUtil.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaEvolutionUtil.java
index 820ce6d4..ccb6e9b4 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaEvolutionUtil.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaEvolutionUtil.java
@@ -20,6 +20,8 @@ package org.apache.flink.table.store.file.schema;
 
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.table.store.file.casting.CastExecutor;
+import org.apache.flink.table.store.file.casting.CastExecutors;
 import org.apache.flink.table.store.file.predicate.LeafPredicate;
 import org.apache.flink.table.store.file.predicate.Predicate;
 import org.apache.flink.table.store.file.predicate.PredicateReplaceVisitor;
@@ -37,6 +39,7 @@ import java.util.Optional;
 import java.util.stream.Collectors;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
 
 /** Utils for schema evolution. */
 public class SchemaEvolutionUtil {
@@ -304,4 +307,57 @@ public class SchemaEvolutionUtil {
         throw new IllegalArgumentException(
                 String.format("Can't find data field %s", dataField.name()));
     }
+
+    /**
+     * Create converter mapping from table fields to underlying data fields. For example, the table
+     * and data fields are as follows
+     *
+     * <ul>
+     *   <li>table fields: 1->c INT, 6->b STRING, 3->a BIGINT
+     *   <li>data fields: 1->a BIGINT, 3->c DOUBLE
+     * </ul>
+     *
+     * <p>Through the index mapping [0, -1, 1] we can get the data field types (1->a BIGINT) and
+     * (3->c DOUBLE) for the table fields (1->c INT) and (3->a BIGINT), then compare the data types
+     * and create the converter mapping.
+     *
+     * <p>TODO: support nested index mapping when nested schema evolution is supported.
+     *
+     * @param tableFields the fields of the table
+     * @param dataFields the fields of the underlying data
+     * @param indexMapping the index mapping from table fields to data fields
+     * @return the converter mapping, or null if no field needs a cast
+     */
+    @Nullable
+    public static CastExecutor<?, ?>[] createConvertMapping(
+            List<DataField> tableFields, List<DataField> dataFields, int[] indexMapping) {
+        CastExecutor<?, ?>[] converterMapping = new CastExecutor<?, ?>[tableFields.size()];
+        boolean castExist = false;
+        for (int i = 0; i < tableFields.size(); i++) {
+            int dataIndex = indexMapping == null ? i : indexMapping[i];
+            if (dataIndex < 0) {
+                converterMapping[i] = CastExecutors.identityCastExecutor();
+            } else {
+                DataField tableField = tableFields.get(i);
+                DataField dataField = dataFields.get(dataIndex);
+                if (dataField.type().equalsIgnoreNullable(tableField.type())) {
+                    converterMapping[i] = CastExecutors.identityCastExecutor();
+                } else {
+                    // TODO support column type evolution in nested type
+                    checkState(
+                            tableField.type() instanceof AtomicDataType
+                                    && dataField.type() instanceof AtomicDataType,
+                            "Column type evolution is only supported for atomic data types.");
+                    converterMapping[i] =
+                            checkNotNull(
+                                    CastExecutors.resolve(
+                                            dataField.type().logicalType(),
+                                            tableField.type().logicalType()));
+                    castExist = true;
+                }
+            }
+        }
+
+        return castExist ? converterMapping : null;
+    }
 }
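
A usage sketch for the two utilities, in the spirit of the new FieldStatsArraySerializerTest further below. The field names and the SMALLINT->INT / FLOAT->DOUBLE evolutions are chosen for illustration (they match evolutions the new tests exercise) and are not taken from the patch itself.

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.store.file.casting.CastExecutor;
import org.apache.flink.table.store.file.schema.AtomicDataType;
import org.apache.flink.table.store.file.schema.DataField;
import org.apache.flink.table.store.file.schema.SchemaEvolutionUtil;

import java.util.Arrays;
import java.util.List;

public class ConvertMappingExample {
    public static void main(String[] args) {
        // Current table schema: 1->a INT, 6->b STRING, 3->c DOUBLE.
        List<DataField> tableFields =
                Arrays.asList(
                        new DataField(1, "a", new AtomicDataType(DataTypes.INT().getLogicalType())),
                        new DataField(6, "b", new AtomicDataType(DataTypes.STRING().getLogicalType())),
                        new DataField(3, "c", new AtomicDataType(DataTypes.DOUBLE().getLogicalType())));
        // Schema the data files were written with: 1->a SMALLINT, 3->c FLOAT.
        List<DataField> dataFields =
                Arrays.asList(
                        new DataField(1, "a", new AtomicDataType(DataTypes.SMALLINT().getLogicalType())),
                        new DataField(3, "c", new AtomicDataType(DataTypes.FLOAT().getLogicalType())));

        int[] indexMapping = SchemaEvolutionUtil.createIndexMapping(tableFields, dataFields);
        System.out.println(Arrays.toString(indexMapping)); // [0, -1, 1]

        CastExecutor<?, ?>[] converters =
                SchemaEvolutionUtil.createConvertMapping(tableFields, dataFields, indexMapping);
        // converters[0] casts SMALLINT stats to INT, converters[1] is the identity executor
        // for the field missing in the data files, converters[2] casts FLOAT stats to DOUBLE.
        System.out.println(converters.length); // 3
    }
}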
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java
index 9946e703..750c21fb 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.file.casting.CastExecutors;
 import org.apache.flink.table.store.file.operation.Lock;
 import org.apache.flink.table.store.file.schema.SchemaChange.AddColumn;
 import org.apache.flink.table.store.file.schema.SchemaChange.DropColumn;
@@ -255,13 +256,24 @@ public class SchemaManager implements Serializable {
                     }
                 } else if (change instanceof UpdateColumnType) {
                     UpdateColumnType update = (UpdateColumnType) change;
+                    if (schema.partitionKeys().contains(update.fieldName())) {
+                        throw new IllegalArgumentException(
+                                String.format(
+                                        "Cannot update the type of partition column [%s] in the table [%s].",
+                                        update.fieldName(), tableRoot));
+                    }
                     updateColumn(
                             newFields,
                             update.fieldName(),
                             (field) -> {
                                 checkState(
                                         LogicalTypeCasts.supportsImplicitCast(
-                                                field.type().logicalType, update.newLogicalType()),
+                                                        field.type().logicalType,
+                                                        update.newLogicalType())
+                                                && CastExecutors.resolve(
+                                                                field.type().logicalType,
+                                                                update.newLogicalType())
+                                                        != null,
                                         String.format(
                                                 "Column type %s[%s] cannot be converted to %s without loosing information.",
                                                 field.name(),
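
Two guards are added here: a partition key column can never change its type, and a type update must both be an implicit cast and have a resolvable CastExecutor. A small example of the Flink LogicalTypeCasts check the guard builds on (the class name is invented for this snippet):

import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.utils.LogicalTypeCasts;

public class UpdateColumnTypeCheckExample {
    public static void main(String[] args) {
        // INT -> BIGINT widens, so the implicit-cast half of the check passes.
        System.out.println(LogicalTypeCasts.supportsImplicitCast(new IntType(), new BigIntType())); // true
        // BIGINT -> INT could lose information; supportsImplicitCast returns false and
        // SchemaManager rejects the update with the "without losing information" error.
        System.out.println(LogicalTypeCasts.supportsImplicitCast(new BigIntType(), new IntType())); // false
    }
}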
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FieldStatsArraySerializer.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FieldStatsArraySerializer.java
index 3ce0bcf5..7e02e5bf 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FieldStatsArraySerializer.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FieldStatsArraySerializer.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.store.file.stats;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+import org.apache.flink.table.store.file.casting.CastExecutor;
 import org.apache.flink.table.store.format.FieldStats;
 import org.apache.flink.table.store.utils.RowDataUtils;
 import org.apache.flink.table.types.logical.ArrayType;
@@ -44,12 +45,14 @@ public class FieldStatsArraySerializer {
     private final RowData.FieldGetter[] fieldGetters;
 
     @Nullable private final int[] indexMapping;
+    @Nullable private final CastExecutor<Object, Object>[] converterMapping;
 
     public FieldStatsArraySerializer(RowType type) {
-        this(type, null);
+        this(type, null, null);
     }
 
-    public FieldStatsArraySerializer(RowType type, int[] indexMapping) {
+    public FieldStatsArraySerializer(
+            RowType type, int[] indexMapping, CastExecutor<Object, Object>[] converterMapping) {
         RowType safeType = toAllFieldsNullableRowType(type);
         this.serializer = new RowDataSerializer(safeType);
         this.fieldGetters =
@@ -60,6 +63,7 @@ public class FieldStatsArraySerializer {
                                                 safeType.getTypeAt(i), i))
                         .toArray(RowData.FieldGetter[]::new);
         this.indexMapping = indexMapping;
+        this.converterMapping = converterMapping;
     }
 
     public BinaryTableStats toBinary(FieldStats[] stats) {
@@ -95,11 +99,15 @@ public class FieldStatsArraySerializer {
                 }
                 stats[i] = new FieldStats(null, null, rowCount);
             } else {
-                stats[i] =
-                        new FieldStats(
-                                fieldGetters[fieldIndex].getFieldOrNull(array.min()),
-                                fieldGetters[fieldIndex].getFieldOrNull(array.max()),
-                                array.nullCounts()[fieldIndex]);
+                CastExecutor<Object, Object> converter =
+                        converterMapping == null ? null : converterMapping[i];
+                Object min = fieldGetters[fieldIndex].getFieldOrNull(array.min());
+                min = converter == null || min == null ? min : converter.cast(min);
+
+                Object max = fieldGetters[fieldIndex].getFieldOrNull(array.max());
+                max = converter == null || max == null ? max : converter.cast(max);
+
+                stats[i] = new FieldStats(min, max, array.nullCounts()[fieldIndex]);
             }
         }
         return stats;
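
The fromBinary() change above first locates each table field's stats through the index mapping and then pushes non-null min/max values through the per-field cast. A standalone sketch of that flow with invented names; it stands in for FieldStatsArraySerializer but is not the real class.

import java.util.Arrays;
import java.util.List;
import java.util.function.UnaryOperator;

final class StatsEvolutionSketch {

    // Remap a file's per-field minimums to the current table schema, casting where needed.
    static Object[] evolveMin(
            Object[] fileMin, int[] indexMapping, List<UnaryOperator<Object>> converters) {
        Object[] tableMin = new Object[indexMapping.length];
        for (int i = 0; i < indexMapping.length; i++) {
            int fieldIndex = indexMapping[i];
            if (fieldIndex < 0) {
                tableMin[i] = null; // field has no counterpart in the data file
                continue;
            }
            Object min = fileMin[fieldIndex];
            UnaryOperator<Object> converter = converters == null ? null : converters.get(i);
            tableMin[i] = (converter == null || min == null) ? min : converter.apply(min);
        }
        return tableMin;
    }

    public static void main(String[] args) {
        // File written with (a SMALLINT, c FLOAT); the table is now (a INT, b STRING, c DOUBLE).
        Object[] fileMin = {(short) 3, 1.5f};
        int[] indexMapping = {0, -1, 1};
        List<UnaryOperator<Object>> converters =
                Arrays.asList(
                        v -> ((Short) v).intValue(),    // SMALLINT -> INT
                        null,                           // missing field, nothing to cast
                        v -> ((Float) v).doubleValue()  // FLOAT -> DOUBLE
                );
        System.out.println(Arrays.toString(evolveMin(fileMin, indexMapping, converters)));
        // -> [3, null, 1.5]
    }
}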
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/casting/CastExecutorTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/casting/CastExecutorTest.java
index ea5ffeeb..70d06086 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/casting/CastExecutorTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/casting/CastExecutorTest.java
@@ -223,6 +223,18 @@ public class CastExecutorTest {
                 CastExecutors.resolve(new BinaryType(10), new BinaryType(20)),
                 "12345678".getBytes(),
                 new byte[] {49, 50, 51, 52, 53, 54, 55, 56, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0});
+
+        // binary(10) to varbinary(5)
+        compareCastResult(
+                CastExecutors.resolve(new BinaryType(10), new VarBinaryType(5)),
+                "1234567890".getBytes(),
+                "12345".getBytes());
+
+        // binary(10) to varbinary(20)
+        compareCastResult(
+                CastExecutors.resolve(new BinaryType(10), new VarBinaryType(20)),
+                "12345678".getBytes(),
+                "12345678".getBytes());
     }
 
     @Test
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/FieldStatsArraySerializerTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/FieldStatsArraySerializerTest.java
index 2e63eea5..3990bc6d 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/FieldStatsArraySerializerTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/FieldStatsArraySerializerTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.store.file.stats;
 
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.casting.CastExecutor;
 import org.apache.flink.table.store.file.schema.AtomicDataType;
 import org.apache.flink.table.store.file.schema.DataField;
 import org.apache.flink.table.store.file.schema.SchemaEvolutionUtil;
@@ -93,11 +94,15 @@ public class FieldStatsArraySerializerTest {
                         Collections.EMPTY_MAP,
                         "");
 
+        int[] indexMapping =
+                SchemaEvolutionUtil.createIndexMapping(tableSchema.fields(), dataSchema.fields());
+        CastExecutor<Object, Object>[] converterMapping =
+                (CastExecutor<Object, Object>[])
+                        SchemaEvolutionUtil.createConvertMapping(
+                                tableSchema.fields(), dataSchema.fields(), indexMapping);
         FieldStatsArraySerializer fieldStatsArraySerializer =
                 new FieldStatsArraySerializer(
-                        tableSchema.logicalRowType(),
-                        SchemaEvolutionUtil.createIndexMapping(
-                                tableSchema.fields(), dataSchema.fields()));
+                        tableSchema.logicalRowType(), indexMapping, converterMapping);
         BinaryRowData minRowData = row(1, 2, 3, 4);
         BinaryRowData maxRowData = row(100, 99, 98, 97);
         long[] nullCounts = new long[] {1, 0, 10, 100};
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyTableColumnTypeFileMetaTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyTableColumnTypeFileMetaTest.java
new file mode 100644
index 00000000..87cf9336
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyTableColumnTypeFileMetaTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table;
+
+import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.file.WriteMode;
+import org.apache.flink.table.store.file.io.DataFileMeta;
+import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.store.file.schema.TableSchema;
+import org.apache.flink.table.store.file.stats.BinaryTableStats;
+
+import org.junit.jupiter.api.BeforeEach;
+
+import java.util.Map;
+
+/** File meta tests for column type evolution in {@link AppendOnlyFileStoreTable}. */
+public class AppendOnlyTableColumnTypeFileMetaTest extends ColumnTypeFileMetaTestBase {
+
+    @BeforeEach
+    public void before() throws Exception {
+        super.before();
+        tableConfig.set(CoreOptions.WRITE_MODE, WriteMode.APPEND_ONLY);
+    }
+
+    @Override
+    protected FileStoreTable createFileStoreTable(Map<Long, TableSchema> tableSchemas) {
+        SchemaManager schemaManager = new TestingSchemaManager(tablePath, tableSchemas);
+        return new AppendOnlyFileStoreTable(tablePath, schemaManager.latest().get()) {
+            @Override
+            protected SchemaManager schemaManager() {
+                return schemaManager;
+            }
+        };
+    }
+
+    @Override
+    protected BinaryTableStats getTableValueStats(DataFileMeta fileMeta) {
+        return fileMeta.valueStats();
+    }
+}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountColumnTypeFileMetaTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountColumnTypeFileMetaTest.java
new file mode 100644
index 00000000..1eec700e
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountColumnTypeFileMetaTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table;
+
+import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.file.WriteMode;
+import org.apache.flink.table.store.file.io.DataFileMeta;
+import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.store.file.schema.TableSchema;
+import org.apache.flink.table.store.file.stats.BinaryTableStats;
+
+import org.junit.jupiter.api.BeforeEach;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/** File meta tests for column type evolution in {@link ChangelogValueCountFileStoreTable}. */
+public class ChangelogValueCountColumnTypeFileMetaTest extends ColumnTypeFileMetaTestBase {
+
+    @BeforeEach
+    public void before() throws Exception {
+        super.before();
+        tableConfig.set(CoreOptions.WRITE_MODE, WriteMode.CHANGE_LOG);
+    }
+
+    @Override
+    protected List<String> getPrimaryKeyNames() {
+        return Collections.emptyList();
+    }
+
+    @Override
+    protected FileStoreTable createFileStoreTable(Map<Long, TableSchema> tableSchemas) {
+        SchemaManager schemaManager = new TestingSchemaManager(tablePath, tableSchemas);
+        return new ChangelogValueCountFileStoreTable(tablePath, schemaManager.latest().get()) {
+            @Override
+            protected SchemaManager schemaManager() {
+                return schemaManager;
+            }
+        };
+    }
+
+    @Override
+    protected BinaryTableStats getTableValueStats(DataFileMeta fileMeta) {
+        return fileMeta.keyStats();
+    }
+}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyTableColumnTypeFileMetaTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyTableColumnTypeFileMetaTest.java
new file mode 100644
index 00000000..d7d7f19e
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyTableColumnTypeFileMetaTest.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table;
+
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.file.WriteMode;
+import org.apache.flink.table.store.file.io.DataFileMeta;
+import org.apache.flink.table.store.file.predicate.Predicate;
+import org.apache.flink.table.store.file.predicate.PredicateBuilder;
+import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.store.file.schema.TableSchema;
+import org.apache.flink.table.store.file.stats.BinaryTableStats;
+import org.apache.flink.table.store.format.FieldStats;
+import org.apache.flink.table.store.table.source.DataTableScan;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** File meta tests for column type evolution in {@link ChangelogWithKeyFileStoreTable}. */
+public class ChangelogWithKeyTableColumnTypeFileMetaTest extends ColumnTypeFileMetaTestBase {
+
+    @BeforeEach
+    public void before() throws Exception {
+        super.before();
+        tableConfig.set(CoreOptions.WRITE_MODE, WriteMode.CHANGE_LOG);
+    }
+
+    @Override
+    protected FileStoreTable createFileStoreTable(Map<Long, TableSchema> tableSchemas) {
+        SchemaManager schemaManager = new TestingSchemaManager(tablePath, tableSchemas);
+        return new ChangelogWithKeyFileStoreTable(tablePath, schemaManager.latest().get()) {
+            @Override
+            protected SchemaManager schemaManager() {
+                return schemaManager;
+            }
+        };
+    }
+
+    @Override
+    protected BinaryTableStats getTableValueStats(DataFileMeta fileMeta) {
+        return fileMeta.keyStats();
+    }
+
+    /** We can only validate the field stats of primary keys in the changelog-with-key table. */
+    @Override
+    protected void validateStatsField(List<DataFileMeta> fileMetaList) {
+        for (DataFileMeta fileMeta : fileMetaList) {
+            FieldStats[] statsArray = getTableValueStats(fileMeta).fields(null);
+            assertThat(statsArray.length).isEqualTo(4);
+            for (int i = 0; i < 4; i++) {
+                assertThat(statsArray[i].minValue()).isNotNull();
+                assertThat(statsArray[i].maxValue()).isNotNull();
+            }
+        }
+    }
+
+    @Override
+    @Test
+    public void testTableScanFilterNormalFields() throws Exception {
+        writeAndCheckFileResultForColumnType(
+                schemas -> {
+                    FileStoreTable table = createFileStoreTable(schemas);
+                    /**
+                     * The changelog-with-key table doesn't support filters on value fields, so it
+                     * scans all data. TODO: support value filters in the future.
+                     */
+                    Predicate predicate =
+                            new PredicateBuilder(table.schema().logicalRowType())
+                                    .between(6, 200L, 500L);
+                    DataTableScan.DataFilePlan plan = table.newScan().withFilter(predicate).plan();
+                    checkFilterRowCount(plan, 3L);
+                    return plan.splits.stream()
+                            .flatMap(s -> s.files().stream())
+                            .collect(Collectors.toList());
+                },
+                (files, schemas) -> {
+                    FileStoreTable table = createFileStoreTable(schemas);
+
+                    /**
+                     * The changelog-with-key table doesn't support filters on value fields, so it
+                     * scans all data. TODO: support value filters in the future.
+                     */
+                    DataTableScan.DataFilePlan plan =
+                            table.newScan()
+                                    .withFilter(
+                                            new PredicateBuilder(table.schema().logicalRowType())
+                                                    .between(6, 200F, 500F))
+                                    .plan();
+                    checkFilterRowCount(plan, 6L);
+                },
+                getPrimaryKeyNames(),
+                tableConfig,
+                this::createFileStoreTable);
+    }
+
+    /** We can only validate the values of primary keys for the changelog-with-key table. */
+    @Override
+    protected void validateValuesWithNewSchema(
+            List<String> filesName, List<DataFileMeta> fileMetaList) {
+        for (DataFileMeta fileMeta : fileMetaList) {
+            FieldStats[] statsArray = getTableValueStats(fileMeta).fields(null);
+            assertThat(statsArray.length).isEqualTo(4);
+            if (filesName.contains(fileMeta.fileName())) {
+                assertThat(statsArray[0].minValue()).isEqualTo(StringData.fromString("200       "));
+                assertThat(statsArray[0].maxValue()).isEqualTo(StringData.fromString("300       "));
+
+                assertThat(statsArray[1].minValue()).isEqualTo(StringData.fromString("201"));
+                assertThat(statsArray[1].maxValue()).isEqualTo(StringData.fromString("301"));
+
+                assertThat((Double) statsArray[2].minValue()).isEqualTo(202D);
+                assertThat((Double) statsArray[2].maxValue()).isEqualTo(302D);
+
+                assertThat((Integer) statsArray[3].minValue()).isEqualTo(203);
+                assertThat((Integer) statsArray[3].maxValue()).isEqualTo(303);
+            } else {
+                assertThat(statsArray[0].minValue())
+                        .isEqualTo(statsArray[0].maxValue())
+                        .isEqualTo(StringData.fromString("400"));
+                assertThat(statsArray[1].minValue())
+                        .isEqualTo(statsArray[1].maxValue())
+                        .isEqualTo(StringData.fromString("401"));
+                assertThat((Double) statsArray[2].minValue())
+                        .isEqualTo((Double) statsArray[2].maxValue())
+                        .isEqualTo(402D);
+                assertThat((Integer) statsArray[3].minValue())
+                        .isEqualTo(statsArray[3].maxValue())
+                        .isEqualTo(403);
+            }
+        }
+    }
+}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ColumnTypeFileMetaTestBase.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ColumnTypeFileMetaTestBase.java
new file mode 100644
index 00000000..54836847
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ColumnTypeFileMetaTestBase.java
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table;
+
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.file.io.DataFileMeta;
+import org.apache.flink.table.store.file.predicate.Predicate;
+import org.apache.flink.table.store.file.predicate.PredicateBuilder;
+import org.apache.flink.table.store.file.stats.BinaryTableStats;
+import org.apache.flink.table.store.format.FieldStats;
+import org.apache.flink.table.store.table.source.DataTableScan;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Base test class for column type evolution. */
+public abstract class ColumnTypeFileMetaTestBase extends SchemaEvolutionTableTestBase {
+    @BeforeEach
+    public void before() throws Exception {
+        super.before();
+        tableConfig.set(CoreOptions.BUCKET, 1);
+    }
+
+    @Test
+    public void testTableScan() throws Exception {
+        writeAndCheckFileResultForColumnType(
+                schemas -> {
+                    FileStoreTable table = createFileStoreTable(schemas);
+                    DataTableScan.DataFilePlan plan = table.newScan().plan();
+                    checkFilterRowCount(plan, 3L);
+                    return plan.splits.stream()
+                            .flatMap(s -> s.files().stream())
+                            .collect(Collectors.toList());
+                },
+                (files, schemas) -> {
+                    FileStoreTable table = createFileStoreTable(schemas);
+                    // Scan all data files
+                    DataTableScan.DataFilePlan plan =
+                            table.newScan()
+                                    .withFilter(
+                                            new PredicateBuilder(table.schema().logicalRowType())
+                                                    .greaterOrEqual(1, StringData.fromString("0")))
+                                    .plan();
+                    checkFilterRowCount(plan, 6L);
+
+                    List<String> filesName =
+                            files.stream().map(DataFileMeta::fileName).collect(Collectors.toList());
+                    assertThat(filesName.size()).isGreaterThan(0);
+
+                    List<DataFileMeta> fileMetaList =
+                            plan.splits.stream()
+                                    .flatMap(s -> s.files().stream())
+                                    .collect(Collectors.toList());
+                    assertThat(
+                                    fileMetaList.stream()
+                                            .map(DataFileMeta::fileName)
+                                            .collect(Collectors.toList()))
+                            .containsAll(filesName);
+
+                    validateStatsField(fileMetaList);
+                },
+                getPrimaryKeyNames(),
+                tableConfig,
+                this::createFileStoreTable);
+    }
+
+    protected void validateStatsField(List<DataFileMeta> fileMetaList) {
+        for (DataFileMeta fileMeta : fileMetaList) {
+            FieldStats[] statsArray = getTableValueStats(fileMeta).fields(null);
+            assertThat(statsArray.length).isEqualTo(12);
+            for (int i = 0; i < 11; i++) {
+                assertThat(statsArray[i].minValue()).isNotNull();
+                assertThat(statsArray[i].maxValue()).isNotNull();
+            }
+            // Min and max values of the binary type are null
+            assertThat(statsArray[11].minValue()).isNull();
+            assertThat(statsArray[11].maxValue()).isNull();
+        }
+    }
+
+    @Test
+    public void testTableScanFilterNormalFields() throws Exception {
+        writeAndCheckFileResultForColumnType(
+                schemas -> {
+                    FileStoreTable table = createFileStoreTable(schemas);
+                    /**
+                     * Filter field "g" in [200, 500] in SCHEMA_FIELDS, which is bigint; this will get
+                     * one file with two rows as follows:
+                     *
+                     * <ul>
+                     *   <li>2,"200","201",toDecimal(202),(short)203,204,205L,206F,207D,208,toTimestamp(209
+                     *       * millsPerDay),toBytes("210")
+                     *   <li>2,"300","301",toDecimal(302),(short)303,304,305L,306F,307D,308,toTimestamp(309
+                     *       * millsPerDay),toBytes("310")
+                     * </ul>
+                     */
+                    Predicate predicate =
+                            new PredicateBuilder(table.schema().logicalRowType())
+                                    .between(6, 200L, 500L);
+                    DataTableScan.DataFilePlan plan = table.newScan().withFilter(predicate).plan();
+                    checkFilterRowCount(plan, 2L);
+                    return plan.splits.stream()
+                            .flatMap(s -> s.files().stream())
+                            .collect(Collectors.toList());
+                },
+                (files, schemas) -> {
+                    FileStoreTable table = createFileStoreTable(schemas);
+
+                    /**
+                     * Filter field "g" in [200, 500] in SCHEMA_FIELDS, which is updated from bigint
+                     * to float; this will get another file with one row as follows:
+                     *
+                     * <ul>
+                     *   <li>2,"400","401",402D,403,toDecimal(404),405F,406D,toDecimal(407),408,409,toBytes("410")
+                     * </ul>
+                     *
+                     * <p>Then we can check the stats in the two resulting files.
+                     */
+                    DataTableScan.DataFilePlan plan =
+                            table.newScan()
+                                    .withFilter(
+                                            new PredicateBuilder(table.schema().logicalRowType())
+                                                    .between(6, 200F, 500F))
+                                    .plan();
+                    checkFilterRowCount(plan, 3L);
+
+                    List<String> filesName =
+                            files.stream().map(DataFileMeta::fileName).collect(Collectors.toList());
+                    assertThat(filesName.size()).isGreaterThan(0);
+
+                    List<DataFileMeta> fileMetaList =
+                            plan.splits.stream()
+                                    .flatMap(s -> s.files().stream())
+                                    .collect(Collectors.toList());
+                    assertThat(
+                                    fileMetaList.stream()
+                                            .map(DataFileMeta::fileName)
+                                            .collect(Collectors.toList()))
+                            .containsAll(filesName);
+
+                    validateValuesWithNewSchema(filesName, fileMetaList);
+                },
+                getPrimaryKeyNames(),
+                tableConfig,
+                this::createFileStoreTable);
+    }
+
+    @Test
+    public void testTableScanFilterPrimaryKeyFields() throws Exception {
+        writeAndCheckFileResultForColumnType(
+                schemas -> {
+                    FileStoreTable table = createFileStoreTable(schemas);
+                    // results of field "e" in [200, 500] in SCHEMA_FIELDS, which is smallint
+                    Predicate predicate =
+                            new PredicateBuilder(table.schema().logicalRowType())
+                                    .between(4, (short) 200, (short) 500);
+                    DataTableScan.DataFilePlan plan = table.newScan().withFilter(predicate).plan();
+                    checkFilterRowCount(plan, 2L);
+                    return plan.splits.stream()
+                            .flatMap(s -> s.files().stream())
+                            .collect(Collectors.toList());
+                },
+                (files, schemas) -> {
+                    FileStoreTable table = createFileStoreTable(schemas);
+                    // results of field "e" in [200, 500] in SCHEMA_FIELDS, which is updated from
+                    // smallint to int
+                    DataTableScan.DataFilePlan plan =
+                            table.newScan()
+                                    .withFilter(
+                                            new PredicateBuilder(table.schema().logicalRowType())
+                                                    .between(4, 200, 500))
+                                    .plan();
+                    checkFilterRowCount(plan, 3L);
+
+                    List<String> filesName =
+                            files.stream().map(DataFileMeta::fileName).collect(Collectors.toList());
+                    assertThat(filesName.size()).isGreaterThan(0);
+
+                    List<DataFileMeta> fileMetaList =
+                            plan.splits.stream()
+                                    .flatMap(s -> s.files().stream())
+                                    .collect(Collectors.toList());
+                    assertThat(
+                                    fileMetaList.stream()
+                                            .map(DataFileMeta::fileName)
+                                            .collect(Collectors.toList()))
+                            .containsAll(filesName);
+
+                    // Compare all columns with table column type
+                    validateValuesWithNewSchema(filesName, fileMetaList);
+                },
+                getPrimaryKeyNames(),
+                tableConfig,
+                this::createFileStoreTable);
+    }
+
+    protected void validateValuesWithNewSchema(
+            List<String> filesName, List<DataFileMeta> fileMetaList) {
+        for (DataFileMeta fileMeta : fileMetaList) {
+            FieldStats[] statsArray = getTableValueStats(fileMeta).fields(null);
+            assertThat(statsArray.length).isEqualTo(12);
+            if (filesName.contains(fileMeta.fileName())) {
+                checkTwoValues(statsArray);
+            } else {
+                checkOneValue(statsArray);
+            }
+        }
+    }
+
+    /**
+     * Check the stats of a file that contains a single row.
+     *
+     * <ul>
+     *   <li>data:
+     *       2,"400","401",402D,403,toDecimal(404),405F,406D,toDecimal(407),408,409,toBytes("410")
+     *   <li>types: a->int, b->varchar[10], c->varchar[10], d->double, e->int, f->decimal,g->float,
+     *       h->double, i->decimal, j->date, k->date, l->varbinary
+     * </ul>
+     *
+     * @param statsArray the field stats
+     */
+    private void checkOneValue(FieldStats[] statsArray) {
+        assertThat(statsArray[0].minValue()).isEqualTo((statsArray[0].maxValue())).isEqualTo(2);
+        assertThat(statsArray[1].minValue())
+                .isEqualTo(statsArray[1].maxValue())
+                .isEqualTo(StringData.fromString("400"));
+        assertThat(statsArray[2].minValue())
+                .isEqualTo(statsArray[2].maxValue())
+                .isEqualTo(StringData.fromString("401"));
+        assertThat((Double) statsArray[3].minValue())
+                .isEqualTo((Double) statsArray[3].maxValue())
+                .isEqualTo(402D);
+        assertThat((Integer) statsArray[4].minValue())
+                .isEqualTo(statsArray[4].maxValue())
+                .isEqualTo(403);
+        assertThat(((DecimalData) statsArray[5].minValue()).toBigDecimal().intValue())
+                .isEqualTo(((DecimalData) statsArray[5].maxValue()).toBigDecimal().intValue())
+                .isEqualTo(404);
+        assertThat((Float) statsArray[6].minValue())
+                .isEqualTo(statsArray[6].maxValue())
+                .isEqualTo(405F);
+        assertThat((Double) statsArray[7].minValue())
+                .isEqualTo(statsArray[7].maxValue())
+                .isEqualTo(406D);
+        assertThat(((DecimalData) statsArray[8].minValue()).toBigDecimal().doubleValue())
+                .isEqualTo(((DecimalData) statsArray[8].maxValue()).toBigDecimal().doubleValue())
+                .isEqualTo(407D);
+        assertThat(statsArray[9].minValue()).isEqualTo(statsArray[9].maxValue()).isEqualTo(408);
+        assertThat(statsArray[10].minValue()).isEqualTo(statsArray[10].maxValue()).isEqualTo(409);
+
+        // Min and max values of the binary type are null
+        assertThat(statsArray[11].minValue()).isNull();
+        assertThat(statsArray[11].maxValue()).isNull();
+    }
+
+    /**
+     * Check the stats of a file containing two rows against the new column types.
+     *
+     * <ul>
+     *   <li>data1: 2,"200","201",toDecimal(202),(short)203,204,205L,206F,207D,208,toTimestamp(209 *
+     *       millsPerDay),toBytes("210")
+     *   <li>data2: 2,"300","301",toDecimal(302),(short)303,304,305L,306F,307D,308,toTimestamp(309 *
+     *       millsPerDay),toBytes("310")
+     *   <li>old types: a->int, b->char[10], c->varchar[10], d->decimal, e->smallint, f->int,
+     *       g->bigint, h->float, i->double, j->date, k->timestamp, l->binary
+     *   <li>new types: a->int, b->varchar[10], c->varchar[10], d->double, e->int,
+     *       f->decimal,g->float, h->double, i->decimal, j->date, k->date, l->varbinary
+     * </ul>
+     */
+    private void checkTwoValues(FieldStats[] statsArray) {
+        assertThat(statsArray[0].minValue()).isEqualTo(2);
+        assertThat(statsArray[0].maxValue()).isEqualTo(2);
+
+        assertThat(statsArray[1].minValue()).isEqualTo(StringData.fromString("200       "));
+        assertThat(statsArray[1].maxValue()).isEqualTo(StringData.fromString("300       "));
+
+        assertThat(statsArray[2].minValue()).isEqualTo(StringData.fromString("201"));
+        assertThat(statsArray[2].maxValue()).isEqualTo(StringData.fromString("301"));
+
+        assertThat((Double) statsArray[3].minValue()).isEqualTo(202D);
+        assertThat((Double) statsArray[3].maxValue()).isEqualTo(302D);
+
+        assertThat((Integer) statsArray[4].minValue()).isEqualTo(203);
+        assertThat((Integer) statsArray[4].maxValue()).isEqualTo(303);
+
+        assertThat(((DecimalData) statsArray[5].minValue()).toBigDecimal().intValue())
+                .isEqualTo(204);
+        assertThat(((DecimalData) statsArray[5].maxValue()).toBigDecimal().intValue())
+                .isEqualTo(304);
+
+        assertThat((Float) statsArray[6].minValue()).isEqualTo(205F);
+        assertThat((Float) statsArray[6].maxValue()).isEqualTo(305F);
+
+        assertThat((Double) statsArray[7].minValue()).isEqualTo(206D);
+        assertThat((Double) statsArray[7].maxValue()).isEqualTo(306D);
+
+        assertThat(((DecimalData) statsArray[8].minValue()).toBigDecimal().doubleValue())
+                .isEqualTo(207D);
+        assertThat(((DecimalData) statsArray[8].maxValue()).toBigDecimal().doubleValue())
+                .isEqualTo(307D);
+
+        assertThat(statsArray[9].minValue()).isEqualTo(208);
+        assertThat(statsArray[9].maxValue()).isEqualTo(308);
+        assertThat(statsArray[10].minValue()).isEqualTo(209);
+        assertThat(statsArray[10].maxValue()).isEqualTo(309);
+
+        // Min and max values of the binary type are null
+        assertThat(statsArray[11].minValue()).isNull();
+        assertThat(statsArray[11].maxValue()).isNull();
+    }
+
+    @Override
+    protected List<String> getPrimaryKeyNames() {
+        return SCHEMA_PRIMARY_KEYS;
+    }
+
+    protected abstract BinaryTableStats getTableValueStats(DataFileMeta fileMeta);
+}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileMetaFilterTestBase.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileMetaFilterTestBase.java
index b2f7215d..4a839e9e 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileMetaFilterTestBase.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileMetaFilterTestBase.java
@@ -315,17 +315,4 @@ public abstract class FileMetaFilterTestBase extends SchemaEvolutionTableTestBas
                 table.newScan().withFilter(builder.equal(index, value)).plan();
         checkFilterRowCount(plan, expectedRowCount);
     }
-
-    protected static void checkFilterRowCount(
-            DataTableScan.DataFilePlan plan, long expectedRowCount) {
-        List<DataFileMeta> fileMetaList =
-                plan.splits.stream().flatMap(s -> s.files().stream()).collect(Collectors.toList());
-        checkFilterRowCount(fileMetaList, expectedRowCount);
-    }
-
-    protected static void checkFilterRowCount(
-            List<DataFileMeta> fileMetaList, long expectedRowCount) {
-        assertThat(fileMetaList.stream().mapToLong(DataFileMeta::rowCount).sum())
-                .isEqualTo(expectedRowCount);
-    }
 }
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/SchemaEvolutionTableTestBase.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/SchemaEvolutionTableTestBase.java
index 145b0846..ff3bdf0a 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/SchemaEvolutionTableTestBase.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/SchemaEvolutionTableTestBase.java
@@ -22,10 +22,13 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.DecimalData;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
 import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.file.io.DataFileMeta;
 import org.apache.flink.table.store.file.schema.AtomicDataType;
 import org.apache.flink.table.store.file.schema.DataField;
 import org.apache.flink.table.store.file.schema.SchemaChange;
@@ -36,12 +39,14 @@ import org.apache.flink.table.store.file.utils.TestAtomicRenameFileSystem;
 import org.apache.flink.table.store.file.utils.TraceableFileSystem;
 import org.apache.flink.table.store.table.sink.TableCommit;
 import org.apache.flink.table.store.table.sink.TableWrite;
+import org.apache.flink.table.store.table.source.DataTableScan;
 
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.io.TempDir;
 
 import java.io.IOException;
+import java.math.BigDecimal;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -52,6 +57,7 @@ import java.util.Optional;
 import java.util.UUID;
 import java.util.function.BiConsumer;
 import java.util.function.Function;
+import java.util.stream.Collectors;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -77,6 +83,29 @@ public abstract class SchemaEvolutionTableTestBase {
     protected static final List<String> PARTITION_NAMES = Collections.singletonList("pt");
     protected static final List<String> PRIMARY_KEY_NAMES = Arrays.asList("pt", "kt");
 
+    protected static final List<DataField> SCHEMA_FIELDS =
+            Arrays.asList(
+                    new DataField(0, "a", new AtomicDataType(DataTypes.INT().getLogicalType())),
+                    new DataField(1, "b", new AtomicDataType(DataTypes.CHAR(10).getLogicalType())),
+                    new DataField(
+                            2, "c", new AtomicDataType(DataTypes.VARCHAR(10).getLogicalType())),
+                    new DataField(
+                            3, "d", new AtomicDataType(DataTypes.DECIMAL(10, 2).getLogicalType())),
+                    new DataField(
+                            4, "e", new AtomicDataType(DataTypes.SMALLINT().getLogicalType())),
+                    new DataField(5, "f", new AtomicDataType(DataTypes.INT().getLogicalType())),
+                    new DataField(6, "g", new AtomicDataType(DataTypes.BIGINT().getLogicalType())),
+                    new DataField(7, "h", new AtomicDataType(DataTypes.FLOAT().getLogicalType())),
+                    new DataField(8, "i", new AtomicDataType(DataTypes.DOUBLE().getLogicalType())),
+                    new DataField(9, "j", new AtomicDataType(DataTypes.DATE().getLogicalType())),
+                    new DataField(
+                            10, "k", new AtomicDataType(DataTypes.TIMESTAMP(2).getLogicalType())),
+                    new DataField(
+                            11, "l", new AtomicDataType(DataTypes.BINARY(100).getLogicalType())));
+    protected static final List<String> SCHEMA_PARTITION_NAMES = Collections.singletonList("a");
+    protected static final List<String> SCHEMA_PRIMARY_KEYS =
+            Arrays.asList("a", "b", "c", "d", "e");
+
     protected Path tablePath;
     protected String commitUser;
     protected final Configuration tableConfig = new Configuration();
@@ -185,6 +214,215 @@ public abstract class SchemaEvolutionTableTestBase {
         secondChecker.accept(result, tableSchemas);
     }
 
+    public static <R> void writeAndCheckFileResultForColumnType(
+            Function<Map<Long, TableSchema>, R> firstChecker,
+            BiConsumer<R, Map<Long, TableSchema>> secondChecker,
+            List<String> primaryKeyNames,
+            Configuration tableConfig,
+            Function<Map<Long, TableSchema>, FileStoreTable> createFileStoreTable)
+            throws Exception {
+        Map<Long, TableSchema> tableSchemas = new HashMap<>();
+        // Register the original schema (id 0) built from SCHEMA_FIELDS
+        tableSchemas.put(
+                0L,
+                new TableSchema(
+                        0,
+                        SCHEMA_FIELDS,
+                        12,
+                        SCHEMA_PARTITION_NAMES,
+                        primaryKeyNames,
+                        tableConfig.toMap(),
+                        ""));
+        FileStoreTable table = createFileStoreTable.apply(tableSchemas);
+        TableWrite write = table.newWrite("user");
+        TableCommit commit = table.newCommit("user");
+
+        /**
+         * Generate two files:
+         *
+         * <ul>
+         *   <li>file1 with one data: 1,"100","101",(102),short)
+         *       103,104,105L,106F,107D,108,toTimestamp(109 * millsPerDay),"110".getBytes()
+         *   <li>file2 with two data:
+         *       <ul>
+         *         <li>2,"200","201",toDecimal(202),(short)
+         *             203,204,205L,206F,207D,208,toTimestamp(209 * millsPerDay),toBytes("210")
+         *         <li>2,"300","301",toDecimal(302),(short)
+         *             303,304,305L,306F,307D,308,toTimestamp(309 * millsPerDay),toBytes("310")
+         *       </ul>
+         * </ul>
+         */
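+        // Timestamp values are whole multiples of a day so that, after the TIMESTAMP -> DATE
+        // evolution below, their stats read back as plain day counts (109, 209, 309).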
+        final long millsPerDay = 86400000;
+        write.write(
+                rowData(
+                        1,
+                        "100",
+                        "101",
+                        toDecimal(102),
+                        (short) 103,
+                        104,
+                        105L,
+                        106F,
+                        107D,
+                        108,
+                        toTimestamp(109 * millsPerDay),
+                        "110".getBytes()));
+        write.write(
+                rowData(
+                        2,
+                        "200",
+                        "201",
+                        toDecimal(202),
+                        (short) 203,
+                        204,
+                        205L,
+                        206F,
+                        207D,
+                        208,
+                        toTimestamp(209 * millsPerDay),
+                        toBytes("210")));
+        write.write(
+                rowData(
+                        2,
+                        "300",
+                        "301",
+                        toDecimal(302),
+                        (short) 303,
+                        304,
+                        305L,
+                        306F,
+                        307D,
+                        308,
+                        toTimestamp(309 * millsPerDay),
+                        toBytes("310")));
+        commit.commit(0, write.prepareCommit(true, 0));
+        write.close();
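+        // The first check runs while only the original schema (id 0) is registered.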
+        R result = firstChecker.apply(tableSchemas);
+
+        /**
+         * Fields before and after column type evolution:
+         *
+         * <ul>
+         *   <li>Before: a->string, b->char[10], c->varchar[10], d->decimal, e->smallint, f->int,
+         *       g->bigint, h->float, i->double, j->date, k->timestamp, l->binary
+         *   <li>After: a->string, b->varchar[10], c->varchar[10], d->double, e->int, f->decimal,
+         *       g->float, h->double, i->decimal, j->date, k->date, l->varbinary
+         * </ul>
+         */
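+        // Each evolved field keeps its original id and name; only its data type changes, so
+        // file meta written with the old types can still be matched to the new columns.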
+        List<DataField> evolutionFields = new ArrayList<>(SCHEMA_FIELDS);
+        evolutionFields.set(
+                1,
+                new DataField(1, "b", new AtomicDataType(DataTypes.VARCHAR(10).getLogicalType())));
+        evolutionFields.set(
+                3, new DataField(3, "d", new AtomicDataType(DataTypes.DOUBLE().getLogicalType())));
+        evolutionFields.set(
+                4, new DataField(4, "e", new AtomicDataType(DataTypes.INT().getLogicalType())));
+        evolutionFields.set(
+                5,
+                new DataField(
+                        5, "f", new AtomicDataType(DataTypes.DECIMAL(10, 2).getLogicalType())));
+        evolutionFields.set(
+                6, new DataField(6, "g", new AtomicDataType(DataTypes.FLOAT().getLogicalType())));
+        evolutionFields.set(
+                7, new DataField(7, "h", new AtomicDataType(DataTypes.DOUBLE().getLogicalType())));
+        evolutionFields.set(
+                8,
+                new DataField(
+                        8, "i", new AtomicDataType(DataTypes.DECIMAL(10, 2).getLogicalType())));
+        evolutionFields.set(
+                10, new DataField(10, "k", new AtomicDataType(DataTypes.DATE().getLogicalType())));
+        evolutionFields.set(
+                11,
+                new DataField(
+                        11, "l", new AtomicDataType(DataTypes.VARBINARY(100).getLogicalType())));
+        tableSchemas.put(
+                1L,
+                new TableSchema(
+                        1,
+                        evolutionFields,
+                        12,
+                        SCHEMA_PARTITION_NAMES,
+                        primaryKeyNames,
+                        tableConfig.toMap(),
+                        ""));
+        table = createFileStoreTable.apply(tableSchemas);
+        write = table.newWrite("user");
+        commit = table.newCommit("user");
+
+        /**
+         * Generate another two files:
+         *
+         * <ul>
+         *   <li>file1 with one data:
+         *       2,"400","401",402D,403,toDecimal(404),405F,406D,toDecimal(407),408,409,toBytes("410")
+         *   <li>file2 with two data:
+         *       <ul>
+         *         <li>1,"500","501",502D,503,toDecimal(504),505F,506D,toDecimal(507),508,509,toBytes("510")
+         *         <li>1,"600","601",602D,603,toDecimal(604),605F,606D,toDecimal(607),608,609,toBytes("610")
+         *       </ul>
+         * </ul>
+         */
+        write.write(
+                rowData(
+                        2,
+                        "400",
+                        "401",
+                        402D,
+                        403,
+                        toDecimal(404),
+                        405F,
+                        406D,
+                        toDecimal(407),
+                        408,
+                        409,
+                        toBytes("410")));
+        write.write(
+                rowData(
+                        1,
+                        "500",
+                        "501",
+                        502D,
+                        503,
+                        toDecimal(504),
+                        505F,
+                        506D,
+                        toDecimal(507),
+                        508,
+                        509,
+                        toBytes("510")));
+        write.write(
+                rowData(
+                        1,
+                        "600",
+                        "601",
+                        602D,
+                        603,
+                        toDecimal(604),
+                        605F,
+                        606D,
+                        toDecimal(607),
+                        608,
+                        609,
+                        toBytes("610")));
+        commit.commit(1, write.prepareCommit(true, 1));
+        write.close();
+
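+        // The second check sees files written with both schema 0 and schema 1, which is what
+        // exercises the column type evolution in the file meta.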
+        secondChecker.accept(result, tableSchemas);
+    }
+
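+    // Helpers that build the internal Flink representations used above: DecimalData for
+    // DECIMAL(10, 2), TimestampData for TIMESTAMP, and raw bytes for (VAR)BINARY values.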
+    private static DecimalData toDecimal(int val) {
+        return DecimalData.fromBigDecimal(new BigDecimal(val), 10, 2);
+    }
+
+    private static TimestampData toTimestamp(long mills) {
+        return TimestampData.fromEpochMillis(mills);
+    }
+
+    private static byte[] toBytes(String val) {
+        return val.getBytes();
+    }
+
     protected static RowData rowData(Object... values) {
         List<Object> valueList = new ArrayList<>(values.length);
         for (Object value : values) {
@@ -197,6 +435,19 @@ public abstract class SchemaEvolutionTableTestBase {
         return GenericRowData.of(valueList.toArray(new Object[0]));
     }
 
+    protected static void checkFilterRowCount(
+            DataTableScan.DataFilePlan plan, long expectedRowCount) {
+        List<DataFileMeta> fileMetaList =
+                plan.splits.stream().flatMap(s -> s.files().stream()).collect(Collectors.toList());
+        checkFilterRowCount(fileMetaList, expectedRowCount);
+    }
+
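+    // Sums DataFileMeta#rowCount over the given file metas and compares the total with the
+    // number of rows expected to remain after applying the filter.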
+    protected static void checkFilterRowCount(
+            List<DataFileMeta> fileMetaList, long expectedRowCount) {
+        assertThat(fileMetaList.stream().mapToLong(DataFileMeta::rowCount).sum())
+                .isEqualTo(expectedRowCount);
+    }
+
     /** {@link SchemaManager} subclass for testing. */
     public static class TestingSchemaManager extends SchemaManager {
         private final Map<Long, TableSchema> tableSchemas;