You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/09/21 14:15:21 UTC

[flink-table-store] 02/02: [FLINK-29278] BINARY type is not supported in table store

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-0.2
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git

commit 3eee4bf4eddd6d22a0225e1958601a44f7dd9ba7
Author: Nicholas Jiang <pr...@163.com>
AuthorDate: Wed Sep 21 19:21:03 2022 +0800

    [FLINK-29278] BINARY type is not supported in table store
    
    This closes #292
---
 .../store/format/FileStatsExtractorTestBase.java   |   8 ++
 .../store/table/AppendOnlyFileStoreTableTest.java  |  51 +++++-----
 .../ChangelogValueCountFileStoreTableTest.java     |  56 ++++++-----
 .../table/ChangelogWithKeyFileStoreTableTest.java  | 103 ++++++++++-----------
 .../table/store/table/FileStoreTableTestBase.java  |  75 ++++++++++-----
 .../table/store/table/WritePreemptMemoryTest.java  |   3 +-
 .../table/store/format/orc/OrcFileFormat.java      |  45 ++++++++-
 .../format/orc/OrcFileStatsExtractorTest.java      |   4 +
 8 files changed, 221 insertions(+), 124 deletions(-)

diff --git a/flink-table-store-common/src/test/java/org/apache/flink/table/store/format/FileStatsExtractorTestBase.java b/flink-table-store-common/src/test/java/org/apache/flink/table/store/format/FileStatsExtractorTestBase.java
index 90a53f7f..18d5c89a 100644
--- a/flink-table-store-common/src/test/java/org/apache/flink/table/store/format/FileStatsExtractorTestBase.java
+++ b/flink-table-store-common/src/test/java/org/apache/flink/table/store/format/FileStatsExtractorTestBase.java
@@ -30,11 +30,13 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.TimestampData;
 import org.apache.flink.table.data.binary.BinaryStringData;
 import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.BinaryType;
 import org.apache.flink.table.types.logical.CharType;
 import org.apache.flink.table.types.logical.DecimalType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.VarBinaryType;
 import org.apache.flink.table.types.logical.VarCharType;
 
 import org.junit.jupiter.api.Test;
@@ -128,6 +130,12 @@ public abstract class FileStatsExtractorTestBase {
                         randomString(random.nextInt(varCharType.getLength()) + 1));
             case BOOLEAN:
                 return random.nextBoolean();
+            case BINARY:
+                BinaryType binaryType = (BinaryType) type;
+                return randomString(binaryType.getLength()).getBytes();
+            case VARBINARY:
+                VarBinaryType varBinaryType = (VarBinaryType) type;
+                return randomString(varBinaryType.getLength()).getBytes();
             case TINYINT:
                 return (byte) random.nextInt(10);
             case SMALLINT:
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
index 9cc7ed32..82f6508e 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
@@ -19,7 +19,6 @@
 package org.apache.flink.table.store.table;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
@@ -63,9 +62,19 @@ public class AppendOnlyFileStoreTableTest extends FileStoreTableTestBase {
         TableRead read = table.newRead();
         assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING))
                 .hasSameElementsAs(
-                        Arrays.asList("1|10|100", "1|11|101", "1|12|102", "1|11|101", "1|12|102"));
+                        Arrays.asList(
+                                "1|10|100|binary|varbinary",
+                                "1|11|101|binary|varbinary",
+                                "1|12|102|binary|varbinary",
+                                "1|11|101|binary|varbinary",
+                                "1|12|102|binary|varbinary"));
         assertThat(getResult(read, splits, binaryRow(2), 0, BATCH_ROW_TO_STRING))
-                .hasSameElementsAs(Arrays.asList("2|20|200", "2|21|201", "2|22|202", "2|21|201"));
+                .hasSameElementsAs(
+                        Arrays.asList(
+                                "2|20|200|binary|varbinary",
+                                "2|21|201|binary|varbinary",
+                                "2|22|202|binary|varbinary",
+                                "2|21|201|binary|varbinary"));
     }
 
     @Test
@@ -94,10 +103,10 @@ public class AppendOnlyFileStoreTableTest extends FileStoreTableTestBase {
         assertThat(getResult(read, splits, binaryRow(2), 0, BATCH_ROW_TO_STRING))
                 .hasSameElementsAs(
                         Arrays.asList(
-                                "2|21|201",
+                                "2|21|201|binary|varbinary",
                                 // this record is in the same file with the first "2|21|201"
-                                "2|22|202",
-                                "2|21|201"));
+                                "2|22|202|binary|varbinary",
+                                "2|21|201|binary|varbinary"));
     }
 
     @Test
