Posted to commits@flink.apache.org by lz...@apache.org on 2022/09/21 14:15:19 UTC

[flink-table-store] branch release-0.2 updated (4775c360 -> 3eee4bf4)

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a change to branch release-0.2
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


    from 4775c360 [hotfix] Note execution.checkpointing.mode in write documentation
     new 1973b737 [FLINK-29345] Create reusing reader/writer config in orc format
     new 3eee4bf4 [FLINK-29278] BINARY type is not supported in table store

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../store/format/FileStatsExtractorTestBase.java   |   8 ++
 .../store/table/AppendOnlyFileStoreTableTest.java  |  51 +++++----
 .../ChangelogValueCountFileStoreTableTest.java     |  56 ++++++----
 .../table/ChangelogWithKeyFileStoreTableTest.java  | 103 +++++++++---------
 .../table/store/table/FileStoreTableTestBase.java  |  75 ++++++++-----
 .../table/store/table/WritePreemptMemoryTest.java  |   3 +-
 .../store/format/orc/OrcBulkWriterFactory.java     | 116 +++++++++++++++++++++
 .../table/store/format/orc/OrcFileFormat.java      |  78 +++++++++++---
 .../store/format/orc/OrcBulkWriterFactoryTest.java |  83 +++++++++++++++
 .../table/store/format/orc/OrcFileFormatTest.java  |   9 +-
 .../format/orc/OrcFileStatsExtractorTest.java      |   4 +
 11 files changed, 445 insertions(+), 141 deletions(-)
 create mode 100644 flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcBulkWriterFactory.java
 create mode 100644 flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcBulkWriterFactoryTest.java


[flink-table-store] 01/02: [FLINK-29345] Create reusing reader/writer config in orc format

Posted by lz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-0.2
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git

commit 1973b737fc97fe1ecc83ee2c80f3e9253557ed7a
Author: shammon <zj...@gmail.com>
AuthorDate: Wed Sep 21 11:37:28 2022 +0800

    [FLINK-29345] Create reusing reader/writer config in orc format
    
    This closes #296
---
 .../store/format/orc/OrcBulkWriterFactory.java     | 116 +++++++++++++++++++++
 .../table/store/format/orc/OrcFileFormat.java      |  35 ++++---
 .../store/format/orc/OrcBulkWriterFactoryTest.java |  83 +++++++++++++++
 .../table/store/format/orc/OrcFileFormatTest.java  |   9 +-
 4 files changed, 225 insertions(+), 18 deletions(-)

diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcBulkWriterFactory.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcBulkWriterFactory.java
new file mode 100644
index 00000000..b6670392
--- /dev/null
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcBulkWriterFactory.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.format.orc;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.orc.vector.Vectorizer;
+import org.apache.flink.orc.writer.PhysicalWriterImpl;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Writer;
+import org.apache.orc.impl.WriterImpl;
+
+import java.io.IOException;
+import java.util.UUID;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Orc {@link BulkWriter.Factory}. The main code is copied from Flink {@code OrcBulkWriterFactory}.
+ */
+public class OrcBulkWriterFactory<T> implements BulkWriter.Factory<T> {
+
+    private final Vectorizer<T> vectorizer;
+    private final OrcFile.WriterOptions writerOptions;
+
+    /**
+     * Creates a new OrcBulkWriterFactory using the provided Vectorizer and ORC WriterOptions.
+     *
+     * @param vectorizer The vectorizer implementation to convert an input record to a
+     *     VectorizedRowBatch.
+     * @param writerOptions ORC WriterOptions.
+     */
+    public OrcBulkWriterFactory(Vectorizer<T> vectorizer, OrcFile.WriterOptions writerOptions) {
+        this.vectorizer = checkNotNull(vectorizer);
+        this.writerOptions = checkNotNull(writerOptions);
+    }
+
+    @Override
+    public BulkWriter<T> create(FSDataOutputStream out) throws IOException {
+        OrcFile.WriterOptions opts = getWriterOptions();
+        opts.physicalWriter(new PhysicalWriterImpl(out, opts));
+
+        // The path of the Writer is not used to indicate the destination file
+        // in this case since we have used a dedicated physical writer to write
+        // to the given output stream directly. However, the path is still used as
+        // the key of the writer in the ORC memory manager, so we need to make it unique.
+        Path unusedPath = new Path(UUID.randomUUID().toString());
+        return new OrcBulkWriter<>(vectorizer, new WriterImpl(null, unusedPath, opts));
+    }
+
+    @VisibleForTesting
+    protected OrcFile.WriterOptions getWriterOptions() {
+        return writerOptions;
+    }
+
+    /** Orc {@link BulkWriter}. The main code is copied from Flink {@code OrcBulkWriter}. */
+    private static class OrcBulkWriter<T> implements BulkWriter<T> {
+
+        private final Writer writer;
+        private final Vectorizer<T> vectorizer;
+        private final VectorizedRowBatch rowBatch;
+
+        OrcBulkWriter(Vectorizer<T> vectorizer, Writer writer) {
+            this.vectorizer = checkNotNull(vectorizer);
+            this.writer = checkNotNull(writer);
+            this.rowBatch = vectorizer.getSchema().createRowBatch();
+
+            // Configure the vectorizer with the writer so that users can add
+            // metadata on the fly through the Vectorizer#vectorize(...) method.
+            this.vectorizer.setWriter(this.writer);
+        }
+
+        @Override
+        public void addElement(T element) throws IOException {
+            vectorizer.vectorize(element, rowBatch);
+            if (rowBatch.size == rowBatch.getMaxSize()) {
+                writer.addRowBatch(rowBatch);
+                rowBatch.reset();
+            }
+        }
+
+        @Override
+        public void flush() throws IOException {
+            if (rowBatch.size != 0) {
+                writer.addRowBatch(rowBatch);
+                rowBatch.reset();
+            }
+        }
+
+        @Override
+        public void finish() throws IOException {
+            flush();
+            writer.close();
+        }
+    }
+}
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileFormat.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileFormat.java
index 880074c2..3192cf56 100644
--- a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileFormat.java
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileFormat.java
@@ -27,7 +27,8 @@ import org.apache.flink.connector.file.src.reader.BulkFormat;
 import org.apache.flink.orc.OrcFilters;
 import org.apache.flink.orc.OrcSplitReaderUtil;
 import org.apache.flink.orc.vector.RowDataVectorizer;
-import org.apache.flink.orc.writer.OrcBulkWriterFactory;
+import org.apache.flink.orc.vector.Vectorizer;
+import org.apache.flink.orc.writer.ThreadLocalClassLoaderConfiguration;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.store.file.predicate.Predicate;
 import org.apache.flink.table.store.format.FileFormat;
@@ -36,6 +37,7 @@ import org.apache.flink.table.store.utils.Projection;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 
+import org.apache.orc.OrcFile;
 import org.apache.orc.TypeDescription;
 
 import java.util.ArrayList;
@@ -48,16 +50,24 @@ import static org.apache.flink.table.store.format.orc.OrcFileFormatFactory.IDENT
 /** Orc {@link FileFormat}. The main code is copied from Flink {@code OrcFileFormatFactory}. */
 public class OrcFileFormat extends FileFormat {
 
-    private final Configuration formatOptions;
+    private final Properties orcProperties;
+    private final org.apache.hadoop.conf.Configuration readerConf;
+    private final org.apache.hadoop.conf.Configuration writerConf;
 
     public OrcFileFormat(Configuration formatOptions) {
         super(org.apache.flink.orc.OrcFileFormatFactory.IDENTIFIER);
-        this.formatOptions = formatOptions;
+        this.orcProperties = getOrcProperties(formatOptions);
+        this.readerConf = new org.apache.hadoop.conf.Configuration();
+        this.orcProperties.forEach((k, v) -> readerConf.set(k.toString(), v.toString()));
+
+        org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
+        this.writerConf = new ThreadLocalClassLoaderConfiguration();
+        conf.forEach(entry -> writerConf.set(entry.getKey(), entry.getValue()));
     }
 
     @VisibleForTesting
-    Configuration formatOptions() {
-        return formatOptions;
+    Properties orcProperties() {
+        return orcProperties;
     }
 
     @Override
@@ -79,12 +89,8 @@ public class OrcFileFormat extends FileFormat {
             }
         }
 
-        Properties properties = getOrcProperties(formatOptions);
-        org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
-        properties.forEach((k, v) -> conf.set(k.toString(), v.toString()));
-
         return OrcInputFormatFactory.create(
-                conf, type, Projection.of(projection).toTopLevelIndexes(), orcPredicates);
+                readerConf, type, Projection.of(projection).toTopLevelIndexes(), orcPredicates);
     }
 
     @Override
@@ -92,11 +98,12 @@ public class OrcFileFormat extends FileFormat {
         LogicalType[] orcTypes = type.getChildren().toArray(new LogicalType[0]);
 
         TypeDescription typeDescription = OrcSplitReaderUtil.logicalTypeToOrcType(type);
+        Vectorizer<RowData> vectorizer =
+                new RowDataVectorizer(typeDescription.toString(), orcTypes);
+        OrcFile.WriterOptions writerOptions = OrcFile.writerOptions(orcProperties, writerConf);
+        writerOptions.setSchema(vectorizer.getSchema());
 
-        return new OrcBulkWriterFactory<>(
-                new RowDataVectorizer(typeDescription.toString(), orcTypes),
-                getOrcProperties(formatOptions),
-                new org.apache.hadoop.conf.Configuration());
+        return new OrcBulkWriterFactory<>(vectorizer, writerOptions);
     }
 
     private static Properties getOrcProperties(ReadableConfig options) {
diff --git a/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcBulkWriterFactoryTest.java b/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcBulkWriterFactoryTest.java
new file mode 100644
index 00000000..374bc1c6
--- /dev/null
+++ b/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcBulkWriterFactoryTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.format.orc;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.local.LocalDataOutputStream;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.orc.MemoryManager;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+
+/** Test for {@link OrcBulkWriterFactory}. */
+public class OrcBulkWriterFactoryTest {
+    @TempDir File tempDir;
+
+    @Test
+    public void testNotOverrideInMemoryManager() throws IOException {
+        OrcFileFormat orcFileFormat = new OrcFileFormat(new Configuration());
+        OrcBulkWriterFactory<RowData> factory =
+                (OrcBulkWriterFactory<RowData>)
+                        orcFileFormat.createWriterFactory(
+                                RowType.of(new VarCharType(), new IntType()));
+
+        TestMemoryManager memoryManager = new TestMemoryManager();
+        factory.getWriterOptions().memory(memoryManager);
+
+        factory.create(new LocalDataOutputStream(new File(tempDir, UUID.randomUUID().toString())));
+        factory.create(new LocalDataOutputStream(new File(tempDir, UUID.randomUUID().toString())));
+
+        List<Path> addedWriterPath = memoryManager.getAddedWriterPath();
+        assertEquals(2, addedWriterPath.size());
+        assertNotEquals(addedWriterPath.get(1), addedWriterPath.get(0));
+    }
+
+    private static class TestMemoryManager implements MemoryManager {
+        private final List<Path> addedWriterPath = new ArrayList<>();
+
+        @Override
+        public void addWriter(Path path, long requestedAllocation, Callback callback) {
+            addedWriterPath.add(path);
+        }
+
+        public List<Path> getAddedWriterPath() {
+            return addedWriterPath;
+        }
+
+        @Override
+        public void removeWriter(Path path) {}
+
+        @Override
+        public void addedRow(int rows) {}
+    }
+}
diff --git a/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcFileFormatTest.java b/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcFileFormatTest.java
index b99d337f..ba1dbacb 100644
--- a/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcFileFormatTest.java
+++ b/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcFileFormatTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.configuration.Configuration;
 
 import org.junit.jupiter.api.Test;
 
+import static org.apache.flink.table.store.format.orc.OrcFileFormatFactory.IDENTIFIER;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test for {@link OrcFileFormatFactory}. */
@@ -32,8 +33,8 @@ public class OrcFileFormatTest {
         Configuration options = new Configuration();
         options.setString("haha", "1");
         OrcFileFormat orc = new OrcFileFormatFactory().create(options);
-        assertThat(orc.formatOptions().getString("haha", "")).isEqualTo("1");
-        assertThat(orc.formatOptions().getString("compress", "")).isEqualTo("lz4");
+        assertThat(orc.orcProperties().getProperty(IDENTIFIER + ".haha", "")).isEqualTo("1");
+        assertThat(orc.orcProperties().getProperty(IDENTIFIER + ".compress", "")).isEqualTo("lz4");
     }
 
     @Test
@@ -42,7 +43,7 @@ public class OrcFileFormatTest {
         options.setString("haha", "1");
         options.setString("compress", "zlib");
         OrcFileFormat orc = new OrcFileFormatFactory().create(options);
-        assertThat(orc.formatOptions().getString("haha", "")).isEqualTo("1");
-        assertThat(orc.formatOptions().getString("compress", "")).isEqualTo("zlib");
+        assertThat(orc.orcProperties().getProperty(IDENTIFIER + ".haha", "")).isEqualTo("1");
+        assertThat(orc.orcProperties().getProperty(IDENTIFIER + ".compress", "")).isEqualTo("zlib");
     }
 }
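
For readers of this commit, a minimal usage sketch of the new factory (not part of the
commit itself), following the pattern in OrcBulkWriterFactoryTest above. The output file
path, the sample row, and the class name OrcWriterSketch are illustrative assumptions only:

    import org.apache.flink.api.common.serialization.BulkWriter;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.core.fs.local.LocalDataOutputStream;
    import org.apache.flink.table.data.GenericRowData;
    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.data.StringData;
    import org.apache.flink.table.store.format.orc.OrcFileFormat;
    import org.apache.flink.table.types.logical.IntType;
    import org.apache.flink.table.types.logical.RowType;
    import org.apache.flink.table.types.logical.VarCharType;

    import java.io.File;

    public class OrcWriterSketch {
        public static void main(String[] args) throws Exception {
            // The format now builds its ORC properties and Hadoop reader/writer
            // configurations once, in the constructor.
            OrcFileFormat format = new OrcFileFormat(new Configuration());

            // createWriterFactory returns the table store's own OrcBulkWriterFactory,
            // already carrying fully built OrcFile.WriterOptions.
            BulkWriter.Factory<RowData> factory =
                    format.createWriterFactory(RowType.of(new VarCharType(), new IntType()));

            // Hypothetical local destination; any FSDataOutputStream works.
            try (LocalDataOutputStream out =
                    new LocalDataOutputStream(new File("/tmp/sketch.orc"))) {
                BulkWriter<RowData> writer = factory.create(out);
                writer.addElement(GenericRowData.of(StringData.fromString("k"), 1));
                writer.finish();
            }
        }
    }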


[flink-table-store] 02/02: [FLINK-29278] BINARY type is not supported in table store

Posted by lz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-0.2
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git

commit 3eee4bf4eddd6d22a0225e1958601a44f7dd9ba7
Author: Nicholas Jiang <pr...@163.com>
AuthorDate: Wed Sep 21 19:21:03 2022 +0800

    [FLINK-29278] BINARY type is not supported in table store
    
    This closes #292
---
 .../store/format/FileStatsExtractorTestBase.java   |   8 ++
 .../store/table/AppendOnlyFileStoreTableTest.java  |  51 +++++-----
 .../ChangelogValueCountFileStoreTableTest.java     |  56 ++++++-----
 .../table/ChangelogWithKeyFileStoreTableTest.java  | 103 ++++++++++-----------
 .../table/store/table/FileStoreTableTestBase.java  |  75 ++++++++++-----
 .../table/store/table/WritePreemptMemoryTest.java  |   3 +-
 .../table/store/format/orc/OrcFileFormat.java      |  45 ++++++++-
 .../format/orc/OrcFileStatsExtractorTest.java      |   4 +
 8 files changed, 221 insertions(+), 124 deletions(-)

diff --git a/flink-table-store-common/src/test/java/org/apache/flink/table/store/format/FileStatsExtractorTestBase.java b/flink-table-store-common/src/test/java/org/apache/flink/table/store/format/FileStatsExtractorTestBase.java
index 90a53f7f..18d5c89a 100644
--- a/flink-table-store-common/src/test/java/org/apache/flink/table/store/format/FileStatsExtractorTestBase.java
+++ b/flink-table-store-common/src/test/java/org/apache/flink/table/store/format/FileStatsExtractorTestBase.java
@@ -30,11 +30,13 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.TimestampData;
 import org.apache.flink.table.data.binary.BinaryStringData;
 import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.BinaryType;
 import org.apache.flink.table.types.logical.CharType;
 import org.apache.flink.table.types.logical.DecimalType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.VarBinaryType;
 import org.apache.flink.table.types.logical.VarCharType;
 
 import org.junit.jupiter.api.Test;
@@ -128,6 +130,12 @@ public abstract class FileStatsExtractorTestBase {
                         randomString(random.nextInt(varCharType.getLength()) + 1));
             case BOOLEAN:
                 return random.nextBoolean();
+            case BINARY:
+                BinaryType binaryType = (BinaryType) type;
+                return randomString(binaryType.getLength()).getBytes();
+            case VARBINARY:
+                VarBinaryType varBinaryType = (VarBinaryType) type;
+                return randomString(varBinaryType.getLength()).getBytes();
             case TINYINT:
                 return (byte) random.nextInt(10);
             case SMALLINT:
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
index 9cc7ed32..82f6508e 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
@@ -19,7 +19,6 @@
 package org.apache.flink.table.store.table;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
@@ -63,9 +62,19 @@ public class AppendOnlyFileStoreTableTest extends FileStoreTableTestBase {
         TableRead read = table.newRead();
         assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING))
                 .hasSameElementsAs(
-                        Arrays.asList("1|10|100", "1|11|101", "1|12|102", "1|11|101", "1|12|102"));
+                        Arrays.asList(
+                                "1|10|100|binary|varbinary",
+                                "1|11|101|binary|varbinary",
+                                "1|12|102|binary|varbinary",
+                                "1|11|101|binary|varbinary",
+                                "1|12|102|binary|varbinary"));
         assertThat(getResult(read, splits, binaryRow(2), 0, BATCH_ROW_TO_STRING))
-                .hasSameElementsAs(Arrays.asList("2|20|200", "2|21|201", "2|22|202", "2|21|201"));
+                .hasSameElementsAs(
+                        Arrays.asList(
+                                "2|20|200|binary|varbinary",
+                                "2|21|201|binary|varbinary",
+                                "2|22|202|binary|varbinary",
+                                "2|21|201|binary|varbinary"));
     }
 
     @Test
@@ -94,10 +103,10 @@ public class AppendOnlyFileStoreTableTest extends FileStoreTableTestBase {
         assertThat(getResult(read, splits, binaryRow(2), 0, BATCH_ROW_TO_STRING))
                 .hasSameElementsAs(
                         Arrays.asList(
-                                "2|21|201",
+                                "2|21|201|binary|varbinary",
                                 // this record is in the same file with the first "2|21|201"
-                                "2|22|202",
-                                "2|21|201"));
+                                "2|22|202|binary|varbinary",
+                                "2|21|201|binary|varbinary"));
     }
 
     @Test
@@ -108,9 +117,10 @@ public class AppendOnlyFileStoreTableTest extends FileStoreTableTestBase {
         List<Split> splits = table.newScan().withIncremental(true).plan().splits;
         TableRead read = table.newRead();
         assertThat(getResult(read, splits, binaryRow(1), 0, STREAMING_ROW_TO_STRING))
-                .isEqualTo(Arrays.asList("+1|11|101", "+1|12|102"));
+                .isEqualTo(
+                        Arrays.asList("+1|11|101|binary|varbinary", "+1|12|102|binary|varbinary"));
         assertThat(getResult(read, splits, binaryRow(2), 0, STREAMING_ROW_TO_STRING))
-                .isEqualTo(Collections.singletonList("+2|21|201"));
+                .isEqualTo(Collections.singletonList("+2|21|201|binary|varbinary"));
     }
 
     @Test
@@ -139,9 +149,9 @@ public class AppendOnlyFileStoreTableTest extends FileStoreTableTestBase {
         assertThat(getResult(read, splits, binaryRow(1), 0, STREAMING_ROW_TO_STRING))
                 .isEqualTo(
                         Arrays.asList(
-                                "+1|11|101",
+                                "+1|11|101|binary|varbinary",
                                 // this record is in the same file with "+1|11|101"
-                                "+1|12|102"));
+                                "+1|12|102|binary|varbinary"));
         assertThat(getResult(read, splits, binaryRow(2), 0, STREAMING_ROW_TO_STRING)).isEmpty();
     }
 
@@ -161,8 +171,7 @@ public class AppendOnlyFileStoreTableTest extends FileStoreTableTestBase {
             for (int j = 0; j < Math.max(random.nextInt(200), 1); j++) {
                 BinaryRowData data =
                         serializer
-                                .toBinaryRow(
-                                        GenericRowData.of(i, random.nextInt(), random.nextLong()))
+                                .toBinaryRow(rowData(i, random.nextInt(), random.nextLong()))
                                 .copy();
                 int bucket = bucket(hashcode(data), numOfBucket);
                 dataPerBucket.compute(
@@ -204,19 +213,19 @@ public class AppendOnlyFileStoreTableTest extends FileStoreTableTestBase {
         TableWrite write = table.newWrite();
         TableCommit commit = table.newCommit("user");
 
-        write.write(GenericRowData.of(1, 10, 100L));
-        write.write(GenericRowData.of(2, 20, 200L));
-        write.write(GenericRowData.of(1, 11, 101L));
+        write.write(rowData(1, 10, 100L));
+        write.write(rowData(2, 20, 200L));
+        write.write(rowData(1, 11, 101L));
         commit.commit("0", write.prepareCommit(true));
 
-        write.write(GenericRowData.of(1, 12, 102L));
-        write.write(GenericRowData.of(2, 21, 201L));
-        write.write(GenericRowData.of(2, 22, 202L));
+        write.write(rowData(1, 12, 102L));
+        write.write(rowData(2, 21, 201L));
+        write.write(rowData(2, 22, 202L));
         commit.commit("1", write.prepareCommit(true));
 
-        write.write(GenericRowData.of(1, 11, 101L));
-        write.write(GenericRowData.of(2, 21, 201L));
-        write.write(GenericRowData.of(1, 12, 102L));
+        write.write(rowData(1, 11, 101L));
+        write.write(rowData(2, 21, 201L));
+        write.write(rowData(1, 12, 102L));
         commit.commit("2", write.prepareCommit(true));
 
         write.close();
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java
index 894f5d85..8ce299dd 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java
@@ -19,7 +19,6 @@
 package org.apache.flink.table.store.table;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.file.WriteMode;
 import org.apache.flink.table.store.file.predicate.Predicate;
@@ -53,9 +52,17 @@ public class ChangelogValueCountFileStoreTableTest extends FileStoreTableTestBas
         List<Split> splits = table.newScan().plan().splits;
         TableRead read = table.newRead();
         assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING))
-                .isEqualTo(Arrays.asList("1|11|101", "1|11|101", "1|12|102"));
+                .isEqualTo(
+                        Arrays.asList(
+                                "1|11|101|binary|varbinary",
+                                "1|11|101|binary|varbinary",
+                                "1|12|102|binary|varbinary"));
         assertThat(getResult(read, splits, binaryRow(2), 0, BATCH_ROW_TO_STRING))
-                .isEqualTo(Arrays.asList("2|20|200", "2|21|201", "2|22|202"));
+                .isEqualTo(
+                        Arrays.asList(
+                                "2|20|200|binary|varbinary",
+                                "2|21|201|binary|varbinary",
+                                "2|22|202|binary|varbinary"));
     }
 
     @Test
@@ -84,9 +91,9 @@ public class ChangelogValueCountFileStoreTableTest extends FileStoreTableTestBas
         assertThat(getResult(read, splits, binaryRow(2), 0, BATCH_ROW_TO_STRING))
                 .isEqualTo(
                         Arrays.asList(
-                                "2|21|201",
+                                "2|21|201|binary|varbinary",
                                 // this record is in the same file with "delete 2|21|201"
-                                "2|22|202"));
+                                "2|22|202|binary|varbinary"));
     }
 
     @Test
@@ -97,9 +104,14 @@ public class ChangelogValueCountFileStoreTableTest extends FileStoreTableTestBas
         List<Split> splits = table.newScan().withIncremental(true).plan().splits;
         TableRead read = table.newRead();
         assertThat(getResult(read, splits, binaryRow(1), 0, STREAMING_ROW_TO_STRING))
-                .isEqualTo(Arrays.asList("-1|10|100", "+1|11|101"));
+                .isEqualTo(
+                        Arrays.asList("-1|10|100|binary|varbinary", "+1|11|101|binary|varbinary"));
         assertThat(getResult(read, splits, binaryRow(2), 0, STREAMING_ROW_TO_STRING))
-                .isEqualTo(Arrays.asList("-2|21|201", "-2|21|201", "+2|22|202"));
+                .isEqualTo(
+                        Arrays.asList(
+                                "-2|21|201|binary|varbinary",
+                                "-2|21|201|binary|varbinary",
+                                "+2|22|202|binary|varbinary"));
     }
 
     @Test
@@ -129,10 +141,10 @@ public class ChangelogValueCountFileStoreTableTest extends FileStoreTableTestBas
         assertThat(getResult(read, splits, binaryRow(2), 0, STREAMING_ROW_TO_STRING))
                 .isEqualTo(
                         Arrays.asList(
-                                "-2|21|201",
-                                "-2|21|201",
+                                "-2|21|201|binary|varbinary",
+                                "-2|21|201|binary|varbinary",
                                 // this record is in the same file with "delete 2|21|201"
-                                "+2|22|202"));
+                                "+2|22|202|binary|varbinary"));
     }
 
     private void writeData() throws Exception {
@@ -140,22 +152,22 @@ public class ChangelogValueCountFileStoreTableTest extends FileStoreTableTestBas
         TableWrite write = table.newWrite();
         TableCommit commit = table.newCommit("user");
 
-        write.write(GenericRowData.of(1, 10, 100L));
-        write.write(GenericRowData.of(2, 20, 200L));
-        write.write(GenericRowData.of(1, 11, 101L));
+        write.write(rowData(1, 10, 100L));
+        write.write(rowData(2, 20, 200L));
+        write.write(rowData(1, 11, 101L));
         commit.commit("0", write.prepareCommit(true));
 
-        write.write(GenericRowData.of(2, 21, 201L));
-        write.write(GenericRowData.of(1, 12, 102L));
-        write.write(GenericRowData.of(2, 21, 201L));
-        write.write(GenericRowData.of(2, 21, 201L));
+        write.write(rowData(2, 21, 201L));
+        write.write(rowData(1, 12, 102L));
+        write.write(rowData(2, 21, 201L));
+        write.write(rowData(2, 21, 201L));
         commit.commit("1", write.prepareCommit(true));
 
-        write.write(GenericRowData.of(1, 11, 101L));
-        write.write(GenericRowData.of(2, 22, 202L));
-        write.write(GenericRowData.ofKind(RowKind.DELETE, 2, 21, 201L));
-        write.write(GenericRowData.ofKind(RowKind.DELETE, 1, 10, 100L));
-        write.write(GenericRowData.ofKind(RowKind.DELETE, 2, 21, 201L));
+        write.write(rowData(1, 11, 101L));
+        write.write(rowData(2, 22, 202L));
+        write.write(rowDataWithKind(RowKind.DELETE, 2, 21, 201L));
+        write.write(rowDataWithKind(RowKind.DELETE, 1, 10, 100L));
+        write.write(rowDataWithKind(RowKind.DELETE, 2, 21, 201L));
         commit.commit("2", write.prepareCommit(true));
 
         write.close();
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
index 9b1495fe..2d143c39 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
@@ -19,7 +19,6 @@
 package org.apache.flink.table.store.table;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.CoreOptions.ChangelogProducer;
 import org.apache.flink.table.store.file.WriteMode;
@@ -52,18 +51,18 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
                 createFileStoreTable(conf -> conf.set(CoreOptions.SEQUENCE_FIELD, "b"));
         TableWrite write = table.newWrite();
         TableCommit commit = table.newCommit("user");
-        write.write(GenericRowData.of(1, 10, 200L));
-        write.write(GenericRowData.of(1, 10, 100L));
-        write.write(GenericRowData.of(1, 11, 101L));
+        write.write(rowData(1, 10, 200L));
+        write.write(rowData(1, 10, 100L));
+        write.write(rowData(1, 11, 101L));
         commit.commit("0", write.prepareCommit(true));
-        write.write(GenericRowData.of(1, 11, 55L));
+        write.write(rowData(1, 11, 55L));
         commit.commit("1", write.prepareCommit(true));
         write.close();
 
         List<Split> splits = table.newScan().plan().splits;
         TableRead read = table.newRead();
         assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING))
-                .isEqualTo(Arrays.asList("1|10|200", "1|11|101"));
+                .isEqualTo(Arrays.asList("1|10|200|binary|varbinary", "1|11|101|binary|varbinary"));
     }
 
     @Test
@@ -74,9 +73,10 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
         List<Split> splits = table.newScan().plan().splits;
         TableRead read = table.newRead();
         assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING))
-                .isEqualTo(Collections.singletonList("1|10|1000"));
+                .isEqualTo(Collections.singletonList("1|10|1000|binary|varbinary"));
         assertThat(getResult(read, splits, binaryRow(2), 0, BATCH_ROW_TO_STRING))
-                .isEqualTo(Arrays.asList("2|21|20001", "2|22|202"));
+                .isEqualTo(
+                        Arrays.asList("2|21|20001|binary|varbinary", "2|22|202|binary|varbinary"));
     }
 
     @Test
@@ -107,7 +107,7 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
                         Arrays.asList(
                                 // only filter on key should be performed,
                                 // and records from the same file should also be selected
-                                "2|21|20001", "2|22|202"));
+                                "2|21|20001|binary|varbinary", "2|22|202|binary|varbinary"));
     }
 
     @Test
@@ -118,9 +118,13 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
         List<Split> splits = table.newScan().withIncremental(true).plan().splits;
         TableRead read = table.newRead();
         assertThat(getResult(read, splits, binaryRow(1), 0, STREAMING_ROW_TO_STRING))
-                .isEqualTo(Collections.singletonList("-1|11|1001"));
+                .isEqualTo(Collections.singletonList("-1|11|1001|binary|varbinary"));
         assertThat(getResult(read, splits, binaryRow(2), 0, STREAMING_ROW_TO_STRING))
-                .isEqualTo(Arrays.asList("-2|20|200", "+2|21|20001", "+2|22|202"));
+                .isEqualTo(
+                        Arrays.asList(
+                                "-2|20|200|binary|varbinary",
+                                "+2|21|20001|binary|varbinary",
+                                "+2|22|202|binary|varbinary"));
     }
 
     @Test
@@ -153,7 +157,9 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
                         Arrays.asList(
                                 // only filter on key should be performed,
                                 // and records from the same file should also be selected
-                                "-2|20|200", "+2|21|20001", "+2|22|202"));
+                                "-2|20|200|binary|varbinary",
+                                "+2|21|20001|binary|varbinary",
+                                "+2|22|202|binary|varbinary"));
     }
 
     @Test
@@ -163,11 +169,11 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
                         conf -> conf.set(CoreOptions.CHANGELOG_PRODUCER, ChangelogProducer.INPUT));
         TableWrite write = table.newWrite();
         TableCommit commit = table.newCommit("user");
-        write.write(GenericRowData.of(1, 10, 100L));
-        write.write(GenericRowData.ofKind(RowKind.DELETE, 1, 10, 100L));
-        write.write(GenericRowData.of(1, 10, 101L));
-        write.write(GenericRowData.ofKind(RowKind.UPDATE_BEFORE, 1, 10, 101L));
-        write.write(GenericRowData.ofKind(RowKind.UPDATE_AFTER, 1, 10, 102L));
+        write.write(rowData(1, 10, 100L));
+        write.write(rowDataWithKind(RowKind.DELETE, 1, 10, 100L));
+        write.write(rowData(1, 10, 101L));
+        write.write(rowDataWithKind(RowKind.UPDATE_BEFORE, 1, 10, 101L));
+        write.write(rowDataWithKind(RowKind.UPDATE_AFTER, 1, 10, 102L));
         commit.commit("0", write.prepareCommit(true));
         write.close();
 
@@ -176,11 +182,11 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
         assertThat(getResult(read, splits, binaryRow(1), 0, CHANGELOG_ROW_TO_STRING))
                 .isEqualTo(
                         Arrays.asList(
-                                "+I 1|10|100",
-                                "-D 1|10|100",
-                                "+I 1|10|101",
-                                "-U 1|10|101",
-                                "+U 1|10|102"));
+                                "+I 1|10|100|binary|varbinary",
+                                "-D 1|10|100|binary|varbinary",
+                                "+I 1|10|101|binary|varbinary",
+                                "-U 1|10|101|binary|varbinary",
+                                "+U 1|10|102|binary|varbinary"));
     }
 
     private void writeData() throws Exception {
@@ -188,21 +194,21 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
         TableWrite write = table.newWrite();
         TableCommit commit = table.newCommit("user");
 
-        write.write(GenericRowData.of(1, 10, 100L));
-        write.write(GenericRowData.of(2, 20, 200L));
-        write.write(GenericRowData.of(1, 11, 101L));
+        write.write(rowData(1, 10, 100L));
+        write.write(rowData(2, 20, 200L));
+        write.write(rowData(1, 11, 101L));
         commit.commit("0", write.prepareCommit(true));
 
-        write.write(GenericRowData.of(1, 10, 1000L));
-        write.write(GenericRowData.of(2, 21, 201L));
-        write.write(GenericRowData.of(2, 21, 2001L));
+        write.write(rowData(1, 10, 1000L));
+        write.write(rowData(2, 21, 201L));
+        write.write(rowData(2, 21, 2001L));
         commit.commit("1", write.prepareCommit(true));
 
-        write.write(GenericRowData.of(1, 11, 1001L));
-        write.write(GenericRowData.of(2, 21, 20001L));
-        write.write(GenericRowData.of(2, 22, 202L));
-        write.write(GenericRowData.ofKind(RowKind.DELETE, 1, 11, 1001L));
-        write.write(GenericRowData.ofKind(RowKind.DELETE, 2, 20, 200L));
+        write.write(rowData(1, 11, 1001L));
+        write.write(rowData(2, 21, 20001L));
+        write.write(rowData(2, 22, 202L));
+        write.write(rowDataWithKind(RowKind.DELETE, 1, 11, 1001L));
+        write.write(rowDataWithKind(RowKind.DELETE, 2, 20, 200L));
         commit.commit("2", write.prepareCommit(true));
 
         write.close();
@@ -216,37 +222,28 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
         TableWrite write = table.newWrite();
         TableCommit commit = table.newCommit("user");
 
-        write.write(GenericRowData.of(1, 10, 100L));
-        write.write(GenericRowData.of(1, 20, 200L));
+        write.write(rowData(1, 10, 100L));
+        write.write(rowData(1, 20, 200L));
         commit.commit("0", write.prepareCommit(true));
 
-        write.write(GenericRowData.of(1, 30, 300L));
-        write.write(GenericRowData.of(1, 40, 400L));
+        write.write(rowData(1, 30, 300L));
+        write.write(rowData(1, 40, 400L));
         commit.commit("1", write.prepareCommit(true));
 
-        write.write(GenericRowData.of(1, 50, 500L));
-        write.write(GenericRowData.of(1, 60, 600L));
+        write.write(rowData(1, 50, 500L));
+        write.write(rowData(1, 60, 600L));
         commit.commit("2", write.prepareCommit(true));
 
-        write.close();
-
         PredicateBuilder builder = new PredicateBuilder(ROW_TYPE);
         List<Split> splits = table.newScan().plan().splits;
 
-        TableRead read = table.newRead().withFilter(builder.equal(2, 300L));
+        // push down key filter a = 30
+        TableRead read = table.newRead().withFilter(builder.equal(1, 30));
         assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING))
                 .hasSameElementsAs(
-                        Arrays.asList(
-                                "1|10|100",
-                                "1|20|200",
-                                "1|30|300",
-                                "1|40|400",
-                                "1|50|500",
-                                "1|60|600"));
-
-        read = table.newRead().withFilter(builder.equal(1, 30));
-        assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING))
-                .hasSameElementsAs(Arrays.asList("1|30|300", "1|40|400"));
+                        Arrays.asList("1|30|300|binary|varbinary", "1|40|400|binary|varbinary"));
+
+        write.close();
     }
 
     @Override
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
index 74b824db..19b7a836 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
@@ -65,12 +65,23 @@ public abstract class FileStoreTableTestBase {
                     new LogicalType[] {
                         DataTypes.INT().getLogicalType(),
                         DataTypes.INT().getLogicalType(),
-                        DataTypes.BIGINT().getLogicalType()
+                        DataTypes.BIGINT().getLogicalType(),
+                        DataTypes.BINARY(1).getLogicalType(),
+                        DataTypes.VARBINARY(1).getLogicalType()
                     },
-                    new String[] {"pt", "a", "b"});
+                    new String[] {"pt", "a", "b", "c", "d"});
     protected static final int[] PROJECTION = new int[] {2, 1};
     protected static final Function<RowData, String> BATCH_ROW_TO_STRING =
-            rowData -> rowData.getInt(0) + "|" + rowData.getInt(1) + "|" + rowData.getLong(2);
+            rowData ->
+                    rowData.getInt(0)
+                            + "|"
+                            + rowData.getInt(1)
+                            + "|"
+                            + rowData.getLong(2)
+                            + "|"
+                            + new String(rowData.getBinary(3))
+                            + "|"
+                            + new String(rowData.getBinary(4));
     protected static final Function<RowData, String> BATCH_PROJECTED_ROW_TO_STRING =
             rowData -> rowData.getLong(0) + "|" + rowData.getInt(1);
     protected static final Function<RowData, String> STREAMING_ROW_TO_STRING =
@@ -100,14 +111,14 @@ public abstract class FileStoreTableTestBase {
 
         TableWrite write = table.newWrite();
         TableCommit commit = table.newCommit("user");
-        write.write(GenericRowData.of(1, 10, 100L));
-        write.write(GenericRowData.of(2, 20, 200L));
+        write.write(rowData(1, 10, 100L));
+        write.write(rowData(2, 20, 200L));
         commit.commit("0", write.prepareCommit(true));
         write.close();
 
         write = table.newWrite().withOverwrite(true);
         commit = table.newCommit("user");
-        write.write(GenericRowData.of(2, 21, 201L));
+        write.write(rowData(2, 21, 201L));
         Map<String, String> overwritePartition = new HashMap<>();
         overwritePartition.put("pt", "2");
         commit.withOverwritePartition(overwritePartition).commit("1", write.prepareCommit(true));
@@ -116,9 +127,9 @@ public abstract class FileStoreTableTestBase {
         List<Split> splits = table.newScan().plan().splits;
         TableRead read = table.newRead();
         assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING))
-                .hasSameElementsAs(Collections.singletonList("1|10|100"));
+                .hasSameElementsAs(Collections.singletonList("1|10|100|binary|varbinary"));
         assertThat(getResult(read, splits, binaryRow(2), 0, BATCH_ROW_TO_STRING))
-                .hasSameElementsAs(Collections.singletonList("2|21|201"));
+                .hasSameElementsAs(Collections.singletonList("2|21|201|binary|varbinary"));
     }
 
     @Test
@@ -131,11 +142,11 @@ public abstract class FileStoreTableTestBase {
                         });
 
         TableWrite write = table.newWrite();
-        write.write(GenericRowData.of(1, 1, 2L));
-        write.write(GenericRowData.of(1, 3, 4L));
-        write.write(GenericRowData.of(1, 5, 6L));
-        write.write(GenericRowData.of(1, 7, 8L));
-        write.write(GenericRowData.of(1, 9, 10L));
+        write.write(rowData(1, 1, 2L));
+        write.write(rowData(1, 3, 4L));
+        write.write(rowData(1, 5, 6L));
+        write.write(rowData(1, 7, 8L));
+        write.write(rowData(1, 9, 10L));
         table.newCommit("user").commit("0", write.prepareCommit(true));
         write.close();
 
@@ -155,16 +166,16 @@ public abstract class FileStoreTableTestBase {
         TableWrite write = table.newWrite();
         TableCommit commit = table.newCommit("user");
 
-        write.write(GenericRowData.of(1, 10, 100L));
-        write.write(GenericRowData.of(1, 20, 200L));
+        write.write(rowData(1, 10, 100L));
+        write.write(rowData(1, 20, 200L));
         commit.commit("0", write.prepareCommit(true));
 
-        write.write(GenericRowData.of(1, 30, 300L));
-        write.write(GenericRowData.of(1, 40, 400L));
+        write.write(rowData(1, 30, 300L));
+        write.write(rowData(1, 40, 400L));
         commit.commit("1", write.prepareCommit(true));
 
-        write.write(GenericRowData.of(1, 50, 500L));
-        write.write(GenericRowData.of(1, 60, 600L));
+        write.write(rowData(1, 50, 500L));
+        write.write(rowData(1, 60, 600L));
         commit.commit("2", write.prepareCommit(true));
 
         write.close();
@@ -173,7 +184,8 @@ public abstract class FileStoreTableTestBase {
         List<Split> splits = table.newScan().plan().splits;
         TableRead read = table.newRead().withFilter(builder.equal(2, 300L));
         assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING))
-                .hasSameElementsAs(Arrays.asList("1|30|300", "1|40|400"));
+                .hasSameElementsAs(
+                        Arrays.asList("1|30|300|binary|varbinary", "1|40|400|binary|varbinary"));
     }
 
     @Test
@@ -185,21 +197,21 @@ public abstract class FileStoreTableTestBase {
         for (int i = 0; i < 4; i++) {
             // write lots of records, let compaction be slower
             for (int j = 0; j < 1000; j++) {
-                write.write(GenericRowData.of(1, 10 * i * j, 100L * i * j));
+                write.write(rowData(1, 10 * i * j, 100L * i * j));
             }
             commit.commit(String.valueOf(i), write.prepareCommit(false));
         }
 
-        write.write(GenericRowData.of(1, 40, 400L));
+        write.write(rowData(1, 40, 400L));
         List<FileCommittable> commit4 = write.prepareCommit(false);
         // trigger compaction, but not wait it.
 
-        write.write(GenericRowData.of(2, 20, 200L));
+        write.write(rowData(2, 20, 200L));
         List<FileCommittable> commit5 = write.prepareCommit(true);
         // wait compaction finish
         // commit5 should be a compaction commit
 
-        write.write(GenericRowData.of(1, 60, 600L));
+        write.write(rowData(1, 60, 600L));
         List<FileCommittable> commit6 = write.prepareCommit(true);
         // if remove writer too fast, will see old files, do another compaction
         // then will be conflicts
@@ -251,6 +263,21 @@ public abstract class FileStoreTableTestBase {
         return b;
     }
 
+    protected GenericRowData rowData(Object... values) {
+        return GenericRowData.of(
+                values[0], values[1], values[2], "binary".getBytes(), "varbinary".getBytes());
+    }
+
+    protected GenericRowData rowDataWithKind(RowKind rowKind, Object... values) {
+        return GenericRowData.ofKind(
+                rowKind,
+                values[0],
+                values[1],
+                values[2],
+                "binary".getBytes(),
+                "varbinary".getBytes());
+    }
+
     protected FileStoreTable createFileStoreTable(int numOfBucket) throws Exception {
         return createFileStoreTable(conf -> conf.set(BUCKET, numOfBucket));
     }
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/WritePreemptMemoryTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/WritePreemptMemoryTest.java
index 7ddb76dd..353ac24e 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/WritePreemptMemoryTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/WritePreemptMemoryTest.java
@@ -68,8 +68,7 @@ public class WritePreemptMemoryTest extends FileStoreTableTestBase {
         Random random = new Random();
         List<String> expected = new ArrayList<>();
         for (int i = 0; i < 10_000; i++) {
-            GenericRowData row =
-                    GenericRowData.of(singlePartition ? 0 : random.nextInt(5), i, i * 10L);
+            GenericRowData row = rowData(singlePartition ? 0 : random.nextInt(5), i, i * 10L);
             write.write(row);
             expected.add(BATCH_ROW_TO_STRING.apply(row));
         }
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileFormat.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileFormat.java
index 3192cf56..5d07840a 100644
--- a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileFormat.java
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileFormat.java
@@ -29,12 +29,15 @@ import org.apache.flink.orc.OrcSplitReaderUtil;
 import org.apache.flink.orc.vector.RowDataVectorizer;
 import org.apache.flink.orc.vector.Vectorizer;
 import org.apache.flink.orc.writer.ThreadLocalClassLoaderConfiguration;
+import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.store.file.predicate.Predicate;
 import org.apache.flink.table.store.format.FileFormat;
 import org.apache.flink.table.store.format.FileStatsExtractor;
 import org.apache.flink.table.store.utils.Projection;
+import org.apache.flink.table.types.logical.ArrayType;
 import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
 import org.apache.flink.table.types.logical.RowType;
 
 import org.apache.orc.OrcFile;
@@ -44,6 +47,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
 import java.util.Properties;
+import java.util.stream.Collectors;
 
 import static org.apache.flink.table.store.format.orc.OrcFileFormatFactory.IDENTIFIER;
 
@@ -90,14 +94,18 @@ public class OrcFileFormat extends FileFormat {
         }
 
         return OrcInputFormatFactory.create(
-                readerConf, type, Projection.of(projection).toTopLevelIndexes(), orcPredicates);
+                readerConf,
+                (RowType) refineLogicalType(type),
+                Projection.of(projection).toTopLevelIndexes(),
+                orcPredicates);
     }
 
     @Override
     public BulkWriter.Factory<RowData> createWriterFactory(RowType type) {
         LogicalType[] orcTypes = type.getChildren().toArray(new LogicalType[0]);
 
-        TypeDescription typeDescription = OrcSplitReaderUtil.logicalTypeToOrcType(type);
+        TypeDescription typeDescription =
+                OrcSplitReaderUtil.logicalTypeToOrcType(refineLogicalType(type));
         Vectorizer<RowData> vectorizer =
                 new RowDataVectorizer(typeDescription.toString(), orcTypes);
         OrcFile.WriterOptions writerOptions = OrcFile.writerOptions(orcProperties, writerConf);
@@ -113,4 +121,37 @@ public class OrcFileFormat extends FileFormat {
         properties.forEach((k, v) -> orcProperties.put(IDENTIFIER + "." + k, v));
         return orcProperties;
     }
+
+    private static LogicalType refineLogicalType(LogicalType type) {
+        switch (type.getTypeRoot()) {
+            case BINARY:
+            case VARBINARY:
+                // OrcSplitReaderUtil#logicalTypeToOrcType() only supports the DataTypes.BYTES()
+                // logical type for BINARY and VARBINARY.
+                return DataTypes.BYTES().getLogicalType();
+            case ARRAY:
+                ArrayType arrayType = (ArrayType) type;
+                return new ArrayType(
+                        arrayType.isNullable(), refineLogicalType(arrayType.getElementType()));
+            case MAP:
+                MapType mapType = (MapType) type;
+                return new MapType(
+                        refineLogicalType(mapType.getKeyType()),
+                        refineLogicalType(mapType.getValueType()));
+            case ROW:
+                RowType rowType = (RowType) type;
+                return new RowType(
+                        rowType.isNullable(),
+                        rowType.getFields().stream()
+                                .map(
+                                        f ->
+                                                new RowType.RowField(
+                                                        f.getName(),
+                                                        refineLogicalType(f.getType()),
+                                                        f.getDescription().orElse(null)))
+                                .collect(Collectors.toList()));
+            default:
+                return type;
+        }
+    }
 }
diff --git a/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcFileStatsExtractorTest.java b/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcFileStatsExtractorTest.java
index eaac4b29..86aba5c4 100644
--- a/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcFileStatsExtractorTest.java
+++ b/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcFileStatsExtractorTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.table.store.format.FileFormat;
 import org.apache.flink.table.store.format.FileStatsExtractorTestBase;
 import org.apache.flink.table.types.logical.ArrayType;
 import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BinaryType;
 import org.apache.flink.table.types.logical.BooleanType;
 import org.apache.flink.table.types.logical.CharType;
 import org.apache.flink.table.types.logical.DateType;
@@ -34,6 +35,7 @@ import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.SmallIntType;
 import org.apache.flink.table.types.logical.TimestampType;
 import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.table.types.logical.VarBinaryType;
 import org.apache.flink.table.types.logical.VarCharType;
 
 /** Tests for {@link OrcFileStatsExtractor}. */
@@ -50,6 +52,8 @@ public class OrcFileStatsExtractorTest extends FileStatsExtractorTestBase {
                 new CharType(8),
                 new VarCharType(8),
                 new BooleanType(),
+                new BinaryType(8),
+                new VarBinaryType(8),
                 new TinyIntType(),
                 new SmallIntType(),
                 new IntType(),