@@ -108,9 +117,10 @@ public class AppendOnlyFileStoreTableTest extends FileStoreTableTestBase {
         List<Split> splits = table.newScan().withIncremental(true).plan().splits;
         TableRead read = table.newRead();
         assertThat(getResult(read, splits, binaryRow(1), 0, STREAMING_ROW_TO_STRING))
-                .isEqualTo(Arrays.asList("+1|11|101", "+1|12|102"));
+                .isEqualTo(
+                        Arrays.asList("+1|11|101|binary|varbinary", "+1|12|102|binary|varbinary"));
         assertThat(getResult(read, splits, binaryRow(2), 0, STREAMING_ROW_TO_STRING))
-                .isEqualTo(Collections.singletonList("+2|21|201"));
+                .isEqualTo(Collections.singletonList("+2|21|201|binary|varbinary"));
     }
 
     @Test
@@ -139,9 +149,9 @@ public class AppendOnlyFileStoreTableTest extends FileStoreTableTestBase {
         assertThat(getResult(read, splits, binaryRow(1), 0, STREAMING_ROW_TO_STRING))
                 .isEqualTo(
                         Arrays.asList(
-                                "+1|11|101",
+                                "+1|11|101|binary|varbinary",
                                 // this record is in the same file with "+1|11|101"
-                                "+1|12|102"));
+                                "+1|12|102|binary|varbinary"));
         assertThat(getResult(read, splits, binaryRow(2), 0, STREAMING_ROW_TO_STRING)).isEmpty();
     }
 
@@ -161,8 +171,7 @@ public class AppendOnlyFileStoreTableTest extends FileStoreTableTestBase {
             for (int j = 0; j < Math.max(random.nextInt(200), 1); j++) {
                 BinaryRowData data =
                         serializer
-                                .toBinaryRow(
-                                        GenericRowData.of(i, random.nextInt(), random.nextLong()))
+                                .toBinaryRow(rowData(i, random.nextInt(), random.nextLong()))
                                 .copy();
                 int bucket = bucket(hashcode(data), numOfBucket);
                 dataPerBucket.compute(
@@ -204,19 +213,19 @@ public class AppendOnlyFileStoreTableTest extends FileStoreTableTestBase {
         TableWrite write = table.newWrite();
         TableCommit commit = table.newCommit("user");
 
-        write.write(GenericRowData.of(1, 10, 100L));
-        write.write(GenericRowData.of(2, 20, 200L));
-        write.write(GenericRowData.of(1, 11, 101L));
+        write.write(rowData(1, 10, 100L));
+        write.write(rowData(2, 20, 200L));
+        write.write(rowData(1, 11, 101L));
         commit.commit("0", write.prepareCommit(true));
 
-        write.write(GenericRowData.of(1, 12, 102L));
-        write.write(GenericRowData.of(2, 21, 201L));
-        write.write(GenericRowData.of(2, 22, 202L));
+        write.write(rowData(1, 12, 102L));
+        write.write(rowData(2, 21, 201L));
+        write.write(rowData(2, 22, 202L));
         commit.commit("1", write.prepareCommit(true));
 
-        write.write(GenericRowData.of(1, 11, 101L));
-        write.write(GenericRowData.of(2, 21, 201L));
-        write.write(GenericRowData.of(1, 12, 102L));
+        write.write(rowData(1, 11, 101L));
+        write.write(rowData(2, 21, 201L));
+        write.write(rowData(1, 12, 102L));
         commit.commit("2", write.prepareCommit(true));
 
         write.close();
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java
index 894f5d85..8ce299dd 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java
@@ -19,7 +19,6 @@
 package org.apache.flink.table.store.table;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.file.WriteMode;
 import org.apache.flink.table.store.file.predicate.Predicate;
@@ -53,9 +52,17 @@ public class ChangelogValueCountFileStoreTableTest extends FileStoreTableTestBas
         List<Split> splits = table.newScan().plan().splits;
         TableRead read = table.newRead();
         assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING))
-                .isEqualTo(Arrays.asList("1|11|101", "1|11|101", "1|12|102"));
+                .isEqualTo(
+                        Arrays.asList(
+                                "1|11|101|binary|varbinary",
+                                "1|11|101|binary|varbinary",
+                                "1|12|102|binary|varbinary"));
         assertThat(getResult(read, splits, binaryRow(2), 0, BATCH_ROW_TO_STRING))
-                .isEqualTo(Arrays.asList("2|20|200", "2|21|201", "2|22|202"));
+                .isEqualTo(
+                        Arrays.asList(
+                                "2|20|200|binary|varbinary",
+                                "2|21|201|binary|varbinary",
+                                "2|22|202|binary|varbinary"));
     }
 
     @Test
@@ -84,9 +91,9 @@ public class ChangelogValueCountFileStoreTableTest extends FileStoreTableTestBas
         assertThat(getResult(read, splits, binaryRow(2), 0, BATCH_ROW_TO_STRING))
                 .isEqualTo(
                         Arrays.asList(
-                                "2|21|201",
+                                "2|21|201|binary|varbinary",
                                 // this record is in the same file with "delete 2|21|201"
-                                "2|22|202"));
+                                "2|22|202|binary|varbinary"));
     }
 
     @Test
@@ -97,9 +104,14 @@ public class ChangelogValueCountFileStoreTableTest extends FileStoreTableTestBas
         List<Split> splits = table.newScan().withIncremental(true).plan().splits;
         TableRead read = table.newRead();
         assertThat(getResult(read, splits, binaryRow(1), 0, STREAMING_ROW_TO_STRING))
-                .isEqualTo(Arrays.asList("-1|10|100", "+1|11|101"));
+                .isEqualTo(
+                        Arrays.asList("-1|10|100|binary|varbinary", "+1|11|101|binary|varbinary"));
         assertThat(getResult(read, splits, binaryRow(2), 0, STREAMING_ROW_TO_STRING))
-                .isEqualTo(Arrays.asList("-2|21|201", "-2|21|201", "+2|22|202"));
+                .isEqualTo(
+                        Arrays.asList(
+                                "-2|21|201|binary|varbinary",
+                                "-2|21|201|binary|varbinary",
+                                "+2|22|202|binary|varbinary"));
     }
 
     @Test
@@ -129,10 +141,10 @@ public class ChangelogValueCountFileStoreTableTest extends FileStoreTableTestBas
         assertThat(getResult(read, splits, binaryRow(2), 0, STREAMING_ROW_TO_STRING))
                 .isEqualTo(
                         Arrays.asList(
-                                "-2|21|201",
-                                "-2|21|201",
+                                "-2|21|201|binary|varbinary",
+                                "-2|21|201|binary|varbinary",
                                 // this record is in the same file with "delete 2|21|201"
-                                "+2|22|202"));
+                                "+2|22|202|binary|varbinary"));
     }
 
     private void writeData() throws Exception {
@@ -140,22 +152,22 @@ public class ChangelogValueCountFileStoreTableTest extends FileStoreTableTestBas
         TableWrite write = table.newWrite();
         TableCommit commit = table.newCommit("user");
 
-        write.write(GenericRowData.of(1, 10, 100L));
-        write.write(GenericRowData.of(2, 20, 200L));
-        write.write(GenericRowData.of(1, 11, 101L));
+        write.write(rowData(1, 10, 100L));
+        write.write(rowData(2, 20, 200L));
+        write.write(rowData(1, 11, 101L));
         commit.commit("0", write.prepareCommit(true));
 
-        write.write(GenericRowData.of(2, 21, 201L));
-        write.write(GenericRowData.of(1, 12, 102L));
-        write.write(GenericRowData.of(2, 21, 201L));
-        write.write(GenericRowData.of(2, 21, 201L));
+        write.write(rowData(2, 21, 201L));
+        write.write(rowData(1, 12, 102L));
+        write.write(rowData(2, 21, 201L));
+        write.write(rowData(2, 21, 201L));
         commit.commit("1", write.prepareCommit(true));
 
-        write.write(GenericRowData.of(1, 11, 101L));
-        write.write(GenericRowData.of(2, 22, 202L));
-        write.write(GenericRowData.ofKind(RowKind.DELETE, 2, 21, 201L));
-        write.write(GenericRowData.ofKind(RowKind.DELETE, 1, 10, 100L));
-        write.write(GenericRowData.ofKind(RowKind.DELETE, 2, 21, 201L));
+        write.write(rowData(1, 11, 101L));
+        write.write(rowData(2, 22, 202L));
+        write.write(rowDataWithKind(RowKind.DELETE, 2, 21, 201L));
+        write.write(rowDataWithKind(RowKind.DELETE, 1, 10, 100L));
+        write.write(rowDataWithKind(RowKind.DELETE, 2, 21, 201L));
         commit.commit("2", write.prepareCommit(true));
 
         write.close();
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
index 9b1495fe..2d143c39 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
@@ -19,7 +19,6 @@
 package org.apache.flink.table.store.table;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.CoreOptions.ChangelogProducer;
 import org.apache.flink.table.store.file.WriteMode;
@@ -52,18 +51,18 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
                 createFileStoreTable(conf -> conf.set(CoreOptions.SEQUENCE_FIELD, "b"));
         TableWrite write = table.newWrite();
         TableCommit commit = table.newCommit("user");
-        write.write(GenericRowData.of(1, 10, 200L));
-        write.write(GenericRowData.of(1, 10, 100L));
-        write.write(GenericRowData.of(1, 11, 101L));
+        write.write(rowData(1, 10, 200L));
+        write.write(rowData(1, 10, 100L));
+        write.write(rowData(1, 11, 101L));
         commit.commit("0", write.prepareCommit(true));
-        write.write(GenericRowData.of(1, 11, 55L));
+        write.write(rowData(1, 11, 55L));
         commit.commit("1", write.prepareCommit(true));
         write.close();
 
         List<Split> splits = table.newScan().plan().splits;
         TableRead read = table.newRead();
         assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING))
-                .isEqualTo(Arrays.asList("1|10|200", "1|11|101"));
+                .isEqualTo(Arrays.asList("1|10|200|binary|varbinary", "1|11|101|binary|varbinary"));
     }
 
     @Test
@@ -74,9 +73,10 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
         List<Split> splits = table.newScan().plan().splits;
         TableRead read = table.newRead();
         assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING))
-                .isEqualTo(Collections.singletonList("1|10|1000"));
+                .isEqualTo(Collections.singletonList("1|10|1000|binary|varbinary"));
         assertThat(getResult(read, splits, binaryRow(2), 0, BATCH_ROW_TO_STRING))
-                .isEqualTo(Arrays.asList("2|21|20001", "2|22|202"));
+                .isEqualTo(
+                        Arrays.asList("2|21|20001|binary|varbinary", "2|22|202|binary|varbinary"));
     }
 
     @Test
@@ -107,7 +107,7 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
                         Arrays.asList(
                                 // only filter on key should be performed,
                                 // and records from the same file should also be selected
-                                "2|21|20001", "2|22|202"));
+                                "2|21|20001|binary|varbinary", "2|22|202|binary|varbinary"));
     }
 
     @Test
@@ -118,9 +118,13 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
         List<Split> splits = table.newScan().withIncremental(true).plan().splits;
         TableRead read = table.newRead();
         assertThat(getResult(read, splits, binaryRow(1), 0, STREAMING_ROW_TO_STRING))
-                .isEqualTo(Collections.singletonList("-1|11|1001"));
+                .isEqualTo(Collections.singletonList("-1|11|1001|binary|varbinary"));
         assertThat(getResult(read, splits, binaryRow(2), 0, STREAMING_ROW_TO_STRING))
-                .isEqualTo(Arrays.asList("-2|20|200", "+2|21|20001", "+2|22|202"));
+                .isEqualTo(
+                        Arrays.asList(
+                                "-2|20|200|binary|varbinary",
+                                "+2|21|20001|binary|varbinary",
+                                "+2|22|202|binary|varbinary"));
     }
 
     @Test
@@ -153,7 +157,9 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
                         Arrays.asList(
                                 // only filter on key should be performed,
                                 // and records from the same file should also be selected
-                                "-2|20|200", "+2|21|20001", "+2|22|202"));
+                                "-2|20|200|binary|varbinary",
+                                "+2|21|20001|binary|varbinary",
+                                "+2|22|202|binary|varbinary"));
     }
 
     @Test
@@ -163,11 +169,11 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
                         conf -> conf.set(CoreOptions.CHANGELOG_PRODUCER, ChangelogProducer.INPUT));
         TableWrite write = table.newWrite();
         TableCommit commit = table.newCommit("user");
-        write.write(GenericRowData.of(1, 10, 100L));
-        write.write(GenericRowData.ofKind(RowKind.DELETE, 1, 10, 100L));
-        write.write(GenericRowData.of(1, 10, 101L));
-        write.write(GenericRowData.ofKind(RowKind.UPDATE_BEFORE, 1, 10, 101L));
-        write.write(GenericRowData.ofKind(RowKind.UPDATE_AFTER, 1, 10, 102L));
+        write.write(rowData(1, 10, 100L));
+        write.write(rowDataWithKind(RowKind.DELETE, 1, 10, 100L));
+        write.write(rowData(1, 10, 101L));
+        write.write(rowDataWithKind(RowKind.UPDATE_BEFORE, 1, 10, 101L));
+        write.write(rowDataWithKind(RowKind.UPDATE_AFTER, 1, 10, 102L));
         commit.commit("0", write.prepareCommit(true));
         write.close();
 
@@ -176,11 +182,11 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
         assertThat(getResult(read, splits, binaryRow(1), 0, CHANGELOG_ROW_TO_STRING))
                 .isEqualTo(
                         Arrays.asList(
-                                "+I 1|10|100",
-                                "-D 1|10|100",
-                                "+I 1|10|101",
-                                "-U 1|10|101",
-                                "+U 1|10|102"));
+                                "+I 1|10|100|binary|varbinary",
+                                "-D 1|10|100|binary|varbinary",
+                                "+I 1|10|101|binary|varbinary",
+                                "-U 1|10|101|binary|varbinary",
+                                "+U 1|10|102|binary|varbinary"));
     }
 
     private void writeData() throws Exception {
@@ -188,21 +194,21 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
         TableWrite write = table.newWrite();
         TableCommit commit = table.newCommit("user");
 
-        write.write(GenericRowData.of(1, 10, 100L));
-        write.write(GenericRowData.of(2, 20, 200L));
-        write.write(GenericRowData.of(1, 11, 101L));
+        write.write(rowData(1, 10, 100L));
+        write.write(rowData(2, 20, 200L));
+        write.write(rowData(1, 11, 101L));
         commit.commit("0", write.prepareCommit(true));
 
-        write.write(GenericRowData.of(1, 10, 1000L));
-        write.write(GenericRowData.of(2, 21, 201L));
-        write.write(GenericRowData.of(2, 21, 2001L));
+        write.write(rowData(1, 10, 1000L));
+        write.write(rowData(2, 21, 201L));
+        write.write(rowData(2, 21, 2001L));
         commit.commit("1", write.prepareCommit(true));
 
-        write.write(GenericRowData.of(1, 11, 1001L));
-        write.write(GenericRowData.of(2, 21, 20001L));
-        write.write(GenericRowData.of(2, 22, 202L));
-        write.write(GenericRowData.ofKind(RowKind.DELETE, 1, 11, 1001L));
-        write.write(GenericRowData.ofKind(RowKind.DELETE, 2, 20, 200L));
+        write.write(rowData(1, 11, 1001L));
+        write.write(rowData(2, 21, 20001L));
+        write.write(rowData(2, 22, 202L));
+        write.write(rowDataWithKind(RowKind.DELETE, 1, 11, 1001L));
+        write.write(rowDataWithKind(RowKind.DELETE, 2, 20, 200L));
         commit.commit("2", write.prepareCommit(true));
 
         write.close();
@@ -216,37 +222,28 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
         TableWrite write = table.newWrite();
         TableCommit commit = table.newCommit("user");
 
-        write.write(GenericRowData.of(1, 10, 100L));
-        write.write(GenericRowData.of(1, 20, 200L));
+        write.write(rowData(1, 10, 100L));
+        write.write(rowData(1, 20, 200L));
         commit.commit("0", write.prepareCommit(true));
 
-        write.write(GenericRowData.of(1, 30, 300L));
-        write.write(GenericRowData.of(1, 40, 400L));
+        write.write(rowData(1, 30, 300L));
+        write.write(rowData(1, 40, 400L));
         commit.commit("1", write.prepareCommit(true));
 
-        write.write(GenericRowData.of(1, 50, 500L));
-        write.write(GenericRowData.of(1, 60, 600L));
+        write.write(rowData(1, 50, 500L));
+        write.write(rowData(1, 60, 600L));
         commit.commit("2", write.prepareCommit(true));
 
-        write.close();
-
         PredicateBuilder builder = new PredicateBuilder(ROW_TYPE);
         List<Split> splits = table.newScan().plan().splits;
 
-        TableRead read = table.newRead().withFilter(builder.equal(2, 300L));
+        // push down key filter a = 30
+        TableRead read = table.newRead().withFilter(builder.equal(1, 30));
         assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING))
                 .hasSameElementsAs(
-                        Arrays.asList(
-                                "1|10|100",
-                                "1|20|200",
-                                "1|30|300",
-                                "1|40|400",
-                                "1|50|500",
-                                "1|60|600"));
-
-        read = table.newRead().withFilter(builder.equal(1, 30));
-        assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING))
-                .hasSameElementsAs(Arrays.asList("1|30|300", "1|40|400"));
+                        Arrays.asList("1|30|300|binary|varbinary", "1|40|400|binary|varbinary"));
+
+        write.close();
     }
 
     @Override
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
index 74b824db..19b7a836 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
@@ -65,12 +65,23 @@ public abstract class FileStoreTableTestBase {
                     new LogicalType[] {
                         DataTypes.INT().getLogicalType(),
                         DataTypes.INT().getLogicalType(),
-                        DataTypes.BIGINT().getLogicalType()
+                        DataTypes.BIGINT().getLogicalType(),
+                        DataTypes.BINARY(1).getLogicalType(),
+                        DataTypes.VARBINARY(1).getLogicalType()
                     },
-                    new String[] {"pt", "a", "b"});
+                    new String[] {"pt", "a", "b", "c", "d"});
     protected static final int[] PROJECTION = new int[] {2, 1};
     protected static final Function<RowData, String> BATCH_ROW_TO_STRING =
-            rowData -> rowData.getInt(0) + "|" + rowData.getInt(1) + "|" + rowData.getLong(2);
+            rowData ->
+                    rowData.getInt(0)
+                            + "|"
+                            + rowData.getInt(1)
+                            + "|"
+                            + rowData.getLong(2)
+                            + "|"
+                            + new String(rowData.getBinary(3))
+                            + "|"
+                            + new String(rowData.getBinary(4));
     protected static final Function<RowData, String> BATCH_PROJECTED_ROW_TO_STRING =
             rowData -> rowData.getLong(0) + "|" + rowData.getInt(1);
     protected static final Function<RowData, String> STREAMING_ROW_TO_STRING =
@@ -100,14 +111,14 @@ public abstract class FileStoreTableTestBase {
 
         TableWrite write = table.newWrite();
         TableCommit commit = table.newCommit("user");
-        write.write(GenericRowData.of(1, 10, 100L));
-        write.write(GenericRowData.of(2, 20, 200L));
+        write.write(rowData(1, 10, 100L));
+        write.write(rowData(2, 20, 200L));
         commit.commit("0", write.prepareCommit(true));
         write.close();
 
         write = table.newWrite().withOverwrite(true);
         commit = table.newCommit("user");
-        write.write(GenericRowData.of(2, 21, 201L));
+        write.write(rowData(2, 21, 201L));
         Map<String, String> overwritePartition = new HashMap<>();
         overwritePartition.put("pt", "2");
         commit.withOverwritePartition(overwritePartition).commit("1", write.prepareCommit(true));
@@ -116,9 +127,9 @@ public abstract class FileStoreTableTestBase {
         List<Split> splits = table.newScan().plan().splits;
         TableRead read = table.newRead();
         assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING))
-                .hasSameElementsAs(Collections.singletonList("1|10|100"));
+                .hasSameElementsAs(Collections.singletonList("1|10|100|binary|varbinary"));
         assertThat(getResult(read, splits, binaryRow(2), 0, BATCH_ROW_TO_STRING))
-                .hasSameElementsAs(Collections.singletonList("2|21|201"));
+                .hasSameElementsAs(Collections.singletonList("2|21|201|binary|varbinary"));
     }
 
     @Test
@@ -131,11 +142,11 @@ public abstract class FileStoreTableTestBase {
                         });
 
         TableWrite write = table.newWrite();
-        write.write(GenericRowData.of(1, 1, 2L));
-        write.write(GenericRowData.of(1, 3, 4L));
-        write.write(GenericRowData.of(1, 5, 6L));
-        write.write(GenericRowData.of(1, 7, 8L));
-        write.write(GenericRowData.of(1, 9, 10L));
+        write.write(rowData(1, 1, 2L));
+        write.write(rowData(1, 3, 4L));
+        write.write(rowData(1, 5, 6L));
+        write.write(rowData(1, 7, 8L));
+        write.write(rowData(1, 9, 10L));
         table.newCommit("user").commit("0", write.prepareCommit(true));
         write.close();
 
@@ -155,16 +166,16 @@ public abstract class FileStoreTableTestBase {
         TableWrite write = table.newWrite();
         TableCommit commit = table.newCommit("user");
 
-        write.write(GenericRowData.of(1, 10, 100L));
-        write.write(GenericRowData.of(1, 20, 200L));
+        write.write(rowData(1, 10, 100L));
+        write.write(rowData(1, 20, 200L));
         commit.commit("0", write.prepareCommit(true));
 
-        write.write(GenericRowData.of(1, 30, 300L));
-        write.write(GenericRowData.of(1, 40, 400L));
+        write.write(rowData(1, 30, 300L));
+        write.write(rowData(1, 40, 400L));
         commit.commit("1", write.prepareCommit(true));
 
-        write.write(GenericRowData.of(1, 50, 500L));
-        write.write(GenericRowData.of(1, 60, 600L));
+        write.write(rowData(1, 50, 500L));
+        write.write(rowData(1, 60, 600L));
         commit.commit("2", write.prepareCommit(true));
 
         write.close();
@@ -173,7 +184,8 @@ public abstract class FileStoreTableTestBase {
         List<Split> splits = table.newScan().plan().splits;
         TableRead read = table.newRead().withFilter(builder.equal(2, 300L));
         assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING))
-                .hasSameElementsAs(Arrays.asList("1|30|300", "1|40|400"));
+                .hasSameElementsAs(
+                        Arrays.asList("1|30|300|binary|varbinary", "1|40|400|binary|varbinary"));
     }
 
     @Test
@@ -185,21 +197,21 @@ public abstract class FileStoreTableTestBase {
         for (int i = 0; i < 4; i++) {
             // write lots of records, let compaction be slower
             for (int j = 0; j < 1000; j++) {
-                write.write(GenericRowData.of(1, 10 * i * j, 100L * i * j));
+                write.write(rowData(1, 10 * i * j, 100L * i * j));
             }
             commit.commit(String.valueOf(i), write.prepareCommit(false));
         }
 
-        write.write(GenericRowData.of(1, 40, 400L));
+        write.write(rowData(1, 40, 400L));
         List<FileCommittable> commit4 = write.prepareCommit(false);
         // trigger compaction, but not wait it.
 
-        write.write(GenericRowData.of(2, 20, 200L));
+        write.write(rowData(2, 20, 200L));
         List<FileCommittable> commit5 = write.prepareCommit(true);
         // wait compaction finish
         // commit5 should be a compaction commit
 
-        write.write(GenericRowData.of(1, 60, 600L));
+        write.write(rowData(1, 60, 600L));
         List<FileCommittable> commit6 = write.prepareCommit(true);
         // if remove writer too fast, will see old files, do another compaction
         // then will be conflicts
@@ -251,6 +263,21 @@ public abstract class FileStoreTableTestBase {
         return b;
     }
 
+    /**
+     * Builds a five-field row: the first three entries of {@code values} followed by the fixed
+     * payloads {@code "binary"} and {@code "varbinary"} as byte arrays.
+     *
+     * <p>The payloads are encoded with an explicit UTF-8 charset so the produced bytes do not
+     * depend on the JVM's default charset.
+     *
+     * @param values at least three field values; only the first three are used
+     * @return a {@link GenericRowData} holding the three values plus the two byte-array fields
+     */
+    protected GenericRowData rowData(Object... values) {
+        return GenericRowData.of(
+                values[0],
+                values[1],
+                values[2],
+                "binary".getBytes(java.nio.charset.StandardCharsets.UTF_8),
+                "varbinary".getBytes(java.nio.charset.StandardCharsets.UTF_8));
+    }
+
+    /**
+     * Builds a five-field row with the given {@link RowKind}: the first three entries of {@code
+     * values} followed by the fixed payloads {@code "binary"} and {@code "varbinary"} as byte
+     * arrays.
+     *
+     * <p>The payloads are encoded with an explicit UTF-8 charset so the produced bytes do not
+     * depend on the JVM's default charset.
+     *
+     * @param rowKind change kind to attach to the row
+     * @param values at least three field values; only the first three are used
+     * @return a {@link GenericRowData} of the given kind holding the three values plus the two
+     *     byte-array fields
+     */
+    protected GenericRowData rowDataWithKind(RowKind rowKind, Object... values) {
+        return GenericRowData.ofKind(
+                rowKind,
+                values[0],
+                values[1],
+                values[2],
+                "binary".getBytes(java.nio.charset.StandardCharsets.UTF_8),
+                "varbinary".getBytes(java.nio.charset.StandardCharsets.UTF_8));
+    }
+
     /**
      * Creates a table whose {@code BUCKET} option is set to the given value.
      *
      * <p>Delegates to the {@code createFileStoreTable} overload that accepts a configuration
      * callback; the callback only sets {@code BUCKET}.
      *
      * @param numOfBucket value to store under the {@code BUCKET} option
      * @return the table produced by the delegated overload
      * @throws Exception propagated from the delegated overload
      */
     protected FileStoreTable createFileStoreTable(int numOfBucket) throws Exception {
         return createFileStoreTable(conf -> conf.set(BUCKET, numOfBucket));
     }
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/WritePreemptMemoryTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/WritePreemptMemoryTest.java
index 7ddb76dd..353ac24e 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/WritePreemptMemoryTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/WritePreemptMemoryTest.java
@@ -68,8 +68,7 @@ public class WritePreemptMemoryTest extends FileStoreTableTestBase {
         Random random = new Random();
         List<String> expected = new ArrayList<>();
         for (int i = 0; i < 10_000; i++) {
-            GenericRowData row =
-                    GenericRowData.of(singlePartition ? 0 : random.nextInt(5), i, i * 10L);
+            GenericRowData row = rowData(singlePartition ? 0 : random.nextInt(5), i, i * 10L);
             write.write(row);
             expected.add(BATCH_ROW_TO_STRING.apply(row));
         }
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileFormat.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileFormat.java
index 3192cf56..5d07840a 100644
--- a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileFormat.java
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileFormat.java
@@ -29,12 +29,15 @@ import org.apache.flink.orc.OrcSplitReaderUtil;
 import org.apache.flink.orc.vector.RowDataVectorizer;
 import org.apache.flink.orc.vector.Vectorizer;
 import org.apache.flink.orc.writer.ThreadLocalClassLoaderConfiguration;
+import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.store.file.predicate.Predicate;
 import org.apache.flink.table.store.format.FileFormat;
 import org.apache.flink.table.store.format.FileStatsExtractor;
 import org.apache.flink.table.store.utils.Projection;
+import org.apache.flink.table.types.logical.ArrayType;
 import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
 import org.apache.flink.table.types.logical.RowType;
 
 import org.apache.orc.OrcFile;
@@ -44,6 +47,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
 import java.util.Properties;
+import java.util.stream.Collectors;
 
 import static org.apache.flink.table.store.format.orc.OrcFileFormatFactory.IDENTIFIER;
 
@@ -90,14 +94,18 @@ public class OrcFileFormat extends FileFormat {
         }
 
         return OrcInputFormatFactory.create(
-                readerConf, type, Projection.of(projection).toTopLevelIndexes(), orcPredicates);
+                readerConf,
+                (RowType) refineLogicalType(type),
+                Projection.of(projection).toTopLevelIndexes(),
+                orcPredicates);
     }
 
     @Override
     public BulkWriter.Factory<RowData> createWriterFactory(RowType type) {
         LogicalType[] orcTypes = type.getChildren().toArray(new LogicalType[0]);
 
-        TypeDescription typeDescription = OrcSplitReaderUtil.logicalTypeToOrcType(type);
+        TypeDescription typeDescription =
+                OrcSplitReaderUtil.logicalTypeToOrcType(refineLogicalType(type));
         Vectorizer<RowData> vectorizer =
                 new RowDataVectorizer(typeDescription.toString(), orcTypes);
         OrcFile.WriterOptions writerOptions = OrcFile.writerOptions(orcProperties, writerConf);
@@ -113,4 +121,37 @@ public class OrcFileFormat extends FileFormat {
         properties.forEach((k, v) -> orcProperties.put(IDENTIFIER + "." + k, v));
         return orcProperties;
     }
+
+    /**
+     * Rewrites a logical type so that every BINARY / VARBINARY occurrence, at any nesting depth,
+     * is replaced by the {@code DataTypes.BYTES()} logical type.
+     *
+     * <p>ARRAY, MAP and ROW types are rebuilt recursively with their own nullability preserved;
+     * ROW fields keep their names and descriptions. All other types are returned unchanged.
+     *
+     * @param type the logical type to refine
+     * @return the refined type, or {@code type} itself when its root needs no rewriting
+     */
+    private static LogicalType refineLogicalType(LogicalType type) {
+        switch (type.getTypeRoot()) {
+            case BINARY:
+            case VARBINARY:
+                // OrcSplitReaderUtil#logicalTypeToOrcType() only supports the DataTypes.BYTES()
+                // logical type for BINARY and VARBINARY, so the exact BYTES type is returned
+                // as-is rather than a length- or nullability-adjusted copy.
+                return DataTypes.BYTES().getLogicalType();
+            case ARRAY:
+                ArrayType arrayType = (ArrayType) type;
+                return new ArrayType(
+                        arrayType.isNullable(), refineLogicalType(arrayType.getElementType()));
+            case MAP:
+                MapType mapType = (MapType) type;
+                // Carry over the map's nullability, consistent with the ARRAY and ROW branches.
+                return new MapType(
+                        mapType.isNullable(),
+                        refineLogicalType(mapType.getKeyType()),
+                        refineLogicalType(mapType.getValueType()));
+            case ROW:
+                RowType rowType = (RowType) type;
+                return new RowType(
+                        rowType.isNullable(),
+                        rowType.getFields().stream()
+                                .map(
+                                        f ->
+                                                new RowType.RowField(
+                                                        f.getName(),
+                                                        refineLogicalType(f.getType()),
+                                                        f.getDescription().orElse(null)))
+                                .collect(Collectors.toList()));
+            default:
+                return type;
+        }
+    }
 }
diff --git a/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcFileStatsExtractorTest.java b/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcFileStatsExtractorTest.java
index eaac4b29..86aba5c4 100644
--- a/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcFileStatsExtractorTest.java
+++ b/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcFileStatsExtractorTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.table.store.format.FileFormat;
 import org.apache.flink.table.store.format.FileStatsExtractorTestBase;
 import org.apache.flink.table.types.logical.ArrayType;
 import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BinaryType;
 import org.apache.flink.table.types.logical.BooleanType;
 import org.apache.flink.table.types.logical.CharType;
 import org.apache.flink.table.types.logical.DateType;
@@ -34,6 +35,7 @@ import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.SmallIntType;
 import org.apache.flink.table.types.logical.TimestampType;
 import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.table.types.logical.VarBinaryType;
 import org.apache.flink.table.types.logical.VarCharType;
 
 /** Tests for {@link OrcFileStatsExtractor}. */
@@ -50,6 +52,8 @@ public class OrcFileStatsExtractorTest extends FileStatsExtractorTestBase {
                 new CharType(8),
                 new VarCharType(8),
                 new BooleanType(),
+                new BinaryType(8),
+                new VarBinaryType(8),
                 new TinyIntType(),
                 new SmallIntType(),
                 new IntType(),