Posted to commits@flink.apache.org by lz...@apache.org on 2023/01/06 09:28:37 UTC

[flink-table-store] branch master updated: [FLINK-30582] Flink-avro Flink-orc free for flink-table-store-format

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new b3474aff [FLINK-30582] Flink-avro Flink-orc free for flink-table-store-format
b3474aff is described below

commit b3474aff36a661677e22d56e627ef227390c2689
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Fri Jan 6 17:28:32 2023 +0800

    [FLINK-30582] Flink-avro Flink-orc free for flink-table-store-format
    
    This closes #461
---
 .../flink-table-store-micro-benchmarks/pom.xml     |   6 -
 .../apache/flink/table/store/utils/MapBuilder.java |  38 ++
 flink-table-store-format/pom.xml                   | 168 ++---
 .../store/format/avro/AbstractAvroBulkFormat.java  |   1 -
 .../flink/table/store/format/avro/AvroBuilder.java |  33 +
 .../table/store/format/avro/AvroBulkWriter.java    |  56 ++
 .../table/store/format/avro/AvroFileFormat.java    |  19 +-
 .../store/format/avro/AvroSchemaConverter.java     | 216 +++++++
 .../store/format/avro/AvroToRowDataConverters.java | 262 ++++++++
 .../table/store/format/avro/AvroWriterFactory.java |  49 ++
 .../store/format/avro/CloseShieldOutputStream.java |  56 ++
 .../format/avro/FSDataInputStreamWrapper.java      |  67 ++
 .../table/store/format/avro/JodaConverter.java     |  70 +++
 .../store/format/avro/RowDataToAvroConverters.java | 311 ++++++++++
 .../table/store/format/orc/OrcFileFormat.java      |  33 +-
 .../store/format/orc/OrcInputFormatFactory.java    | 104 ----
 .../table/store/format/orc/OrcReaderFactory.java   | 403 ++++++++++++
 .../flink/table/store/format/orc/OrcShimImpl.java  | 163 -----
 .../table/store/format/orc/OrcWriterFactory.java   | 125 ++++
 .../orc/SerializableHadoopConfigWrapper.java       |  78 +++
 .../orc/ThreadLocalClassLoaderConfiguration.java   |  58 ++
 .../orc/{ => filter}/OrcFileStatsExtractor.java    |   5 +-
 .../table/store/format/orc/filter/OrcFilters.java  | 685 +++++++++++++++++++++
 .../{ => filter}/OrcPredicateFunctionVisitor.java  |   3 +-
 .../format/orc/reader/AbstractOrcColumnVector.java |  79 +++
 .../format/orc/reader/OrcArrayColumnVector.java    |  47 ++
 .../format/orc/reader/OrcBytesColumnVector.java    |  42 ++
 .../format/orc/reader/OrcDecimalColumnVector.java  |  46 ++
 .../format/orc/reader/OrcDoubleColumnVector.java   |  47 ++
 .../orc/reader/OrcLegacyTimestampColumnVector.java |  91 +++
 .../format/orc/reader/OrcLongColumnVector.java     |  65 ++
 .../format/orc/reader/OrcMapColumnVector.java      |  49 ++
 .../format/orc/reader/OrcRowColumnVector.java      |  49 ++
 .../format/orc/reader/OrcSplitReaderUtil.java      |  99 +++
 .../orc/reader/OrcTimestampColumnVector.java       |  49 ++
 .../store/format/orc/writer/OrcBulkWriter.java     |  75 +++
 .../format/orc/writer/PhysicalWriterImpl.java      | 398 ++++++++++++
 .../store/format/orc/writer/RowDataVectorizer.java | 277 +++++++++
 .../table/store/format/orc/writer/Vectorizer.java  |  96 +++
 .../src/main/resources/META-INF/NOTICE             |   6 +
 .../store/format/avro/AvroBulkFormatTest.java      | 284 +++++++++
 .../store/format/avro/AvroBulkFormatTestUtils.java |  70 +++
 .../store/format/orc/OrcBulkWriterTestUtil.java    |  96 +++
 .../format/orc/OrcFileStatsExtractorTest.java      |   1 +
 .../store/format/orc/OrcFilterConverterTest.java   |   3 +-
 .../store/format/orc/OrcReaderFactoryTest.java     | 291 +++++++++
 .../store/format/orc/OrcSplitReaderUtilTest.java   |  67 ++
 .../store/format/orc/OrcWriterFactoryTest.java     |  89 +++
 .../flink/table/store/format/orc/Record.java       |  55 ++
 .../table/store/format/orc/RecordVectorizer.java   |  55 ++
 .../src/test/resources/test-data-decimal.orc       | Bin 0 -> 16337 bytes
 .../src/test/resources/test-data-flat.orc          | Bin 0 -> 408522 bytes
 .../flink-table-store-hive-catalog/pom.xml         |  13 -
 .../flink/table/store/spark/SparkS3ITCase.java     |  28 +-
 pom.xml                                            |   8 -
 55 files changed, 5177 insertions(+), 407 deletions(-)

diff --git a/flink-table-store-benchmark/flink-table-store-micro-benchmarks/pom.xml b/flink-table-store-benchmark/flink-table-store-micro-benchmarks/pom.xml
index db990385..67ba847c 100644
--- a/flink-table-store-benchmark/flink-table-store-micro-benchmarks/pom.xml
+++ b/flink-table-store-benchmark/flink-table-store-micro-benchmarks/pom.xml
@@ -131,12 +131,6 @@ under the License.
             <version>${project.version}</version>
         </dependency>
 
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>${flink.sql.parquet}</artifactId>
-            <version>${flink.version}</version>
-        </dependency>
-
     </dependencies>
 
     <!-- This is copied from flink-benchmarks and updated for flink-table-store-micro-benchmarks. -->
diff --git a/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/MapBuilder.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/MapBuilder.java
new file mode 100644
index 00000000..dc92f4d9
--- /dev/null
+++ b/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/MapBuilder.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.utils;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/** Builder for {@link Map}. */
+public class MapBuilder<K, V> {
+
+    private final Map<K, V> map = new HashMap<>();
+
+    public MapBuilder<K, V> put(K k, V v) {
+        map.put(k, v);
+        return this;
+    }
+
+    public Map<K, V> unmodifiable() {
+        return Collections.unmodifiableMap(map);
+    }
+}
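
MapBuilder is a small fluent builder that collects entries into a HashMap and exposes an unmodifiable view. A minimal usage sketch (keys and values below are made up for illustration and are not part of this commit):

    import org.apache.flink.table.store.utils.MapBuilder;

    import java.util.Map;

    public class MapBuilderExample {
        public static void main(String[] args) {
            // Chain put() calls, then freeze the result into an unmodifiable view.
            Map<String, String> options =
                    new MapBuilder<String, String>()
                            .put("format", "avro")   // illustrative keys/values
                            .put("codec", "snappy")
                            .unmodifiable();
            System.out.println(options);
        }
    }

Note that unmodifiable() wraps the internal map rather than copying it, so the builder should not be reused after the view has been handed out.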
diff --git a/flink-table-store-format/pom.xml b/flink-table-store-format/pom.xml
index f548e487..6f256d0a 100644
--- a/flink-table-store-format/pom.xml
+++ b/flink-table-store-format/pom.xml
@@ -33,6 +33,7 @@ under the License.
 
     <properties>
         <parquet.version>1.12.3</parquet.version>
+        <orc.version>1.5.6</orc.version>
     </properties>
 
     <dependencies>
@@ -59,74 +60,91 @@ under the License.
             <scope>provided</scope>
         </dependency>
 
-        <!-- format dependencies -->
-
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-connector-files</artifactId>
             <version>${flink.version}</version>
         </dependency>
 
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-avro</artifactId>
-            <version>${flink.version}</version>
-        </dependency>
+        <!-- Hadoop -->
 
         <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-sql-avro</artifactId>
-            <version>${flink.version}</version>
-            <scope>runtime</scope>
-            <optional>true</optional>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <version>${hadoop.version}</version>
             <exclusions>
                 <exclusion>
-                    <groupId>*</groupId>
-                    <artifactId>*</artifactId>
+                    <groupId>org.apache.avro</groupId>
+                    <artifactId>avro</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
                 </exclusion>
             </exclusions>
         </dependency>
 
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>${flink.orc}</artifactId>
-            <version>${flink.version}</version>
-        </dependency>
+        <!-- Orc Start -->
 
         <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>${flink.sql.orc}</artifactId>
-            <version>${flink.version}</version>
-            <scope>runtime</scope>
-            <optional>true</optional>
+            <groupId>org.apache.orc</groupId>
+            <artifactId>orc-core</artifactId>
+            <version>${orc.version}</version>
             <exclusions>
+                <!-- Exclude ORC's Hadoop dependency and pull in provided vanilla hadoop. -->
                 <exclusion>
-                    <groupId>*</groupId>
-                    <artifactId>*</artifactId>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-common</artifactId>
                 </exclusion>
-            </exclusions>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-common</artifactId>
-            <version>${hadoop.version}</version>
-            <exclusions>
                 <exclusion>
-                    <groupId>org.apache.avro</groupId>
-                    <artifactId>avro</artifactId>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-hdfs</artifactId>
                 </exclusion>
                 <exclusion>
-                    <groupId>log4j</groupId>
-                    <artifactId>log4j</artifactId>
+                    <groupId>javax.activation</groupId>
+                    <artifactId>javax.activation-api</artifactId>
                 </exclusion>
                 <exclusion>
-                    <groupId>org.slf4j</groupId>
-                    <artifactId>slf4j-log4j12</artifactId>
+                    <groupId>javax.xml.bind</groupId>
+                    <artifactId>jaxb-api</artifactId>
                 </exclusion>
             </exclusions>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+            <version>3.12.0</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <!-- Orc End -->
+
+        <!-- Avro Start -->
+
+        <dependency>
+            <groupId>org.apache.avro</groupId>
+            <artifactId>avro</artifactId>
+            <version>1.11.1</version>
+        </dependency>
+
+        <dependency>
+            <groupId>joda-time</groupId>
+            <artifactId>joda-time</artifactId>
+            <scope>provided</scope>
+            <!-- Avro records can contain JodaTime fields when using logical types.
+                In order to handle them, we need to add an optional dependency.
+                Users with such Avro records need to add this dependency themselves. -->
+            <optional>true</optional>
+            <version>2.5</version>
+        </dependency>
+
+        <!-- Avro End -->
+
         <!-- Parquet Start -->
 
         <dependency>
@@ -254,8 +272,20 @@ under the License.
                         <configuration>
                             <artifactSet>
                                 <includes combine.children="append">
-                                    <include>org.apache.flink:flink-sql-avro</include>
-                                    <include>org.apache.flink:${flink.sql.orc}</include>
+                                    <!-- Orc -->
+                                    <include>org.apache.orc:orc-core</include>
+                                    <include>org.apache.orc:orc-shims</include>
+                                    <include>org.apache.hive:hive-storage-api</include>
+                                    <include>io.airlift:aircompressor</include>
+                                    <include>commons-lang:commons-lang</include>
+                                    <include>com.google.protobuf:protobuf-java</include>
+
+                                    <!-- Avro -->
+                                    <include>org.apache.avro:avro</include>
+                                    <include>com.fasterxml.jackson.core:jackson-core</include>
+                                    <include>com.fasterxml.jackson.core:jackson-databind</include>
+                                    <include>com.fasterxml.jackson.core:jackson-annotations</include>
+                                    <include>org.apache.commons:commons-compress</include>
 
                                     <!-- Parquet -->
                                     <include>org.apache.parquet:parquet-avro</include>
@@ -270,23 +300,6 @@ under the License.
                                 </includes>
                             </artifactSet>
                             <filters>
-                                <!--
-                                Throw away all META-INF/services,
-                                otherwise if user has the same format/connector jar in the classpath,
-                                FactoryUtil will complain about multiple matching factories.
-                                -->
-                                <filter>
-                                    <artifact>org.apache.flink:flink-sql-avro</artifact>
-                                    <excludes>
-                                        <exclude>META-INF/services/**</exclude>
-                                    </excludes>
-                                </filter>
-                                <filter>
-                                    <artifact>org.apache.flink:${flink.sql.orc}</artifact>
-                                    <excludes>
-                                        <exclude>META-INF/services/**</exclude>
-                                    </excludes>
-                                </filter>
                                 <!-- Another copy of the Apache license, which we don't need. -->
                                 <filter>
                                     <artifact>*</artifact>
@@ -296,22 +309,7 @@ under the License.
                                 </filter>
                             </filters>
                             <relocations>
-                                <relocation>
-                                    <pattern>org.apache.avro</pattern>
-                                    <shadedPattern>org.apache.flink.avro.shaded.org.apache.avro</shadedPattern>
-                                </relocation>
-                                <relocation>
-                                    <pattern>org.apache.flink.formats.avro</pattern>
-                                    <shadedPattern>org.apache.flink.table.store.shaded.org.apache.flink.formats.avro</shadedPattern>
-                                </relocation>
-                                <relocation>
-                                    <pattern>org.apache.flink.orc</pattern>
-                                    <shadedPattern>org.apache.flink.table.store.shaded.org.apache.flink.orc</shadedPattern>
-                                </relocation>
-                                <!--
-                                flink-sql-orc module does not shade its dependencies, so we shade here.
-                                See maven-shade-plugin usage of flink-sql-orc for detailed dependency list.
-                                -->
+                                <!-- Relocate Orc -->
                                 <relocation>
                                     <pattern>org.apache.orc</pattern>
                                     <shadedPattern>org.apache.flink.table.store.shaded.org.apache.orc</shadedPattern>
@@ -333,19 +331,31 @@ under the License.
                                     <shadedPattern>org.apache.flink.table.store.shaded.com.google.protobuf</shadedPattern>
                                 </relocation>
 
+                                <!-- Relocate Avro. -->
+                                <relocation>
+                                    <pattern>org.apache.avro</pattern>
+                                    <shadedPattern>org.apache.flink.table.store.shaded.org.apache.avro</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>com.fasterxml.jackson</pattern>
+                                    <shadedPattern>org.apache.flink.table.store.shaded.com.fasterxml.jackson</shadedPattern>
+                                </relocation>
+
                                 <!-- Relocate parquet. -->
                                 <relocation>
                                     <pattern>org.apache.parquet</pattern>
                                     <shadedPattern>org.apache.flink.table.store.shaded.org.apache.parquet</shadedPattern>
                                 </relocation>
-                                <relocation>
-                                    <pattern>org.apache.commons</pattern>
-                                    <shadedPattern>org.apache.flink.table.store.shaded.org.apache.commons</shadedPattern>
-                                </relocation>
                                 <relocation>
                                     <pattern>shaded.parquet</pattern>
                                     <shadedPattern>org.apache.flink.table.store.shaded.parquet</shadedPattern>
                                 </relocation>
+
+                                <!-- Relocate Common. -->
+                                <relocation>
+                                    <pattern>org.apache.commons</pattern>
+                                    <shadedPattern>org.apache.flink.table.store.shaded.org.apache.commons</shadedPattern>
+                                </relocation>
                             </relocations>
                         </configuration>
                     </execution>
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/AbstractAvroBulkFormat.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/AbstractAvroBulkFormat.java
index 66ba335b..ef232179 100644
--- a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/AbstractAvroBulkFormat.java
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/AbstractAvroBulkFormat.java
@@ -27,7 +27,6 @@ import org.apache.flink.connector.file.src.util.IteratorResultIterator;
 import org.apache.flink.connector.file.src.util.Pool;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.formats.avro.utils.FSDataInputStreamWrapper;
 
 import org.apache.avro.Schema;
 import org.apache.avro.file.DataFileReader;
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/AvroBuilder.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/AvroBuilder.java
new file mode 100644
index 00000000..9e1a18b3
--- /dev/null
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/AvroBuilder.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.format.avro;
+
+import org.apache.avro.file.DataFileWriter;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.Serializable;
+
+/** A builder to create an {@link AvroBulkWriter} from an {@link OutputStream}. */
+@FunctionalInterface
+public interface AvroBuilder<T> extends Serializable {
+
+    /** Creates and configures an Avro writer to the given output file. */
+    DataFileWriter<T> createWriter(OutputStream outputStream) throws IOException;
+}
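
Since AvroBuilder extends Serializable, implementations are typically lambdas that avoid capturing non-serializable state such as an Avro Schema instance. A sketch that captures the schema as a JSON string instead (the forSchema helper and its class are illustrative, not part of this commit):

    import org.apache.avro.Schema;
    import org.apache.avro.file.CodecFactory;
    import org.apache.avro.file.DataFileWriter;
    import org.apache.avro.generic.GenericDatumWriter;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.table.store.format.avro.AvroBuilder;

    public class AvroBuilderExample {
        public static AvroBuilder<GenericRecord> forSchema(String schemaJson) {
            // Capturing the schema as a String keeps the lambda serializable;
            // the Schema object itself is rebuilt when the writer is created.
            return out -> {
                Schema schema = new Schema.Parser().parse(schemaJson);
                DataFileWriter<GenericRecord> writer =
                        new DataFileWriter<>(new GenericDatumWriter<>(schema));
                writer.setCodec(CodecFactory.snappyCodec());
                return writer.create(schema, out);
            };
        }
    }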
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/AvroBulkWriter.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/AvroBulkWriter.java
new file mode 100644
index 00000000..f159d413
--- /dev/null
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/AvroBulkWriter.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.format.avro;
+
+import org.apache.flink.api.common.serialization.BulkWriter;
+
+import org.apache.avro.file.DataFileWriter;
+
+import java.io.IOException;
+
+/** A simple {@link BulkWriter} implementation that wraps an Avro {@link DataFileWriter}. */
+public class AvroBulkWriter<T> implements BulkWriter<T> {
+
+    /** The underlying Avro writer. */
+    private final DataFileWriter<T> dataFileWriter;
+
+    /**
+     * Create a new AvroBulkWriter wrapping the given Avro {@link DataFileWriter}.
+     *
+     * @param dataFileWriter The underlying Avro writer.
+     */
+    public AvroBulkWriter(DataFileWriter<T> dataFileWriter) {
+        this.dataFileWriter = dataFileWriter;
+    }
+
+    @Override
+    public void addElement(T element) throws IOException {
+        dataFileWriter.append(element);
+    }
+
+    @Override
+    public void flush() throws IOException {
+        dataFileWriter.flush();
+    }
+
+    @Override
+    public void finish() throws IOException {
+        dataFileWriter.close();
+    }
+}
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/AvroFileFormat.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/AvroFileFormat.java
index f908fae2..2b3e413c 100644
--- a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/AvroFileFormat.java
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/AvroFileFormat.java
@@ -20,15 +20,12 @@ package org.apache.flink.table.store.format.avro;
 
 import org.apache.flink.api.common.serialization.BulkWriter;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.connector.file.src.FileSourceSplit;
 import org.apache.flink.connector.file.src.reader.BulkFormat;
 import org.apache.flink.core.fs.FSDataOutputStream;
-import org.apache.flink.formats.avro.AvroBuilder;
-import org.apache.flink.formats.avro.AvroToRowDataConverters;
-import org.apache.flink.formats.avro.AvroWriterFactory;
-import org.apache.flink.formats.avro.RowDataToAvroConverters;
-import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
@@ -53,15 +50,23 @@ import java.io.OutputStream;
 import java.util.List;
 import java.util.function.Function;
 
-import static org.apache.flink.formats.avro.AvroFormatOptions.AVRO_OUTPUT_CODEC;
+import static org.apache.avro.file.DataFileConstants.SNAPPY_CODEC;
 
 /** Avro {@link FileFormat}. The main code is copied from Flink {@code AvroFileFormatFactory}. */
 public class AvroFileFormat extends FileFormat {
 
+    public static final String IDENTIFIER = "avro";
+
+    public static final ConfigOption<String> AVRO_OUTPUT_CODEC =
+            ConfigOptions.key("codec")
+                    .stringType()
+                    .defaultValue(SNAPPY_CODEC)
+                    .withDescription("The compression codec for avro");
+
     private final ReadableConfig formatOptions;
 
     public AvroFileFormat(ReadableConfig formatOptions) {
-        super(org.apache.flink.formats.avro.AvroFileFormatFactory.IDENTIFIER);
+        super(IDENTIFIER);
         this.formatOptions = formatOptions;
     }
 
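
The codec option previously imported from flink-avro is now declared locally, so it can be set through any ReadableConfig. A sketch using Flink's Configuration (the deflate codec choice and the helper class are only illustrative):

    import org.apache.avro.file.DataFileConstants;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.table.store.format.avro.AvroFileFormat;

    public class AvroFileFormatExample {
        public static AvroFileFormat deflateFormat() {
            // Configuration implements ReadableConfig, which the constructor expects.
            Configuration formatOptions = new Configuration();
            formatOptions.set(AvroFileFormat.AVRO_OUTPUT_CODEC, DataFileConstants.DEFLATE_CODEC);
            return new AvroFileFormat(formatOptions);
        }
    }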
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/AvroSchemaConverter.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/AvroSchemaConverter.java
new file mode 100644
index 00000000..73a9c37b
--- /dev/null
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/AvroSchemaConverter.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.format.avro;
+
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.MultisetType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TimestampType;
+
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+
+import java.util.List;
+
+/**
+ * Converts an Avro schema into Flink's type information. It uses {@link RowTypeInfo} for
+ * representing objects and converts Avro types into types that are compatible with Flink's Table &
+ * SQL API.
+ */
+public class AvroSchemaConverter {
+
+    private AvroSchemaConverter() {
+        // private
+    }
+
+    /**
+     * Converts Flink SQL {@link LogicalType} (can be nested) into an Avro schema.
+     *
+     * <p>Use "org.apache.flink.avro.generated.record" as the type name.
+     *
+     * @param schema the schema type, usually it should be the top level record type, e.g. not a
+     *     nested type
+     * @return Avro's {@link Schema} matching this logical type.
+     */
+    public static Schema convertToSchema(LogicalType schema) {
+        return convertToSchema(schema, "org.apache.flink.avro.generated.record");
+    }
+
+    /**
+     * Converts Flink SQL {@link LogicalType} (can be nested) into an Avro schema.
+     *
+     * <p>The "{rowName}_" prefix is used for nested row type names in order to generate the right
+     * schema. Nested record types that differ only in their type name are still compatible.
+     *
+     * @param logicalType logical type
+     * @param rowName the record name
+     * @return Avro's {@link Schema} matching this logical type.
+     */
+    public static Schema convertToSchema(LogicalType logicalType, String rowName) {
+        int precision;
+        boolean nullable = logicalType.isNullable();
+        switch (logicalType.getTypeRoot()) {
+            case NULL:
+                return SchemaBuilder.builder().nullType();
+            case BOOLEAN:
+                Schema bool = SchemaBuilder.builder().booleanType();
+                return nullable ? nullableSchema(bool) : bool;
+            case TINYINT:
+            case SMALLINT:
+            case INTEGER:
+                Schema integer = SchemaBuilder.builder().intType();
+                return nullable ? nullableSchema(integer) : integer;
+            case BIGINT:
+                Schema bigint = SchemaBuilder.builder().longType();
+                return nullable ? nullableSchema(bigint) : bigint;
+            case FLOAT:
+                Schema f = SchemaBuilder.builder().floatType();
+                return nullable ? nullableSchema(f) : f;
+            case DOUBLE:
+                Schema d = SchemaBuilder.builder().doubleType();
+                return nullable ? nullableSchema(d) : d;
+            case CHAR:
+            case VARCHAR:
+                Schema str = SchemaBuilder.builder().stringType();
+                return nullable ? nullableSchema(str) : str;
+            case BINARY:
+            case VARBINARY:
+                Schema binary = SchemaBuilder.builder().bytesType();
+                return nullable ? nullableSchema(binary) : binary;
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+                // use long to represent Timestamp
+                final TimestampType timestampType = (TimestampType) logicalType;
+                precision = timestampType.getPrecision();
+                org.apache.avro.LogicalType avroLogicalType;
+                if (precision <= 3) {
+                    avroLogicalType = LogicalTypes.timestampMillis();
+                } else {
+                    throw new IllegalArgumentException(
+                            "Avro does not support TIMESTAMP type "
+                                    + "with precision: "
+                                    + precision
+                                    + ", it only supports precision up to 3.");
+                }
+                Schema timestamp = avroLogicalType.addToSchema(SchemaBuilder.builder().longType());
+                return nullable ? nullableSchema(timestamp) : timestamp;
+            case DATE:
+                // use int to represent Date
+                Schema date = LogicalTypes.date().addToSchema(SchemaBuilder.builder().intType());
+                return nullable ? nullableSchema(date) : date;
+            case TIME_WITHOUT_TIME_ZONE:
+                precision = ((TimeType) logicalType).getPrecision();
+                if (precision > 3) {
+                    throw new IllegalArgumentException(
+                            "Avro does not support TIME type with precision: "
+                                    + precision
+                                    + ", it only supports precision up to 3.");
+                }
+                // use int to represent Time; we only support millisecond precision on deserialization
+                Schema time =
+                        LogicalTypes.timeMillis().addToSchema(SchemaBuilder.builder().intType());
+                return nullable ? nullableSchema(time) : time;
+            case DECIMAL:
+                DecimalType decimalType = (DecimalType) logicalType;
+                // store BigDecimal as byte[]
+                Schema decimal =
+                        LogicalTypes.decimal(decimalType.getPrecision(), decimalType.getScale())
+                                .addToSchema(SchemaBuilder.builder().bytesType());
+                return nullable ? nullableSchema(decimal) : decimal;
+            case ROW:
+                RowType rowType = (RowType) logicalType;
+                List<String> fieldNames = rowType.getFieldNames();
+                // we have to make sure the record name is different in a Schema
+                SchemaBuilder.FieldAssembler<Schema> builder =
+                        SchemaBuilder.builder().record(rowName).fields();
+                for (int i = 0; i < rowType.getFieldCount(); i++) {
+                    String fieldName = fieldNames.get(i);
+                    LogicalType fieldType = rowType.getTypeAt(i);
+                    SchemaBuilder.GenericDefault<Schema> fieldBuilder =
+                            builder.name(fieldName)
+                                    .type(convertToSchema(fieldType, rowName + "_" + fieldName));
+
+                    if (fieldType.isNullable()) {
+                        builder = fieldBuilder.withDefault(null);
+                    } else {
+                        builder = fieldBuilder.noDefault();
+                    }
+                }
+                Schema record = builder.endRecord();
+                return nullable ? nullableSchema(record) : record;
+            case MULTISET:
+            case MAP:
+                Schema map =
+                        SchemaBuilder.builder()
+                                .map()
+                                .values(
+                                        convertToSchema(
+                                                extractValueTypeToAvroMap(logicalType), rowName));
+                return nullable ? nullableSchema(map) : map;
+            case ARRAY:
+                ArrayType arrayType = (ArrayType) logicalType;
+                Schema array =
+                        SchemaBuilder.builder()
+                                .array()
+                                .items(convertToSchema(arrayType.getElementType(), rowName));
+                return nullable ? nullableSchema(array) : array;
+            case RAW:
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+            default:
+                throw new UnsupportedOperationException(
+                        "Unsupported to derive Schema for type: " + logicalType);
+        }
+    }
+
+    public static LogicalType extractValueTypeToAvroMap(LogicalType type) {
+        LogicalType keyType;
+        LogicalType valueType;
+        if (type instanceof MapType) {
+            MapType mapType = (MapType) type;
+            keyType = mapType.getKeyType();
+            valueType = mapType.getValueType();
+        } else {
+            MultisetType multisetType = (MultisetType) type;
+            keyType = multisetType.getElementType();
+            valueType = new IntType();
+        }
+        if (keyType.getTypeRoot() != LogicalTypeRoot.VARCHAR
+                && keyType.getTypeRoot() != LogicalTypeRoot.CHAR) {
+            throw new UnsupportedOperationException(
+                    "Avro format doesn't support non-string as key type of map. "
+                            + "The key type is: "
+                            + keyType.asSummaryString());
+        }
+        return valueType;
+    }
+
+    /** Returns schema with nullable true. */
+    private static Schema nullableSchema(Schema schema) {
+        return schema.isNullable()
+                ? schema
+                : Schema.createUnion(SchemaBuilder.builder().nullType(), schema);
+    }
+}
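
AvroSchemaConverter maps a Flink row type to an Avro record schema, appending field names to the record name for nested rows. A small sketch (column names are illustrative, not part of this commit):

    import org.apache.avro.Schema;
    import org.apache.flink.table.store.format.avro.AvroSchemaConverter;
    import org.apache.flink.table.types.logical.IntType;
    import org.apache.flink.table.types.logical.LogicalType;
    import org.apache.flink.table.types.logical.RowType;
    import org.apache.flink.table.types.logical.VarCharType;

    public class AvroSchemaConverterExample {
        public static void main(String[] args) {
            // Roughly: ROW<id INT NOT NULL, name STRING>
            RowType rowType =
                    RowType.of(
                            new LogicalType[] {
                                new IntType(false), new VarCharType(VarCharType.MAX_LENGTH)
                            },
                            new String[] {"id", "name"});
            Schema schema = AvroSchemaConverter.convertToSchema(rowType);
            System.out.println(schema.toString(true));
        }
    }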
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/AvroToRowDataConverters.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/AvroToRowDataConverters.java
new file mode 100644
index 00000000..ad6816ab
--- /dev/null
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/AvroToRowDataConverters.java
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.format.avro;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.GenericMapData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeUtils;
+
+import org.apache.avro.generic.GenericFixed;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+
+import java.io.Serializable;
+import java.lang.reflect.Array;
+import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.time.temporal.ChronoField;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.table.store.format.avro.AvroSchemaConverter.extractValueTypeToAvroMap;
+
+/** Tool class used to convert from Avro {@link GenericRecord} to {@link RowData}. */
+@Internal
+public class AvroToRowDataConverters {
+
+    /**
+     * Runtime converter that converts Avro data structures into objects of Flink Table & SQL
+     * internal data structures.
+     */
+    @FunctionalInterface
+    public interface AvroToRowDataConverter extends Serializable {
+        Object convert(Object object);
+    }
+
+    // -------------------------------------------------------------------------------------
+    // Runtime Converters
+    // -------------------------------------------------------------------------------------
+
+    public static AvroToRowDataConverter createRowConverter(RowType rowType) {
+        final AvroToRowDataConverter[] fieldConverters =
+                rowType.getFields().stream()
+                        .map(RowType.RowField::getType)
+                        .map(AvroToRowDataConverters::createNullableConverter)
+                        .toArray(AvroToRowDataConverter[]::new);
+        final int arity = rowType.getFieldCount();
+
+        return avroObject -> {
+            IndexedRecord record = (IndexedRecord) avroObject;
+            GenericRowData row = new GenericRowData(arity);
+            for (int i = 0; i < arity; ++i) {
+                // avro always deserializes successfully even if the type doesn't match,
+                // so there is no need to throw an exception about which field can't be deserialized
+                row.setField(i, fieldConverters[i].convert(record.get(i)));
+            }
+            return row;
+        };
+    }
+
+    /** Creates a runtime converter which is null safe. */
+    private static AvroToRowDataConverter createNullableConverter(LogicalType type) {
+        final AvroToRowDataConverter converter = createConverter(type);
+        return avroObject -> {
+            if (avroObject == null) {
+                return null;
+            }
+            return converter.convert(avroObject);
+        };
+    }
+
+    /** Creates a runtime converter which assumes the input object is not null. */
+    private static AvroToRowDataConverter createConverter(LogicalType type) {
+        switch (type.getTypeRoot()) {
+            case NULL:
+                return avroObject -> null;
+            case TINYINT:
+                return avroObject -> ((Integer) avroObject).byteValue();
+            case SMALLINT:
+                return avroObject -> ((Integer) avroObject).shortValue();
+            case BOOLEAN: // boolean
+            case INTEGER: // int
+            case INTERVAL_YEAR_MONTH: // long
+            case BIGINT: // long
+            case INTERVAL_DAY_TIME: // long
+            case FLOAT: // float
+            case DOUBLE: // double
+                return avroObject -> avroObject;
+            case DATE:
+                return AvroToRowDataConverters::convertToDate;
+            case TIME_WITHOUT_TIME_ZONE:
+                return AvroToRowDataConverters::convertToTime;
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+                return AvroToRowDataConverters::convertToTimestamp;
+            case CHAR:
+            case VARCHAR:
+                return avroObject -> StringData.fromString(avroObject.toString());
+            case BINARY:
+            case VARBINARY:
+                return AvroToRowDataConverters::convertToBytes;
+            case DECIMAL:
+                return createDecimalConverter((DecimalType) type);
+            case ARRAY:
+                return createArrayConverter((ArrayType) type);
+            case ROW:
+                return createRowConverter((RowType) type);
+            case MAP:
+            case MULTISET:
+                return createMapConverter(type);
+            case RAW:
+            default:
+                throw new UnsupportedOperationException("Unsupported type: " + type);
+        }
+    }
+
+    private static AvroToRowDataConverter createDecimalConverter(DecimalType decimalType) {
+        final int precision = decimalType.getPrecision();
+        final int scale = decimalType.getScale();
+        return avroObject -> {
+            final byte[] bytes;
+            if (avroObject instanceof GenericFixed) {
+                bytes = ((GenericFixed) avroObject).bytes();
+            } else if (avroObject instanceof ByteBuffer) {
+                ByteBuffer byteBuffer = (ByteBuffer) avroObject;
+                bytes = new byte[byteBuffer.remaining()];
+                byteBuffer.get(bytes);
+            } else {
+                bytes = (byte[]) avroObject;
+            }
+            return DecimalData.fromUnscaledBytes(bytes, precision, scale);
+        };
+    }
+
+    private static AvroToRowDataConverter createArrayConverter(ArrayType arrayType) {
+        final AvroToRowDataConverter elementConverter =
+                createNullableConverter(arrayType.getElementType());
+        final Class<?> elementClass =
+                LogicalTypeUtils.toInternalConversionClass(arrayType.getElementType());
+
+        return avroObject -> {
+            final List<?> list = (List<?>) avroObject;
+            final int length = list.size();
+            final Object[] array = (Object[]) Array.newInstance(elementClass, length);
+            for (int i = 0; i < length; ++i) {
+                array[i] = elementConverter.convert(list.get(i));
+            }
+            return new GenericArrayData(array);
+        };
+    }
+
+    private static AvroToRowDataConverter createMapConverter(LogicalType type) {
+        final AvroToRowDataConverter keyConverter =
+                createConverter(DataTypes.STRING().getLogicalType());
+        final AvroToRowDataConverter valueConverter =
+                createNullableConverter(extractValueTypeToAvroMap(type));
+
+        return avroObject -> {
+            final Map<?, ?> map = (Map<?, ?>) avroObject;
+            Map<Object, Object> result = new HashMap<>();
+            for (Map.Entry<?, ?> entry : map.entrySet()) {
+                Object key = keyConverter.convert(entry.getKey());
+                Object value = valueConverter.convert(entry.getValue());
+                result.put(key, value);
+            }
+            return new GenericMapData(result);
+        };
+    }
+
+    private static TimestampData convertToTimestamp(Object object) {
+        final long millis;
+        if (object instanceof Long) {
+            millis = (Long) object;
+        } else if (object instanceof Instant) {
+            millis = ((Instant) object).toEpochMilli();
+        } else {
+            JodaConverter jodaConverter = JodaConverter.getConverter();
+            if (jodaConverter != null) {
+                millis = jodaConverter.convertTimestamp(object);
+            } else {
+                throw new IllegalArgumentException(
+                        "Unexpected object type for TIMESTAMP logical type. Received: " + object);
+            }
+        }
+        return TimestampData.fromEpochMillis(millis);
+    }
+
+    private static int convertToDate(Object object) {
+        if (object instanceof Integer) {
+            return (Integer) object;
+        } else if (object instanceof LocalDate) {
+            return (int) ((LocalDate) object).toEpochDay();
+        } else {
+            JodaConverter jodaConverter = JodaConverter.getConverter();
+            if (jodaConverter != null) {
+                return (int) jodaConverter.convertDate(object);
+            } else {
+                throw new IllegalArgumentException(
+                        "Unexpected object type for DATE logical type. Received: " + object);
+            }
+        }
+    }
+
+    private static int convertToTime(Object object) {
+        final int millis;
+        if (object instanceof Integer) {
+            millis = (Integer) object;
+        } else if (object instanceof LocalTime) {
+            millis = ((LocalTime) object).get(ChronoField.MILLI_OF_DAY);
+        } else {
+            JodaConverter jodaConverter = JodaConverter.getConverter();
+            if (jodaConverter != null) {
+                millis = jodaConverter.convertTime(object);
+            } else {
+                throw new IllegalArgumentException(
+                        "Unexpected object type for TIME logical type. Received: " + object);
+            }
+        }
+        return millis;
+    }
+
+    private static byte[] convertToBytes(Object object) {
+        if (object instanceof GenericFixed) {
+            return ((GenericFixed) object).bytes();
+        } else if (object instanceof ByteBuffer) {
+            ByteBuffer byteBuffer = (ByteBuffer) object;
+            byte[] bytes = new byte[byteBuffer.remaining()];
+            byteBuffer.get(bytes);
+            return bytes;
+        } else {
+            return (byte[]) object;
+        }
+    }
+}
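
The converters produced by AvroToRowDataConverters are serializable and can be built once per row type, then applied to every deserialized record. A sketch of that reuse pattern (the AvroRecordMapper helper is hypothetical, not part of this commit):

    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.store.format.avro.AvroToRowDataConverters;
    import org.apache.flink.table.types.logical.RowType;

    /** Hypothetical helper that reuses one converter for all records of a row type. */
    public class AvroRecordMapper {

        private final AvroToRowDataConverters.AvroToRowDataConverter converter;

        public AvroRecordMapper(RowType rowType) {
            // Built once; rowType must mirror the Avro schema of the incoming records.
            this.converter = AvroToRowDataConverters.createRowConverter(rowType);
        }

        public RowData map(GenericRecord record) {
            return (RowData) converter.convert(record);
        }
    }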
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/AvroWriterFactory.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/AvroWriterFactory.java
new file mode 100644
index 00000000..508e0a02
--- /dev/null
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/AvroWriterFactory.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.format.avro;
+
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.core.fs.FSDataOutputStream;
+
+import org.apache.avro.file.DataFileWriter;
+
+import java.io.IOException;
+
+/**
+ * A factory that creates an {@link AvroBulkWriter}. The factory takes a user-supplied builder to
+ * assemble Avro's writer and then turns it into a Flink {@code BulkWriter}.
+ *
+ * @param <T> The type of record to write.
+ */
+public class AvroWriterFactory<T> implements BulkWriter.Factory<T> {
+    private static final long serialVersionUID = 1L;
+
+    /** The builder to construct the Avro {@link DataFileWriter}. */
+    private final AvroBuilder<T> avroBuilder;
+
+    /** Creates a new AvroWriterFactory using the given builder to assemble the Avro writer. */
+    public AvroWriterFactory(AvroBuilder<T> avroBuilder) {
+        this.avroBuilder = avroBuilder;
+    }
+
+    @Override
+    public BulkWriter<T> create(FSDataOutputStream out) throws IOException {
+        return new AvroBulkWriter<>(avroBuilder.createWriter(new CloseShieldOutputStream(out)));
+    }
+}
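
Putting the pieces together, the factory turns an AvroBuilder into a BulkWriter over a Flink FSDataOutputStream. A sketch reusing the forSchema builder from the AvroBuilder example above (file path and schema handling are illustrative, not part of this commit):

    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.api.common.serialization.BulkWriter;
    import org.apache.flink.core.fs.FSDataOutputStream;
    import org.apache.flink.core.fs.FileSystem;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.table.store.format.avro.AvroWriterFactory;

    public class AvroWriterFactoryExample {
        public static void write(Path path, String schemaJson, Iterable<GenericRecord> records)
                throws Exception {
            AvroWriterFactory<GenericRecord> factory =
                    new AvroWriterFactory<>(AvroBuilderExample.forSchema(schemaJson));
            FileSystem fs = path.getFileSystem();
            try (FSDataOutputStream out = fs.create(path, FileSystem.WriteMode.NO_OVERWRITE)) {
                BulkWriter<GenericRecord> writer = factory.create(out);
                for (GenericRecord record : records) {
                    writer.addElement(record);
                }
                // finish() closes the Avro DataFileWriter; the wrapped stream stays open because of
                // CloseShieldOutputStream and is closed by the surrounding try-with-resources.
                writer.finish();
            }
        }
    }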
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/CloseShieldOutputStream.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/CloseShieldOutputStream.java
new file mode 100644
index 00000000..d29970f2
--- /dev/null
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/CloseShieldOutputStream.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.format.avro;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/** A proxy output stream that prevents the underlying output stream from being closed. */
+public class CloseShieldOutputStream extends OutputStream {
+    private final OutputStream out;
+
+    public CloseShieldOutputStream(OutputStream out) {
+        this.out = out;
+    }
+
+    @Override
+    public void write(int b) throws IOException {
+        out.write(b);
+    }
+
+    @Override
+    public void write(byte[] buffer) throws IOException {
+        out.write(buffer);
+    }
+
+    @Override
+    public void write(byte[] buffer, int off, int len) throws IOException {
+        out.write(buffer, off, len);
+    }
+
+    @Override
+    public void flush() throws IOException {
+        out.flush();
+    }
+
+    @Override
+    public void close() throws IOException {
+        // Do not actually close the internal stream.
+    }
+}
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/FSDataInputStreamWrapper.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/FSDataInputStreamWrapper.java
new file mode 100644
index 00000000..0b1893e4
--- /dev/null
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/FSDataInputStreamWrapper.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.format.avro;
+
+import org.apache.flink.core.fs.FSDataInputStream;
+
+import org.apache.avro.file.SeekableInput;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Code copy pasted from org.apache.avro.mapred.FSInput (which is Apache licensed as well).
+ *
+ * <p>The wrapper keeps track of the position in the data stream.
+ */
+public class FSDataInputStreamWrapper implements Closeable, SeekableInput {
+
+    private final FSDataInputStream stream;
+    private final long len;
+
+    public FSDataInputStreamWrapper(FSDataInputStream stream, long len) {
+        this.stream = stream;
+        this.len = len;
+    }
+
+    @Override
+    public long length() throws IOException {
+        return this.len;
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        return stream.read(b, off, len);
+    }
+
+    @Override
+    public void seek(long p) throws IOException {
+        stream.seek(p);
+    }
+
+    @Override
+    public long tell() throws IOException {
+        return stream.getPos();
+    }
+
+    @Override
+    public void close() throws IOException {
+        stream.close();
+    }
+}
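
The wrapper adapts Flink's FSDataInputStream to Avro's SeekableInput, which DataFileReader needs for random access to container blocks. A sketch that dumps an Avro file through Flink's FileSystem abstraction (purely illustrative, not part of this commit):

    import org.apache.avro.file.DataFileReader;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.core.fs.FileSystem;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.table.store.format.avro.FSDataInputStreamWrapper;

    public class AvroFileDumpExample {
        public static void dump(Path path) throws Exception {
            FileSystem fs = path.getFileSystem();
            long len = fs.getFileStatus(path).getLen();
            // The wrapper tracks the position and exposes length(), as SeekableInput requires.
            try (DataFileReader<GenericRecord> reader =
                    new DataFileReader<>(
                            new FSDataInputStreamWrapper(fs.open(path), len),
                            new GenericDatumReader<>())) {
                for (GenericRecord record : reader) {
                    System.out.println(record);
                }
            }
        }
    }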
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/JodaConverter.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/JodaConverter.java
new file mode 100644
index 00000000..4a333d52
--- /dev/null
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/JodaConverter.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.format.avro;
+
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeFieldType;
+import org.joda.time.LocalDate;
+import org.joda.time.LocalTime;
+
+/**
+ * Encapsulates the optional Joda-Time dependency. This class is only instantiated if Joda-Time is
+ * available on the classpath.
+ */
+class JodaConverter {
+
+    private static JodaConverter instance;
+    private static boolean instantiated = false;
+
+    public static JodaConverter getConverter() {
+        if (instantiated) {
+            return instance;
+        }
+
+        try {
+            Class.forName(
+                    "org.joda.time.DateTime",
+                    false,
+                    Thread.currentThread().getContextClassLoader());
+            instance = new JodaConverter();
+        } catch (ClassNotFoundException e) {
+            instance = null;
+        } finally {
+            instantiated = true;
+        }
+        return instance;
+    }
+
+    public long convertDate(Object object) {
+        final LocalDate value = (LocalDate) object;
+        return value.toDate().getTime();
+    }
+
+    public int convertTime(Object object) {
+        final LocalTime value = (LocalTime) object;
+        return value.get(DateTimeFieldType.millisOfDay());
+    }
+
+    public long convertTimestamp(Object object) {
+        final DateTime value = (DateTime) object;
+        return value.toDate().getTime();
+    }
+
+    private JodaConverter() {}
+}
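
A caller-side sketch (same package, since the class is package-private); the DateTime value is only an example, and a null result means Joda-Time is absent so the caller must fall back to java.time handling:

    // Illustrative only: probe once, use only if joda-time was found on the classpath.
    JodaConverter joda = JodaConverter.getConverter();
    if (joda != null) {
        long epochMillis = joda.convertTimestamp(new org.joda.time.DateTime(2023, 1, 6, 0, 0));
        System.out.println(epochMillis);
    } else {
        // joda-time is not available; convert with java.time instead
    }
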
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/RowDataToAvroConverters.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/RowDataToAvroConverters.java
new file mode 100644
index 00000000..cebc3091
--- /dev/null
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/RowDataToAvroConverters.java
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.format.avro;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.util.Utf8;
+
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.table.store.format.avro.AvroSchemaConverter.extractValueTypeToAvroMap;
+
+/** Tool class used to convert from {@link RowData} to Avro {@link GenericRecord}. */
+@Internal
+public class RowDataToAvroConverters {
+
+    // --------------------------------------------------------------------------------
+    // Runtime Converters
+    // --------------------------------------------------------------------------------
+
+    /**
+     * Runtime converter that converts objects of Flink Table & SQL internal data structures to
+     * corresponding Avro data structures.
+     */
+    @FunctionalInterface
+    public interface RowDataToAvroConverter extends Serializable {
+        Object convert(Schema schema, Object object);
+    }
+
+    // --------------------------------------------------------------------------------
+    // IMPORTANT! We use anonymous classes instead of lambdas for a reason here. It is
+    // necessary because the maven shade plugin cannot relocate classes in
+    // SerializedLambdas (MSHADE-260). On the other hand we want to relocate Avro for
+    // sql-client uber jars.
+    // --------------------------------------------------------------------------------
+
+    /**
+     * Creates a runtime converter according to the given logical type that converts objects of
+     * Flink Table & SQL internal data structures to corresponding Avro data structures.
+     */
+    public static RowDataToAvroConverter createConverter(LogicalType type) {
+        final RowDataToAvroConverter converter;
+        switch (type.getTypeRoot()) {
+            case NULL:
+                converter =
+                        new RowDataToAvroConverter() {
+                            private static final long serialVersionUID = 1L;
+
+                            @Override
+                            public Object convert(Schema schema, Object object) {
+                                return null;
+                            }
+                        };
+                break;
+            case TINYINT:
+                converter =
+                        new RowDataToAvroConverter() {
+                            private static final long serialVersionUID = 1L;
+
+                            @Override
+                            public Object convert(Schema schema, Object object) {
+                                return ((Byte) object).intValue();
+                            }
+                        };
+                break;
+            case SMALLINT:
+                converter =
+                        new RowDataToAvroConverter() {
+                            private static final long serialVersionUID = 1L;
+
+                            @Override
+                            public Object convert(Schema schema, Object object) {
+                                return ((Short) object).intValue();
+                            }
+                        };
+                break;
+            case BOOLEAN: // boolean
+            case INTEGER: // int
+            case INTERVAL_YEAR_MONTH: // long
+            case BIGINT: // long
+            case INTERVAL_DAY_TIME: // long
+            case FLOAT: // float
+            case DOUBLE: // double
+            case TIME_WITHOUT_TIME_ZONE: // int
+            case DATE: // int
+                converter =
+                        new RowDataToAvroConverter() {
+                            private static final long serialVersionUID = 1L;
+
+                            @Override
+                            public Object convert(Schema schema, Object object) {
+                                return object;
+                            }
+                        };
+                break;
+            case CHAR:
+            case VARCHAR:
+                converter =
+                        new RowDataToAvroConverter() {
+                            private static final long serialVersionUID = 1L;
+
+                            @Override
+                            public Object convert(Schema schema, Object object) {
+                                return new Utf8(object.toString());
+                            }
+                        };
+                break;
+            case BINARY:
+            case VARBINARY:
+                converter =
+                        new RowDataToAvroConverter() {
+                            private static final long serialVersionUID = 1L;
+
+                            @Override
+                            public Object convert(Schema schema, Object object) {
+                                return ByteBuffer.wrap((byte[]) object);
+                            }
+                        };
+                break;
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+                converter =
+                        new RowDataToAvroConverter() {
+                            private static final long serialVersionUID = 1L;
+
+                            @Override
+                            public Object convert(Schema schema, Object object) {
+                                return ((TimestampData) object).toInstant().toEpochMilli();
+                            }
+                        };
+                break;
+            case DECIMAL:
+                converter =
+                        new RowDataToAvroConverter() {
+                            private static final long serialVersionUID = 1L;
+
+                            @Override
+                            public Object convert(Schema schema, Object object) {
+                                return ByteBuffer.wrap(((DecimalData) object).toUnscaledBytes());
+                            }
+                        };
+                break;
+            case ARRAY:
+                converter = createArrayConverter((ArrayType) type);
+                break;
+            case ROW:
+                converter = createRowConverter((RowType) type);
+                break;
+            case MAP:
+            case MULTISET:
+                converter = createMapConverter(type);
+                break;
+            case RAW:
+            default:
+                throw new UnsupportedOperationException("Unsupported type: " + type);
+        }
+
+        // wrap into nullable converter
+        return new RowDataToAvroConverter() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(Schema schema, Object object) {
+                if (object == null) {
+                    return null;
+                }
+
+                // get actual schema if it is a nullable schema
+                Schema actualSchema;
+                if (schema.getType() == Schema.Type.UNION) {
+                    List<Schema> types = schema.getTypes();
+                    int size = types.size();
+                    if (size == 2 && types.get(1).getType() == Schema.Type.NULL) {
+                        actualSchema = types.get(0);
+                    } else if (size == 2 && types.get(0).getType() == Schema.Type.NULL) {
+                        actualSchema = types.get(1);
+                    } else {
+                        throw new IllegalArgumentException(
+                                "The Avro schema is not a nullable type: " + schema);
+                    }
+                } else {
+                    actualSchema = schema;
+                }
+                return converter.convert(actualSchema, object);
+            }
+        };
+    }
+
+    private static RowDataToAvroConverter createRowConverter(RowType rowType) {
+        final RowDataToAvroConverter[] fieldConverters =
+                rowType.getChildren().stream()
+                        .map(RowDataToAvroConverters::createConverter)
+                        .toArray(RowDataToAvroConverter[]::new);
+        final LogicalType[] fieldTypes =
+                rowType.getFields().stream()
+                        .map(RowType.RowField::getType)
+                        .toArray(LogicalType[]::new);
+        final RowData.FieldGetter[] fieldGetters = new RowData.FieldGetter[fieldTypes.length];
+        for (int i = 0; i < fieldTypes.length; i++) {
+            fieldGetters[i] = RowData.createFieldGetter(fieldTypes[i], i);
+        }
+        final int length = rowType.getFieldCount();
+
+        return new RowDataToAvroConverter() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(Schema schema, Object object) {
+                final RowData row = (RowData) object;
+                final List<Schema.Field> fields = schema.getFields();
+                final GenericRecord record = new GenericData.Record(schema);
+                for (int i = 0; i < length; ++i) {
+                    final Schema.Field schemaField = fields.get(i);
+                    try {
+                        Object avroObject =
+                                fieldConverters[i].convert(
+                                        schemaField.schema(), fieldGetters[i].getFieldOrNull(row));
+                        record.put(i, avroObject);
+                    } catch (Throwable t) {
+                        throw new RuntimeException(
+                                String.format(
+                                        "Fail to serialize at field: %s.", schemaField.name()),
+                                t);
+                    }
+                }
+                return record;
+            }
+        };
+    }
+
+    private static RowDataToAvroConverter createArrayConverter(ArrayType arrayType) {
+        LogicalType elementType = arrayType.getElementType();
+        final ArrayData.ElementGetter elementGetter = ArrayData.createElementGetter(elementType);
+        final RowDataToAvroConverter elementConverter = createConverter(arrayType.getElementType());
+
+        return new RowDataToAvroConverter() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(Schema schema, Object object) {
+                final Schema elementSchema = schema.getElementType();
+                ArrayData arrayData = (ArrayData) object;
+                List<Object> list = new ArrayList<>();
+                for (int i = 0; i < arrayData.size(); ++i) {
+                    list.add(
+                            elementConverter.convert(
+                                    elementSchema, elementGetter.getElementOrNull(arrayData, i)));
+                }
+                return list;
+            }
+        };
+    }
+
+    private static RowDataToAvroConverter createMapConverter(LogicalType type) {
+        LogicalType valueType = extractValueTypeToAvroMap(type);
+        final ArrayData.ElementGetter valueGetter = ArrayData.createElementGetter(valueType);
+        final RowDataToAvroConverter valueConverter = createConverter(valueType);
+
+        return new RowDataToAvroConverter() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(Schema schema, Object object) {
+                final Schema valueSchema = schema.getValueType();
+                final MapData mapData = (MapData) object;
+                final ArrayData keyArray = mapData.keyArray();
+                final ArrayData valueArray = mapData.valueArray();
+                final Map<Object, Object> map = new HashMap<>(mapData.size());
+                for (int i = 0; i < mapData.size(); ++i) {
+                    final String key = keyArray.getString(i).toString();
+                    final Object value =
+                            valueConverter.convert(
+                                    valueSchema, valueGetter.getElementOrNull(valueArray, i));
+                    map.put(key, value);
+                }
+                return map;
+            }
+        };
+    }
+}
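
A self-contained sketch of the converter in use; the two-field row type, the hand-built Avro record schema and the sample row are illustrative assumptions, not part of this commit:

    // Illustrative only: convert one RowData into an Avro GenericRecord.
    import org.apache.avro.Schema;
    import org.apache.avro.SchemaBuilder;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.table.data.GenericRowData;
    import org.apache.flink.table.data.StringData;
    import org.apache.flink.table.store.format.avro.RowDataToAvroConverters;
    import org.apache.flink.table.types.logical.IntType;
    import org.apache.flink.table.types.logical.LogicalType;
    import org.apache.flink.table.types.logical.RowType;
    import org.apache.flink.table.types.logical.VarCharType;

    public class RowToAvroSketch {
        public static void main(String[] args) {
            RowType rowType =
                    RowType.of(
                            new LogicalType[] {new IntType(), new VarCharType(VarCharType.MAX_LENGTH)},
                            new String[] {"id", "name"});
            Schema schema =
                    SchemaBuilder.record("Row")
                            .fields()
                            .requiredInt("id")
                            .requiredString("name")
                            .endRecord();
            RowDataToAvroConverters.RowDataToAvroConverter converter =
                    RowDataToAvroConverters.createConverter(rowType);
            GenericRecord record =
                    (GenericRecord)
                            converter.convert(
                                    schema, GenericRowData.of(1, StringData.fromString("flink")));
            System.out.println(record); // {"id": 1, "name": "flink"}
        }
    }
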
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileFormat.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileFormat.java
index 2c68889f..bf593a56 100644
--- a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileFormat.java
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileFormat.java
@@ -24,17 +24,17 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.connector.file.src.FileSourceSplit;
 import org.apache.flink.connector.file.src.reader.BulkFormat;
-import org.apache.flink.orc.OrcFilters;
-import org.apache.flink.orc.OrcSplitReaderUtil;
-import org.apache.flink.orc.vector.RowDataVectorizer;
-import org.apache.flink.orc.vector.Vectorizer;
-import org.apache.flink.orc.writer.OrcBulkWriterFactory;
-import org.apache.flink.orc.writer.ThreadLocalClassLoaderConfiguration;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.store.file.predicate.Predicate;
 import org.apache.flink.table.store.format.FileFormat;
 import org.apache.flink.table.store.format.FileStatsExtractor;
+import org.apache.flink.table.store.format.orc.filter.OrcFileStatsExtractor;
+import org.apache.flink.table.store.format.orc.filter.OrcFilters;
+import org.apache.flink.table.store.format.orc.filter.OrcPredicateFunctionVisitor;
+import org.apache.flink.table.store.format.orc.reader.OrcSplitReaderUtil;
+import org.apache.flink.table.store.format.orc.writer.RowDataVectorizer;
+import org.apache.flink.table.store.format.orc.writer.Vectorizer;
 import org.apache.flink.table.store.utils.Projection;
 import org.apache.flink.table.types.logical.ArrayType;
 import org.apache.flink.table.types.logical.IntType;
@@ -53,17 +53,17 @@ import java.util.Optional;
 import java.util.Properties;
 import java.util.stream.Collectors;
 
-import static org.apache.flink.table.store.format.orc.OrcFileFormatFactory.IDENTIFIER;
-
 /** Orc {@link FileFormat}. The main code is copied from Flink {@code OrcFileFormatFactory}. */
 public class OrcFileFormat extends FileFormat {
 
+    public static final String IDENTIFIER = "orc";
+
     private final Properties orcProperties;
     private final org.apache.hadoop.conf.Configuration readerConf;
     private final org.apache.hadoop.conf.Configuration writerConf;
 
     public OrcFileFormat(Configuration formatOptions) {
-        super(org.apache.flink.orc.OrcFileFormatFactory.IDENTIFIER);
+        super(IDENTIFIER);
         this.orcProperties = getOrcProperties(formatOptions);
         this.readerConf = new org.apache.hadoop.conf.Configuration();
         this.orcProperties.forEach((k, v) -> readerConf.set(k.toString(), v.toString()));
@@ -93,19 +93,20 @@ public class OrcFileFormat extends FileFormat {
             }
         }
 
-        return OrcInputFormatFactory.create(
+        return new OrcReaderFactory(
                 readerConf,
                 (RowType) refineLogicalType(type),
                 Projection.of(projection).toTopLevelIndexes(),
-                orcPredicates);
+                orcPredicates,
+                2048);
     }
 
     /**
-     * The {@link OrcBulkWriterFactory} will create {@link ThreadLocalClassLoaderConfiguration} from
-     * the input writer config to avoid classloader leaks.
+     * The {@link OrcWriterFactory} will create {@link ThreadLocalClassLoaderConfiguration} from the
+     * input writer config to avoid classloader leaks.
      *
-     * <p>TODO: The {@link ThreadLocalClassLoaderConfiguration} in {@link OrcBulkWriterFactory}
-     * should be removed after https://issues.apache.org/jira/browse/ORC-653 is fixed.
+     * <p>TODO: The {@link ThreadLocalClassLoaderConfiguration} in {@link OrcWriterFactory} should
+     * be removed after https://issues.apache.org/jira/browse/ORC-653 is fixed.
      *
      * @param type The data type for the {@link BulkWriter}
      * @return The factory of the {@link BulkWriter}
@@ -119,7 +120,7 @@ public class OrcFileFormat extends FileFormat {
         Vectorizer<RowData> vectorizer =
                 new RowDataVectorizer(typeDescription.toString(), orcTypes);
 
-        return new OrcBulkWriterFactory<>(vectorizer, orcProperties, writerConf);
+        return new OrcWriterFactory<>(vectorizer, orcProperties, writerConf);
     }
 
     private static Properties getOrcProperties(ReadableConfig options) {
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcInputFormatFactory.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcInputFormatFactory.java
deleted file mode 100644
index e5e5e3ec..00000000
--- a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcInputFormatFactory.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.format.orc;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.connector.file.src.FileSourceSplit;
-import org.apache.flink.connector.file.src.reader.BulkFormat;
-import org.apache.flink.orc.OrcFilters;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
-import org.apache.flink.table.store.utils.ReflectionUtils;
-import org.apache.flink.table.types.logical.RowType;
-
-import org.apache.hadoop.conf.Configuration;
-
-import java.lang.reflect.InvocationTargetException;
-import java.util.Collections;
-import java.util.List;
-import java.util.function.Function;
-
-/** Factory to create orc input format for different Flink versions. */
-public class OrcInputFormatFactory {
-
-    public static BulkFormat<RowData, FileSourceSplit> create(
-            Configuration conf,
-            RowType type,
-            int[] projection,
-            List<OrcFilters.Predicate> orcPredicates) {
-        try {
-            return createFrom115(conf, type, projection, orcPredicates);
-        } catch (ClassNotFoundException e) {
-            try {
-                return createFrom114(conf, type, projection, orcPredicates);
-            } catch (ClassNotFoundException ex) {
-                throw new RuntimeException(ex);
-            }
-        }
-    }
-
-    private static BulkFormat<RowData, FileSourceSplit> createFrom115(
-            Configuration conf,
-            RowType type,
-            int[] projection,
-            List<OrcFilters.Predicate> orcPredicates)
-            throws ClassNotFoundException {
-        Class<?> formatClass = Class.forName("org.apache.flink.orc.OrcColumnarRowInputFormat");
-        try {
-            return ReflectionUtils.invokeStaticMethod(
-                    formatClass,
-                    "createPartitionedFormat",
-                    new OrcShimImpl(),
-                    conf,
-                    type,
-                    Collections.emptyList(),
-                    null,
-                    projection,
-                    orcPredicates,
-                    2048,
-                    (Function<RowType, TypeInformation<RowData>>) InternalTypeInfo::of);
-        } catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    private static BulkFormat<RowData, FileSourceSplit> createFrom114(
-            Configuration conf,
-            RowType type,
-            int[] projection,
-            List<OrcFilters.Predicate> orcPredicates)
-            throws ClassNotFoundException {
-        Class<?> formatClass = Class.forName("org.apache.flink.orc.OrcColumnarRowFileInputFormat");
-        try {
-            return ReflectionUtils.invokeStaticMethod(
-                    formatClass,
-                    "createPartitionedFormat",
-                    new OrcShimImpl(),
-                    conf,
-                    type,
-                    Collections.emptyList(),
-                    null,
-                    projection,
-                    orcPredicates,
-                    2048);
-        } catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException e) {
-            throw new RuntimeException(e);
-        }
-    }
-}
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcReaderFactory.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcReaderFactory.java
new file mode 100644
index 00000000..5ce6a835
--- /dev/null
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcReaderFactory.java
@@ -0,0 +1,403 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.format.orc;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.connector.base.source.reader.SourceReaderOptions;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.connector.file.src.util.Pool;
+import org.apache.flink.connector.file.src.util.RecordAndPosition;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.data.columnar.ColumnVector;
+import org.apache.flink.table.store.data.columnar.ColumnarRowData;
+import org.apache.flink.table.store.data.columnar.ColumnarRowIterator;
+import org.apache.flink.table.store.data.columnar.VectorizedColumnBatch;
+import org.apache.flink.table.store.format.fs.HadoopReadOnlyFileSystem;
+import org.apache.flink.table.store.format.orc.filter.OrcFilters;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
+import org.apache.orc.OrcConf;
+import org.apache.orc.OrcFile;
+import org.apache.orc.RecordReader;
+import org.apache.orc.StripeInformation;
+import org.apache.orc.TypeDescription;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.apache.flink.table.store.format.orc.reader.AbstractOrcColumnVector.createFlinkVector;
+import static org.apache.flink.table.store.format.orc.reader.OrcSplitReaderUtil.logicalTypeToOrcType;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** An ORC reader that produces a stream of {@link ColumnarRowData} records. */
+public class OrcReaderFactory implements BulkFormat<RowData, FileSourceSplit> {
+
+    private static final long serialVersionUID = 1L;
+
+    protected final SerializableHadoopConfigWrapper hadoopConfigWrapper;
+
+    protected final TypeDescription schema;
+
+    private final RowType tableType;
+
+    protected final int[] selectedFields;
+
+    protected final List<OrcFilters.Predicate> conjunctPredicates;
+
+    protected final int batchSize;
+
+    /**
+     * @param hadoopConfig the hadoop config for the orc reader.
+     * @param tableType the full row type of the table being read.
+     * @param selectedFields the indexes of the fields to read from the orc file.
+     * @param conjunctPredicates the filter predicates that can be evaluated.
+     * @param batchSize the batch size of the orc reader.
+     */
+    public OrcReaderFactory(
+            final org.apache.hadoop.conf.Configuration hadoopConfig,
+            final RowType tableType,
+            final int[] selectedFields,
+            final List<OrcFilters.Predicate> conjunctPredicates,
+            final int batchSize) {
+        this.hadoopConfigWrapper = new SerializableHadoopConfigWrapper(checkNotNull(hadoopConfig));
+        this.schema = logicalTypeToOrcType(tableType);
+        this.tableType = tableType;
+        this.selectedFields = checkNotNull(selectedFields);
+        this.conjunctPredicates = checkNotNull(conjunctPredicates);
+        this.batchSize = batchSize;
+    }
+
+    // ------------------------------------------------------------------------
+
+    @Override
+    public OrcVectorizedReader createReader(
+            final org.apache.flink.configuration.Configuration config, final FileSourceSplit split)
+            throws IOException {
+
+        final int numBatchesToCirculate =
+                config.getInteger(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY);
+        final Pool<OrcReaderBatch> poolOfBatches = createPoolOfBatches(numBatchesToCirculate);
+
+        final RecordReader orcReader =
+                createRecordReader(
+                        hadoopConfigWrapper.getHadoopConfig(),
+                        schema,
+                        selectedFields,
+                        conjunctPredicates,
+                        split.path(),
+                        split.offset(),
+                        split.length());
+
+        return new OrcVectorizedReader(orcReader, poolOfBatches);
+    }
+
+    @Override
+    public OrcVectorizedReader restoreReader(
+            final org.apache.flink.configuration.Configuration config,
+            final FileSourceSplit split) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean isSplittable() {
+        return true;
+    }
+
+    /**
+     * Creates the {@link OrcReaderBatch} structure, which holds the data structures backing the
+     * batch data (column vectors, row arrays, ...) and performs the batch conversion from the ORC
+     * representation to the result format.
+     */
+    public OrcReaderBatch createReaderBatch(
+            VectorizedRowBatch orcBatch, Pool.Recycler<OrcReaderBatch> recycler) {
+        List<String> tableFieldNames = tableType.getFieldNames();
+        List<LogicalType> tableFieldTypes = tableType.getChildren();
+
+        // create and initialize the row batch
+        ColumnVector[] vectors = new ColumnVector[selectedFields.length];
+        for (int i = 0; i < vectors.length; i++) {
+            String name = tableFieldNames.get(selectedFields[i]);
+            LogicalType type = tableFieldTypes.get(selectedFields[i]);
+            vectors[i] = createFlinkVector(orcBatch.cols[tableFieldNames.indexOf(name)], type);
+        }
+        VectorizedColumnBatch flinkColumnBatch = new VectorizedColumnBatch(vectors);
+        return new OrcReaderBatch(orcBatch, flinkColumnBatch, recycler);
+    }
+
+    /** Gets the type produced by this format. */
+    @Override
+    public TypeInformation<RowData> getProducedType() {
+        throw new UnsupportedOperationException();
+    }
+
+    // ------------------------------------------------------------------------
+
+    private Pool<OrcReaderBatch> createPoolOfBatches(int numBatches) {
+        final Pool<OrcReaderBatch> pool = new Pool<>(numBatches);
+
+        for (int i = 0; i < numBatches; i++) {
+            final VectorizedRowBatch orcBatch = createBatchWrapper(schema, batchSize);
+            final OrcReaderBatch batch = createReaderBatch(orcBatch, pool.recycler());
+            pool.add(batch);
+        }
+
+        return pool;
+    }
+
+    // ------------------------------------------------------------------------
+
+    /**
+     * The {@code OrcReaderBatch} class holds the data structures containing the batch data (column
+     * vectors, row arrays, ...) and performs the batch conversion from the ORC representation to
+     * the result format.
+     *
+     * <p>This base class only holds the ORC column vectors; subclasses additionally hold the
+     * result structures and implement the conversion in {@link
+     * OrcReaderBatch#convertAndGetIterator(VectorizedRowBatch, long)}.
+     */
+    protected static class OrcReaderBatch {
+
+        private final VectorizedRowBatch orcVectorizedRowBatch;
+        private final Pool.Recycler<OrcReaderBatch> recycler;
+
+        private final VectorizedColumnBatch flinkColumnBatch;
+        private final ColumnarRowIterator result;
+
+        protected OrcReaderBatch(
+                final VectorizedRowBatch orcVectorizedRowBatch,
+                final VectorizedColumnBatch flinkColumnBatch,
+                final Pool.Recycler<OrcReaderBatch> recycler) {
+            this.orcVectorizedRowBatch = checkNotNull(orcVectorizedRowBatch);
+            this.recycler = checkNotNull(recycler);
+            this.flinkColumnBatch = flinkColumnBatch;
+            this.result =
+                    new ColumnarRowIterator(new ColumnarRowData(flinkColumnBatch), this::recycle);
+        }
+
+        /**
+         * Puts this batch back into the pool. This should be called after all records from the
+         * batch have been returned, typically in the {@link RecordIterator#releaseBatch()} method.
+         */
+        public void recycle() {
+            recycler.recycle(this);
+        }
+
+        /** Gets the ORC VectorizedRowBatch structure from this batch. */
+        public VectorizedRowBatch orcVectorizedRowBatch() {
+            return orcVectorizedRowBatch;
+        }
+
+        /**
+         * Converts the ORC VectorizedRowBatch into the result structure and returns an iterator
+         * over the entries.
+         *
+         * <p>This method may, for example, return a single element iterator that returns the entire
+         * batch as one, or (as another example) return an iterator over the rows projected from
+         * this column batch.
+         *
+         * <p>The position information in the result needs to be constructed as follows: The value
+         * of {@code startingOffset} is the offset value ({@link RecordAndPosition#getOffset()}) for
+         * all rows in the batch. Each row then increments the records-to-skip value ({@link
+         * RecordAndPosition#getRecordSkipCount()}).
+         */
+        private RecordIterator<RowData> convertAndGetIterator(
+                final VectorizedRowBatch orcBatch, final long startingOffset) {
+            // no copying from the ORC column vectors to the Flink column vectors is necessary,
+            // because they point to the same underlying data arrays
+            int batchSize = orcBatch.size;
+            flinkColumnBatch.setNumRows(batchSize);
+            result.set(batchSize, startingOffset, 0);
+            return result;
+        }
+    }
+
+    // ------------------------------------------------------------------------
+
+    /**
+     * A vectorized ORC reader. This reader reads an ORC Batch at a time and converts it to one or
+     * more records to be returned. An ORC Row-wise reader would convert the batch into a set of
+     * rows, while a reader for a vectorized query processor might return the whole batch as one
+     * record.
+     *
+     * <p>The conversion of the {@code VectorizedRowBatch} happens in the specific {@link
+     * OrcReaderBatch} implementation.
+     *
+     * <p>The reader tracks its current position using ORC's <i>row numbers</i>. Each record in a
+     * batch is addressed by the starting row number of the batch, plus the number of records to be
+     * skipped before.
+     */
+    private static final class OrcVectorizedReader implements BulkFormat.Reader<RowData> {
+
+        private final RecordReader orcReader;
+        private final Pool<OrcReaderBatch> pool;
+
+        private OrcVectorizedReader(final RecordReader orcReader, final Pool<OrcReaderBatch> pool) {
+            this.orcReader = checkNotNull(orcReader, "orcReader");
+            this.pool = checkNotNull(pool, "pool");
+        }
+
+        @Nullable
+        @Override
+        public RecordIterator<RowData> readBatch() throws IOException {
+            final OrcReaderBatch batch = getCachedEntry();
+            final VectorizedRowBatch orcVectorBatch = batch.orcVectorizedRowBatch();
+
+            final long orcRowNumber = orcReader.getRowNumber();
+            if (!nextBatch(orcReader, orcVectorBatch)) {
+                batch.recycle();
+                return null;
+            }
+
+            return batch.convertAndGetIterator(orcVectorBatch, orcRowNumber);
+        }
+
+        @Override
+        public void close() throws IOException {
+            orcReader.close();
+        }
+
+        private OrcReaderBatch getCachedEntry() throws IOException {
+            try {
+                return pool.pollEntry();
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new IOException("Interrupted");
+            }
+        }
+    }
+
+    private static RecordReader createRecordReader(
+            org.apache.hadoop.conf.Configuration conf,
+            TypeDescription schema,
+            int[] selectedFields,
+            List<OrcFilters.Predicate> conjunctPredicates,
+            org.apache.flink.core.fs.Path path,
+            long splitStart,
+            long splitLength)
+            throws IOException {
+        org.apache.orc.Reader orcReader = createReader(conf, path);
+
+        // get offset and length for the stripes that start in the split
+        Tuple2<Long, Long> offsetAndLength =
+                getOffsetAndLengthForSplit(splitStart, splitLength, orcReader.getStripes());
+
+        // create ORC row reader configuration
+        org.apache.orc.Reader.Options options =
+                new org.apache.orc.Reader.Options()
+                        .schema(schema)
+                        .range(offsetAndLength.f0, offsetAndLength.f1)
+                        .useZeroCopy(OrcConf.USE_ZEROCOPY.getBoolean(conf))
+                        .skipCorruptRecords(OrcConf.SKIP_CORRUPT_DATA.getBoolean(conf))
+                        .tolerateMissingSchema(OrcConf.TOLERATE_MISSING_SCHEMA.getBoolean(conf));
+
+        // configure filters
+        if (!conjunctPredicates.isEmpty()) {
+            SearchArgument.Builder b = SearchArgumentFactory.newBuilder();
+            b = b.startAnd();
+            for (OrcFilters.Predicate predicate : conjunctPredicates) {
+                predicate.add(b);
+            }
+            b = b.end();
+            options.searchArgument(b.build(), new String[] {});
+        }
+
+        // configure selected fields
+        options.include(computeProjectionMask(schema, selectedFields));
+
+        // create ORC row reader
+        RecordReader orcRowsReader = orcReader.rows(options);
+
+        // assign ids
+        schema.getId();
+
+        return orcRowsReader;
+    }
+
+    private static VectorizedRowBatch createBatchWrapper(TypeDescription schema, int batchSize) {
+        return schema.createRowBatch(batchSize);
+    }
+
+    private static boolean nextBatch(RecordReader reader, VectorizedRowBatch rowBatch)
+            throws IOException {
+        return reader.nextBatch(rowBatch);
+    }
+
+    private static Tuple2<Long, Long> getOffsetAndLengthForSplit(
+            long splitStart, long splitLength, List<StripeInformation> stripes) {
+        long splitEnd = splitStart + splitLength;
+        long readStart = Long.MAX_VALUE;
+        long readEnd = Long.MIN_VALUE;
+
+        for (StripeInformation s : stripes) {
+            if (splitStart <= s.getOffset() && s.getOffset() < splitEnd) {
+                // stripe starts in split, so it is included
+                readStart = Math.min(readStart, s.getOffset());
+                readEnd = Math.max(readEnd, s.getOffset() + s.getLength());
+            }
+        }
+
+        if (readStart < Long.MAX_VALUE) {
+            // at least one split is included
+            return Tuple2.of(readStart, readEnd - readStart);
+        } else {
+            return Tuple2.of(0L, 0L);
+        }
+    }
+
+    /**
+     * Computes the ORC projection mask of the fields to include from the selected fields.
+     *
+     * @return The ORC projection mask.
+     */
+    private static boolean[] computeProjectionMask(TypeDescription schema, int[] selectedFields) {
+        // mask with all fields of the schema
+        boolean[] projectionMask = new boolean[schema.getMaximumId() + 1];
+        // for each selected field
+        for (int inIdx : selectedFields) {
+            // set all nested fields of a selected field to true
+            TypeDescription fieldSchema = schema.getChildren().get(inIdx);
+            for (int i = fieldSchema.getId(); i <= fieldSchema.getMaximumId(); i++) {
+                projectionMask[i] = true;
+            }
+        }
+        return projectionMask;
+    }
+
+    public static org.apache.orc.Reader createReader(
+            org.apache.hadoop.conf.Configuration conf, org.apache.flink.core.fs.Path path)
+            throws IOException {
+        // open ORC file and create reader
+        org.apache.hadoop.fs.Path hPath = new org.apache.hadoop.fs.Path(path.toUri());
+
+        OrcFile.ReaderOptions readerOptions = OrcFile.readerOptions(conf);
+
+        // configure filesystem from Flink filesystem
+        readerOptions.filesystem(new HadoopReadOnlyFileSystem(path.getFileSystem()));
+
+        return OrcFile.createReader(hPath, readerOptions);
+    }
+}
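
A read-path sketch under the BulkFormat contract; the constructor arguments mirror the call site in OrcFileFormat above, while the FileSourceSplit is assumed to be provided by Flink's file source enumeration:

    // Illustrative only: drain every record of one split through the vectorized ORC reader.
    import java.io.IOException;
    import java.util.Collections;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.connector.file.src.FileSourceSplit;
    import org.apache.flink.connector.file.src.reader.BulkFormat;
    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.store.format.orc.OrcReaderFactory;
    import org.apache.flink.table.types.logical.RowType;

    public class OrcReadSketch {
        public static long countRows(RowType tableType, FileSourceSplit split) throws IOException {
            OrcReaderFactory factory =
                    new OrcReaderFactory(
                            new org.apache.hadoop.conf.Configuration(),
                            tableType,
                            new int[] {0}, // read only the first column of the table
                            Collections.emptyList(), // no pushed-down ORC predicates
                            2048);
            long rows = 0;
            try (BulkFormat.Reader<RowData> reader =
                    factory.createReader(new Configuration(), split)) {
                BulkFormat.RecordIterator<RowData> batch;
                while ((batch = reader.readBatch()) != null) {
                    while (batch.next() != null) {
                        rows++;
                    }
                    batch.releaseBatch();
                }
            }
            return rows;
        }
    }
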
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcShimImpl.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcShimImpl.java
deleted file mode 100644
index 69fd951e..00000000
--- a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcShimImpl.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.format.orc;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.orc.OrcFilters;
-import org.apache.flink.orc.shim.OrcShim;
-import org.apache.flink.orc.vector.HiveOrcBatchWrapper;
-import org.apache.flink.table.store.format.fs.HadoopReadOnlyFileSystem;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
-import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
-import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
-import org.apache.orc.OrcConf;
-import org.apache.orc.OrcFile;
-import org.apache.orc.Reader;
-import org.apache.orc.RecordReader;
-import org.apache.orc.StripeInformation;
-import org.apache.orc.TypeDescription;
-
-import java.io.IOException;
-import java.util.List;
-
-/**
- * A {@link OrcShim} for table store.
- *
- * <p>This is copied from flink-orc except filesystem setting.
- */
-public class OrcShimImpl implements OrcShim<VectorizedRowBatch> {
-
-    private static final long serialVersionUID = 1L;
-
-    @Override
-    public RecordReader createRecordReader(
-            Configuration conf,
-            TypeDescription schema,
-            int[] selectedFields,
-            List<OrcFilters.Predicate> conjunctPredicates,
-            org.apache.flink.core.fs.Path path,
-            long splitStart,
-            long splitLength)
-            throws IOException {
-        Reader orcReader = createReader(conf, path);
-
-        // get offset and length for the stripes that start in the split
-        Tuple2<Long, Long> offsetAndLength =
-                getOffsetAndLengthForSplit(splitStart, splitLength, orcReader.getStripes());
-
-        // create ORC row reader configuration
-        Reader.Options options =
-                new Reader.Options()
-                        .schema(schema)
-                        .range(offsetAndLength.f0, offsetAndLength.f1)
-                        .useZeroCopy(OrcConf.USE_ZEROCOPY.getBoolean(conf))
-                        .skipCorruptRecords(OrcConf.SKIP_CORRUPT_DATA.getBoolean(conf))
-                        .tolerateMissingSchema(OrcConf.TOLERATE_MISSING_SCHEMA.getBoolean(conf));
-
-        // configure filters
-        if (!conjunctPredicates.isEmpty()) {
-            SearchArgument.Builder b = SearchArgumentFactory.newBuilder();
-            b = b.startAnd();
-            for (OrcFilters.Predicate predicate : conjunctPredicates) {
-                predicate.add(b);
-            }
-            b = b.end();
-            options.searchArgument(b.build(), new String[] {});
-        }
-
-        // configure selected fields
-        options.include(computeProjectionMask(schema, selectedFields));
-
-        // create ORC row reader
-        RecordReader orcRowsReader = orcReader.rows(options);
-
-        // assign ids
-        schema.getId();
-
-        return orcRowsReader;
-    }
-
-    @Override
-    public HiveOrcBatchWrapper createBatchWrapper(TypeDescription schema, int batchSize) {
-        return new HiveOrcBatchWrapper(schema.createRowBatch(batchSize));
-    }
-
-    @Override
-    public boolean nextBatch(RecordReader reader, VectorizedRowBatch rowBatch) throws IOException {
-        return reader.nextBatch(rowBatch);
-    }
-
-    private static Tuple2<Long, Long> getOffsetAndLengthForSplit(
-            long splitStart, long splitLength, List<StripeInformation> stripes) {
-        long splitEnd = splitStart + splitLength;
-        long readStart = Long.MAX_VALUE;
-        long readEnd = Long.MIN_VALUE;
-
-        for (StripeInformation s : stripes) {
-            if (splitStart <= s.getOffset() && s.getOffset() < splitEnd) {
-                // stripe starts in split, so it is included
-                readStart = Math.min(readStart, s.getOffset());
-                readEnd = Math.max(readEnd, s.getOffset() + s.getLength());
-            }
-        }
-
-        if (readStart < Long.MAX_VALUE) {
-            // at least one split is included
-            return Tuple2.of(readStart, readEnd - readStart);
-        } else {
-            return Tuple2.of(0L, 0L);
-        }
-    }
-
-    /**
-     * Computes the ORC projection mask of the fields to include from the selected
-     * fields.rowOrcInputFormat.nextRecord(null).
-     *
-     * @return The ORC projection mask.
-     */
-    private static boolean[] computeProjectionMask(TypeDescription schema, int[] selectedFields) {
-        // mask with all fields of the schema
-        boolean[] projectionMask = new boolean[schema.getMaximumId() + 1];
-        // for each selected field
-        for (int inIdx : selectedFields) {
-            // set all nested fields of a selected field to true
-            TypeDescription fieldSchema = schema.getChildren().get(inIdx);
-            for (int i = fieldSchema.getId(); i <= fieldSchema.getMaximumId(); i++) {
-                projectionMask[i] = true;
-            }
-        }
-        return projectionMask;
-    }
-
-    public static Reader createReader(Configuration conf, org.apache.flink.core.fs.Path path)
-            throws IOException {
-        // open ORC file and create reader
-        Path hPath = new Path(path.toUri());
-
-        OrcFile.ReaderOptions readerOptions = OrcFile.readerOptions(conf);
-
-        // configure filesystem from Flink filesystem
-        readerOptions.filesystem(new HadoopReadOnlyFileSystem(path.getFileSystem()));
-
-        return OrcFile.createReader(hPath, readerOptions);
-    }
-}
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcWriterFactory.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcWriterFactory.java
new file mode 100644
index 00000000..34960bd9
--- /dev/null
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcWriterFactory.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.format.orc;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.table.store.format.orc.writer.OrcBulkWriter;
+import org.apache.flink.table.store.format.orc.writer.PhysicalWriterImpl;
+import org.apache.flink.table.store.format.orc.writer.Vectorizer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.orc.OrcFile;
+import org.apache.orc.impl.WriterImpl;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A factory that creates an ORC {@link BulkWriter}. The factory takes a user supplied {@link
+ * Vectorizer} implementation to convert the element into an {@link
+ * org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch}.
+ *
+ * @param <T> The type of element to write.
+ */
+@PublicEvolving
+public class OrcWriterFactory<T> implements BulkWriter.Factory<T> {
+
+    private final Vectorizer<T> vectorizer;
+    private final Properties writerProperties;
+    private final Map<String, String> confMap;
+
+    private OrcFile.WriterOptions writerOptions;
+
+    /**
+     * Creates a new OrcWriterFactory using the provided Vectorizer implementation.
+     *
+     * @param vectorizer The vectorizer implementation to convert an input record to a
+     *     VectorizedRowBatch.
+     */
+    public OrcWriterFactory(Vectorizer<T> vectorizer) {
+        this(vectorizer, new Configuration());
+    }
+
+    /**
+     * Creates a new OrcWriterFactory using the provided Vectorizer and Hadoop Configuration.
+     *
+     * @param vectorizer The vectorizer implementation to convert an input record to a
+     *     VectorizedRowBatch.
+     */
+    public OrcWriterFactory(Vectorizer<T> vectorizer, Configuration configuration) {
+        this(vectorizer, null, configuration);
+    }
+
+    /**
+     * Creates a new OrcWriterFactory using the provided Vectorizer, Hadoop Configuration, and ORC
+     * writer properties.
+     *
+     * @param vectorizer The vectorizer implementation to convert an input record to a
+     *     VectorizedRowBatch.
+     * @param writerProperties Properties that can be used in ORC WriterOptions.
+     */
+    public OrcWriterFactory(
+            Vectorizer<T> vectorizer, Properties writerProperties, Configuration configuration) {
+        this.vectorizer = checkNotNull(vectorizer);
+        this.writerProperties = writerProperties;
+        this.confMap = new HashMap<>();
+
+        // Todo: Replace the Map based approach with a better approach
+        for (Map.Entry<String, String> entry : configuration) {
+            confMap.put(entry.getKey(), entry.getValue());
+        }
+    }
+
+    @Override
+    public BulkWriter<T> create(FSDataOutputStream out) throws IOException {
+        OrcFile.WriterOptions opts = getWriterOptions();
+        opts.physicalWriter(new PhysicalWriterImpl(out, opts));
+
+        // The path of the Writer is not used to indicate the destination file
+        // in this case since we have used a dedicated physical writer to write
+        // to the given output stream directly. However, the path is used as the key of
+        // the writer in the ORC memory manager, thus we need to make it unique.
+        Path unusedPath = new Path(UUID.randomUUID().toString());
+        return new OrcBulkWriter<>(vectorizer, new WriterImpl(null, unusedPath, opts));
+    }
+
+    @VisibleForTesting
+    protected OrcFile.WriterOptions getWriterOptions() {
+        if (null == writerOptions) {
+            Configuration conf = new ThreadLocalClassLoaderConfiguration();
+            for (Map.Entry<String, String> entry : confMap.entrySet()) {
+                conf.set(entry.getKey(), entry.getValue());
+            }
+
+            writerOptions = OrcFile.writerOptions(writerProperties, conf);
+            writerOptions.setSchema(this.vectorizer.getSchema());
+        }
+
+        return writerOptions;
+    }
+}
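
A write-path sketch; it assumes the copied RowDataVectorizer keeps Flink's (String schema, LogicalType[] fieldTypes) constructor, reuses OrcSplitReaderUtil.logicalTypeToOrcType as the reader does, and writes to a hypothetical local path:

    // Illustrative only: write a single RowData record to an ORC file through the factory.
    import java.util.Properties;
    import org.apache.flink.api.common.serialization.BulkWriter;
    import org.apache.flink.core.fs.FSDataOutputStream;
    import org.apache.flink.core.fs.FileSystem;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.table.data.GenericRowData;
    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.data.StringData;
    import org.apache.flink.table.store.format.orc.OrcWriterFactory;
    import org.apache.flink.table.store.format.orc.reader.OrcSplitReaderUtil;
    import org.apache.flink.table.store.format.orc.writer.RowDataVectorizer;
    import org.apache.flink.table.store.format.orc.writer.Vectorizer;
    import org.apache.flink.table.types.logical.IntType;
    import org.apache.flink.table.types.logical.LogicalType;
    import org.apache.flink.table.types.logical.RowType;
    import org.apache.flink.table.types.logical.VarCharType;
    import org.apache.orc.TypeDescription;

    public class OrcWriteSketch {
        public static void main(String[] args) throws Exception {
            RowType rowType =
                    RowType.of(
                            new LogicalType[] {new IntType(), new VarCharType(VarCharType.MAX_LENGTH)},
                            new String[] {"id", "name"});
            TypeDescription orcSchema = OrcSplitReaderUtil.logicalTypeToOrcType(rowType);
            Vectorizer<RowData> vectorizer =
                    new RowDataVectorizer(
                            orcSchema.toString(), rowType.getChildren().toArray(new LogicalType[0]));
            OrcWriterFactory<RowData> factory =
                    new OrcWriterFactory<>(
                            vectorizer, new Properties(), new org.apache.hadoop.conf.Configuration());

            Path path = new Path("/tmp/demo.orc"); // hypothetical destination
            try (FSDataOutputStream out =
                    path.getFileSystem().create(path, FileSystem.WriteMode.OVERWRITE)) {
                BulkWriter<RowData> writer = factory.create(out);
                writer.addElement(GenericRowData.of(1, StringData.fromString("flink")));
                writer.finish();
            }
        }
    }
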
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/SerializableHadoopConfigWrapper.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/SerializableHadoopConfigWrapper.java
new file mode 100644
index 00000000..4a770ba0
--- /dev/null
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/SerializableHadoopConfigWrapper.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.format.orc;
+
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Utility class to make a {@link Configuration Hadoop Configuration} serializable. */
+public final class SerializableHadoopConfigWrapper implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private transient Configuration hadoopConfig;
+
+    public SerializableHadoopConfigWrapper(Configuration hadoopConfig) {
+        this.hadoopConfig = checkNotNull(hadoopConfig);
+    }
+
+    public Configuration getHadoopConfig() {
+        return hadoopConfig;
+    }
+
+    // ------------------------------------------------------------------------
+
+    private void writeObject(ObjectOutputStream out) throws IOException {
+        out.defaultWriteObject();
+
+        // We write the Hadoop config through a separate serializer to avoid cryptic exceptions
+        // when it corrupts the serialization stream.
+        final DataOutputSerializer ser = new DataOutputSerializer(256);
+        hadoopConfig.write(ser);
+        out.writeInt(ser.length());
+        out.write(ser.getSharedBuffer(), 0, ser.length());
+    }
+
+    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+        in.defaultReadObject();
+
+        final byte[] data = new byte[in.readInt()];
+        in.readFully(data);
+        final DataInputDeserializer deser = new DataInputDeserializer(data);
+        this.hadoopConfig = new Configuration();
+
+        try {
+            this.hadoopConfig.readFields(deser);
+        } catch (IOException e) {
+            throw new IOException(
+                    "Could not deserialize Hadoop Configuration: the serialized and de-serialized forms don't match.",
+                    e);
+        }
+    }
+}
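
To see why the wrapper is needed: Hadoop's Configuration implements Writable but not java.io.Serializable, so it cannot travel inside a serialized Flink function or factory on its own. A minimal round-trip sketch (illustrative only; the "orc.compress" entry is just sample payload):

    // assumes imports: java.io.*, org.apache.hadoop.conf.Configuration
    Configuration conf = new Configuration(false);
    conf.set("orc.compress", "ZLIB");
    SerializableHadoopConfigWrapper wrapper = new SerializableHadoopConfigWrapper(conf);

    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
        out.writeObject(wrapper); // writeObject() above serializes the conf via Writable
    }
    try (ObjectInputStream in =
            new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray()))) {
        SerializableHadoopConfigWrapper copy = (SerializableHadoopConfigWrapper) in.readObject();
        // the wrapped configuration survives the Java-serialization round trip
        assert "ZLIB".equals(copy.getHadoopConfig().get("orc.compress"));
    }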
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/ThreadLocalClassLoaderConfiguration.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/ThreadLocalClassLoaderConfiguration.java
new file mode 100644
index 00000000..033f4edd
--- /dev/null
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/ThreadLocalClassLoaderConfiguration.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.format.orc;
+
+import org.apache.flink.annotation.Internal;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.net.URL;
+
+/**
+ * Workaround for https://issues.apache.org/jira/browse/ORC-653.
+ *
+ * <p>Since the conf is effectively cached across Flink jobs, at least force the thread local
+ * classloader to avoid classloader leaks.
+ */
+@Internal
+public final class ThreadLocalClassLoaderConfiguration extends Configuration {
+    public ThreadLocalClassLoaderConfiguration() {}
+
+    public ThreadLocalClassLoaderConfiguration(Configuration other) {
+        super(other);
+    }
+
+    @Override
+    public ClassLoader getClassLoader() {
+        return Thread.currentThread().getContextClassLoader();
+    }
+
+    @Override
+    public Class<?> getClassByNameOrNull(String name) {
+        try {
+            return Class.forName(name, true, getClassLoader());
+        } catch (ClassNotFoundException e) {
+            return null;
+        }
+    }
+
+    @Override
+    public URL getResource(String name) {
+        return getClassLoader().getResource(name);
+    }
+}
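
A short usage sketch: every Configuration handed to ORC is wrapped so that class and resource lookups go through the calling thread's context classloader instead of the loader tied to a cached Configuration (the leak described in ORC-653). The reader call is only an example of where the wrapped conf would be passed.

    // assumes imports: org.apache.hadoop.conf.Configuration, org.apache.orc.OrcFile
    Configuration userConf = new Configuration();
    Configuration orcConf = new ThreadLocalClassLoaderConfiguration(userConf);

    // hand the wrapped conf to ORC, e.g. when building reader or writer options
    OrcFile.ReaderOptions readerOptions = OrcFile.readerOptions(orcConf);

    // lookups now resolve against the context classloader of the current thread
    assert orcConf.getClassLoader() == Thread.currentThread().getContextClassLoader();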
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileStatsExtractor.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/filter/OrcFileStatsExtractor.java
similarity index 97%
rename from flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileStatsExtractor.java
rename to flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/filter/OrcFileStatsExtractor.java
index 2cd7ac20..36dc0fa5 100644
--- a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileStatsExtractor.java
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/filter/OrcFileStatsExtractor.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.store.format.orc;
+package org.apache.flink.table.store.format.orc.filter;
 
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.data.DecimalData;
@@ -24,6 +24,7 @@ import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.data.TimestampData;
 import org.apache.flink.table.store.format.FieldStats;
 import org.apache.flink.table.store.format.FileStatsExtractor;
+import org.apache.flink.table.store.format.orc.OrcReaderFactory;
 import org.apache.flink.table.store.utils.DateTimeUtils;
 import org.apache.flink.table.types.logical.DecimalType;
 import org.apache.flink.table.types.logical.RowType;
@@ -57,7 +58,7 @@ public class OrcFileStatsExtractor implements FileStatsExtractor {
 
     @Override
     public FieldStats[] extract(Path path) throws IOException {
-        try (Reader reader = OrcShimImpl.createReader(new Configuration(), path)) {
+        try (Reader reader = OrcReaderFactory.createReader(new Configuration(), path)) {
             long rowCount = reader.getNumberOfRows();
             ColumnStatistics[] columnStatistics = reader.getStatistics();
             TypeDescription schema = reader.getSchema();
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/filter/OrcFilters.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/filter/OrcFilters.java
new file mode 100644
index 00000000..8863c316
--- /dev/null
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/filter/OrcFilters.java
@@ -0,0 +1,685 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.format.orc.filter;
+
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.store.utils.MapBuilder;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.util.function.TriFunction;
+
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+
+/** Utility class that provides helper methods to work with Orc Filter PushDown. */
+public class OrcFilters {
+
+    private static final Logger LOG = LoggerFactory.getLogger(OrcFilters.class);
+
+    private static final Map<FunctionDefinition, Function<CallExpression, Predicate>> FILTERS =
+            new MapBuilder<FunctionDefinition, Function<CallExpression, Predicate>>()
+                    .put(BuiltInFunctionDefinitions.IS_NULL, OrcFilters::convertIsNull)
+                    .put(BuiltInFunctionDefinitions.IS_NOT_NULL, OrcFilters::convertIsNotNull)
+                    .put(BuiltInFunctionDefinitions.NOT, OrcFilters::convertNot)
+                    .put(BuiltInFunctionDefinitions.OR, OrcFilters::convertOr)
+                    .put(
+                            BuiltInFunctionDefinitions.EQUALS,
+                            call ->
+                                    convertBinary(
+                                            call,
+                                            OrcFilters::convertEquals,
+                                            OrcFilters::convertEquals))
+                    .put(
+                            BuiltInFunctionDefinitions.NOT_EQUALS,
+                            call ->
+                                    convertBinary(
+                                            call,
+                                            OrcFilters::convertNotEquals,
+                                            OrcFilters::convertNotEquals))
+                    .put(
+                            BuiltInFunctionDefinitions.GREATER_THAN,
+                            call ->
+                                    convertBinary(
+                                            call,
+                                            OrcFilters::convertGreaterThan,
+                                            OrcFilters::convertLessThanEquals))
+                    .put(
+                            BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL,
+                            call ->
+                                    convertBinary(
+                                            call,
+                                            OrcFilters::convertGreaterThanEquals,
+                                            OrcFilters::convertLessThan))
+                    .put(
+                            BuiltInFunctionDefinitions.LESS_THAN,
+                            call ->
+                                    convertBinary(
+                                            call,
+                                            OrcFilters::convertLessThan,
+                                            OrcFilters::convertGreaterThanEquals))
+                    .put(
+                            BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL,
+                            call ->
+                                    convertBinary(
+                                            call,
+                                            OrcFilters::convertLessThanEquals,
+                                            OrcFilters::convertGreaterThan))
+                    .unmodifiable();
+
+    private static boolean isRef(Expression expression) {
+        return expression instanceof FieldReferenceExpression;
+    }
+
+    private static boolean isLit(Expression expression) {
+        return expression instanceof ValueLiteralExpression;
+    }
+
+    private static boolean isUnaryValid(CallExpression callExpression) {
+        return callExpression.getChildren().size() == 1
+                && isRef(callExpression.getChildren().get(0));
+    }
+
+    private static boolean isBinaryValid(CallExpression callExpression) {
+        return callExpression.getChildren().size() == 2
+                && (isRef(callExpression.getChildren().get(0))
+                                && isLit(callExpression.getChildren().get(1))
+                        || isLit(callExpression.getChildren().get(0))
+                                && isRef(callExpression.getChildren().get(1)));
+    }
+
+    private static Predicate convertIsNull(CallExpression callExp) {
+        if (!isUnaryValid(callExp)) {
+            // not a valid predicate
+            LOG.debug(
+                    "Unsupported predicate [{}] cannot be pushed into OrcFileSystemFormatFactory.",
+                    callExp);
+            return null;
+        }
+
+        PredicateLeaf.Type colType =
+                toOrcType(
+                        ((FieldReferenceExpression) callExp.getChildren().get(0))
+                                .getOutputDataType());
+        if (colType == null) {
+            // unsupported type
+            LOG.debug(
+                    "Unsupported predicate [{}] cannot be pushed into OrcFileSystemFormatFactory.",
+                    callExp);
+            return null;
+        }
+
+        String colName = getColumnName(callExp);
+
+        return new IsNull(colName, colType);
+    }
+
+    private static Predicate convertIsNotNull(CallExpression callExp) {
+        return new Not(convertIsNull(callExp));
+    }
+
+    private static Predicate convertNot(CallExpression callExp) {
+        if (callExp.getChildren().size() != 1) {
+            // not a valid predicate
+            LOG.debug(
+                    "Unsupported predicate [{}] cannot be pushed into OrcFileSystemFormatFactory.",
+                    callExp);
+            return null;
+        }
+
+        Predicate c = toOrcPredicate(callExp.getChildren().get(0));
+        return c == null ? null : new Not(c);
+    }
+
+    private static Predicate convertOr(CallExpression callExp) {
+        if (callExp.getChildren().size() < 2) {
+            return null;
+        }
+        Expression left = callExp.getChildren().get(0);
+        Expression right = callExp.getChildren().get(1);
+
+        Predicate c1 = toOrcPredicate(left);
+        Predicate c2 = toOrcPredicate(right);
+        if (c1 == null || c2 == null) {
+            return null;
+        } else {
+            return new Or(c1, c2);
+        }
+    }
+
+    public static Predicate convertBinary(
+            CallExpression callExp,
+            TriFunction<String, PredicateLeaf.Type, Serializable, Predicate> func,
+            TriFunction<String, PredicateLeaf.Type, Serializable, Predicate> reverseFunc) {
+        if (!isBinaryValid(callExp)) {
+            // not a valid predicate
+            LOG.debug(
+                    "Unsupported predicate [{}] cannot be pushed into OrcFileSystemFormatFactory.",
+                    callExp);
+            return null;
+        }
+
+        PredicateLeaf.Type litType = getLiteralType(callExp);
+        if (litType == null) {
+            // unsupported literal type
+            LOG.debug(
+                    "Unsupported predicate [{}] cannot be pushed into OrcFileSystemFormatFactory.",
+                    callExp);
+            return null;
+        }
+
+        String colName = getColumnName(callExp);
+
+        // fetch literal and ensure it is serializable
+        Object literalObj = getLiteral(callExp).get();
+        Object orcObj = toOrcObject(litType, literalObj);
+        Serializable literal;
+        // validate that literal is serializable
+        if (orcObj instanceof Serializable) {
+            literal = (Serializable) orcObj;
+        } else {
+            LOG.warn(
+                    "Encountered a non-serializable literal of type {}. "
+                            + "Cannot push predicate [{}] into OrcFileSystemFormatFactory. "
+                            + "This is a bug and should be reported.",
+                    literalObj.getClass().getCanonicalName(),
+                    callExp);
+            return null;
+        }
+
+        return literalOnRight(callExp)
+                ? func.apply(colName, litType, literal)
+                : reverseFunc.apply(colName, litType, literal);
+    }
+
+    private static Predicate convertEquals(
+            String colName, PredicateLeaf.Type litType, Serializable literal) {
+        return new Equals(colName, litType, literal);
+    }
+
+    private static Predicate convertNotEquals(
+            String colName, PredicateLeaf.Type litType, Serializable literal) {
+        return new Not(convertEquals(colName, litType, literal));
+    }
+
+    private static Predicate convertGreaterThan(
+            String colName, PredicateLeaf.Type litType, Serializable literal) {
+        return new Not(new LessThanEquals(colName, litType, literal));
+    }
+
+    private static Predicate convertGreaterThanEquals(
+            String colName, PredicateLeaf.Type litType, Serializable literal) {
+        return new Not(new LessThan(colName, litType, literal));
+    }
+
+    private static Predicate convertLessThan(
+            String colName, PredicateLeaf.Type litType, Serializable literal) {
+        return new LessThan(colName, litType, literal);
+    }
+
+    private static Predicate convertLessThanEquals(
+            String colName, PredicateLeaf.Type litType, Serializable literal) {
+        return new LessThanEquals(colName, litType, literal);
+    }
+
+    public static Predicate toOrcPredicate(Expression expression) {
+        if (expression instanceof CallExpression) {
+            CallExpression callExp = (CallExpression) expression;
+            if (FILTERS.get(callExp.getFunctionDefinition()) == null) {
+                // unsupported predicate
+                LOG.debug(
+                        "Unsupported predicate [{}] cannot be pushed into OrcFileSystemFormatFactory.",
+                        expression);
+                return null;
+            }
+            return FILTERS.get(callExp.getFunctionDefinition()).apply(callExp);
+        } else {
+            // unsupported predicate
+            LOG.debug(
+                    "Unsupported predicate [{}] cannot be pushed into OrcFileSystemFormatFactory.",
+                    expression);
+            return null;
+        }
+    }
+
+    private static String getColumnName(CallExpression comp) {
+        if (literalOnRight(comp)) {
+            return ((FieldReferenceExpression) comp.getChildren().get(0)).getName();
+        } else {
+            return ((FieldReferenceExpression) comp.getChildren().get(1)).getName();
+        }
+    }
+
+    private static boolean literalOnRight(CallExpression comp) {
+        if (comp.getChildren().size() == 1
+                && comp.getChildren().get(0) instanceof FieldReferenceExpression) {
+            return true;
+        } else if (isLit(comp.getChildren().get(0)) && isRef(comp.getChildren().get(1))) {
+            return false;
+        } else if (isRef(comp.getChildren().get(0)) && isLit(comp.getChildren().get(1))) {
+            return true;
+        } else {
+            throw new RuntimeException("Invalid binary comparison.");
+        }
+    }
+
+    private static PredicateLeaf.Type getLiteralType(CallExpression comp) {
+        if (literalOnRight(comp)) {
+            return toOrcType(
+                    ((ValueLiteralExpression) comp.getChildren().get(1)).getOutputDataType());
+        } else {
+            return toOrcType(
+                    ((ValueLiteralExpression) comp.getChildren().get(0)).getOutputDataType());
+        }
+    }
+
+    private static Object toOrcObject(PredicateLeaf.Type litType, Object literalObj) {
+        switch (litType) {
+            case DATE:
+                if (literalObj instanceof LocalDate) {
+                    LocalDate localDate = (LocalDate) literalObj;
+                    return Date.valueOf(localDate);
+                } else {
+                    return literalObj;
+                }
+            case TIMESTAMP:
+                if (literalObj instanceof LocalDateTime) {
+                    LocalDateTime localDateTime = (LocalDateTime) literalObj;
+                    return Timestamp.valueOf(localDateTime);
+                } else {
+                    return literalObj;
+                }
+            default:
+                return literalObj;
+        }
+    }
+
+    private static Optional<?> getLiteral(CallExpression comp) {
+        if (literalOnRight(comp)) {
+            ValueLiteralExpression valueLiteralExpression =
+                    (ValueLiteralExpression) comp.getChildren().get(1);
+            return valueLiteralExpression.getValueAs(
+                    valueLiteralExpression.getOutputDataType().getConversionClass());
+        } else {
+            ValueLiteralExpression valueLiteralExpression =
+                    (ValueLiteralExpression) comp.getChildren().get(0);
+            return valueLiteralExpression.getValueAs(
+                    valueLiteralExpression.getOutputDataType().getConversionClass());
+        }
+    }
+
+    private static PredicateLeaf.Type toOrcType(DataType type) {
+        LogicalTypeRoot ltype = type.getLogicalType().getTypeRoot();
+        switch (ltype) {
+            case TINYINT:
+            case SMALLINT:
+            case INTEGER:
+            case BIGINT:
+                return PredicateLeaf.Type.LONG;
+            case FLOAT:
+            case DOUBLE:
+                return PredicateLeaf.Type.FLOAT;
+            case BOOLEAN:
+                return PredicateLeaf.Type.BOOLEAN;
+            case CHAR:
+            case VARCHAR:
+                return PredicateLeaf.Type.STRING;
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+                return PredicateLeaf.Type.TIMESTAMP;
+            case DATE:
+                return PredicateLeaf.Type.DATE;
+            case DECIMAL:
+                return PredicateLeaf.Type.DECIMAL;
+            default:
+                return null;
+        }
+    }
+
+    // --------------------------------------------------------------------------------------------
+    //  Classes to define predicates
+    // --------------------------------------------------------------------------------------------
+
+    /** A filter predicate that can be evaluated by the OrcInputFormat. */
+    public abstract static class Predicate implements Serializable {
+        public abstract SearchArgument.Builder add(SearchArgument.Builder builder);
+    }
+
+    abstract static class ColumnPredicate extends Predicate {
+        final String columnName;
+        final PredicateLeaf.Type literalType;
+
+        ColumnPredicate(String columnName, PredicateLeaf.Type literalType) {
+            this.columnName = columnName;
+            this.literalType = literalType;
+        }
+
+        Object castLiteral(Serializable literal) {
+
+            switch (literalType) {
+                case LONG:
+                    if (literal instanceof Byte) {
+                        return Long.valueOf((Byte) literal);
+                    } else if (literal instanceof Short) {
+                        return Long.valueOf((Short) literal);
+                    } else if (literal instanceof Integer) {
+                        return Long.valueOf((Integer) literal);
+                    } else if (literal instanceof Long) {
+                        return literal;
+                    } else {
+                        throw new IllegalArgumentException(
+                                "A predicate on a LONG column requires an integer "
+                                        + "literal, i.e., Byte, Short, Integer, or Long.");
+                    }
+                case FLOAT:
+                    if (literal instanceof Float) {
+                        return Double.valueOf((Float) literal);
+                    } else if (literal instanceof Double) {
+                        return literal;
+                    } else if (literal instanceof BigDecimal) {
+                        return ((BigDecimal) literal).doubleValue();
+                    } else {
+                        throw new IllegalArgumentException(
+                                "A predicate on a FLOAT column requires a floating "
+                                        + "literal, i.e., Float or Double.");
+                    }
+                case STRING:
+                    if (literal instanceof String) {
+                        return literal;
+                    } else {
+                        throw new IllegalArgumentException(
+                                "A predicate on a STRING column requires a String literal.");
+                    }
+                case BOOLEAN:
+                    if (literal instanceof Boolean) {
+                        return literal;
+                    } else {
+                        throw new IllegalArgumentException(
+                                "A predicate on a BOOLEAN column requires a Boolean literal.");
+                    }
+                case DATE:
+                    if (literal instanceof Date) {
+                        return literal;
+                    } else {
+                        throw new IllegalArgumentException(
+                                "A predicate on a DATE column requires a java.sql.Date literal.");
+                    }
+                case TIMESTAMP:
+                    if (literal instanceof Timestamp) {
+                        return literal;
+                    } else {
+                        throw new IllegalArgumentException(
+                                "A predicate on a TIMESTAMP column requires a java.sql.Timestamp literal.");
+                    }
+                case DECIMAL:
+                    if (literal instanceof BigDecimal) {
+                        return new HiveDecimalWritable(HiveDecimal.create((BigDecimal) literal));
+                    } else {
+                        throw new IllegalArgumentException(
+                                "A predicate on a DECIMAL column requires a BigDecimal literal.");
+                    }
+                default:
+                    throw new IllegalArgumentException("Unknown literal type " + literalType);
+            }
+        }
+    }
+
+    abstract static class BinaryPredicate extends ColumnPredicate {
+        final Serializable literal;
+
+        BinaryPredicate(String columnName, PredicateLeaf.Type literalType, Serializable literal) {
+            super(columnName, literalType);
+            this.literal = literal;
+        }
+    }
+
+    /** An EQUALS predicate that can be evaluated by the OrcInputFormat. */
+    public static class Equals extends BinaryPredicate {
+        /**
+         * Creates an EQUALS predicate.
+         *
+         * @param columnName The column to check.
+         * @param literalType The type of the literal.
+         * @param literal The literal value to check the column against.
+         */
+        public Equals(String columnName, PredicateLeaf.Type literalType, Serializable literal) {
+            super(columnName, literalType, literal);
+        }
+
+        @Override
+        public SearchArgument.Builder add(SearchArgument.Builder builder) {
+            return builder.equals(columnName, literalType, castLiteral(literal));
+        }
+
+        @Override
+        public String toString() {
+            return columnName + " = " + literal;
+        }
+    }
+
+    /** A LESS_THAN predicate that can be evaluated by the OrcInputFormat. */
+    public static class LessThan extends BinaryPredicate {
+        /**
+         * Creates a LESS_THAN predicate.
+         *
+         * @param columnName The column to check.
+         * @param literalType The type of the literal.
+         * @param literal The literal value to check the column against.
+         */
+        public LessThan(String columnName, PredicateLeaf.Type literalType, Serializable literal) {
+            super(columnName, literalType, literal);
+        }
+
+        @Override
+        public SearchArgument.Builder add(SearchArgument.Builder builder) {
+            return builder.lessThan(columnName, literalType, castLiteral(literal));
+        }
+
+        @Override
+        public String toString() {
+            return columnName + " < " + literal;
+        }
+    }
+
+    /** A LESS_THAN_EQUALS predicate that can be evaluated by the OrcInputFormat. */
+    public static class LessThanEquals extends BinaryPredicate {
+        /**
+         * Creates a LESS_THAN_EQUALS predicate.
+         *
+         * @param columnName The column to check.
+         * @param literalType The type of the literal.
+         * @param literal The literal value to check the column against.
+         */
+        public LessThanEquals(
+                String columnName, PredicateLeaf.Type literalType, Serializable literal) {
+            super(columnName, literalType, literal);
+        }
+
+        @Override
+        public SearchArgument.Builder add(SearchArgument.Builder builder) {
+            return builder.lessThanEquals(columnName, literalType, castLiteral(literal));
+        }
+
+        @Override
+        public String toString() {
+            return columnName + " <= " + literal;
+        }
+    }
+
+    /** An IS_NULL predicate that can be evaluated by the OrcInputFormat. */
+    public static class IsNull extends ColumnPredicate {
+        /**
+         * Creates an IS_NULL predicate.
+         *
+         * @param columnName The column to check for null.
+         * @param literalType The type of the column to check for null.
+         */
+        public IsNull(String columnName, PredicateLeaf.Type literalType) {
+            super(columnName, literalType);
+        }
+
+        @Override
+        public SearchArgument.Builder add(SearchArgument.Builder builder) {
+            return builder.isNull(columnName, literalType);
+        }
+
+        @Override
+        public String toString() {
+            return columnName + " IS NULL";
+        }
+    }
+
+    /** A BETWEEN predicate that can be evaluated by the OrcInputFormat. */
+    public static class Between extends ColumnPredicate {
+        private final Serializable lowerBound;
+        private final Serializable upperBound;
+
+        /**
+         * Creates a BETWEEN predicate.
+         *
+         * @param columnName The column to check.
+         * @param literalType The type of the literals.
+         * @param lowerBound The literal value of the (inclusive) lower bound to check the column
+         *     against.
+         * @param upperBound The literal value of the (inclusive) upper bound to check the column
+         *     against.
+         */
+        public Between(
+                String columnName,
+                PredicateLeaf.Type literalType,
+                Serializable lowerBound,
+                Serializable upperBound) {
+            super(columnName, literalType);
+            this.lowerBound = lowerBound;
+            this.upperBound = upperBound;
+        }
+
+        @Override
+        public SearchArgument.Builder add(SearchArgument.Builder builder) {
+            return builder.between(
+                    columnName, literalType, castLiteral(lowerBound), castLiteral(upperBound));
+        }
+
+        @Override
+        public String toString() {
+            return lowerBound + " <= " + columnName + " <= " + upperBound;
+        }
+    }
+
+    /** An IN predicate that can be evaluated by the OrcInputFormat. */
+    public static class In extends ColumnPredicate {
+        private final Serializable[] literals;
+
+        /**
+         * Creates an IN predicate.
+         *
+         * @param columnName The column to check.
+         * @param literalType The type of the literals.
+         * @param literals The literal values to check the column against.
+         */
+        public In(String columnName, PredicateLeaf.Type literalType, Serializable... literals) {
+            super(columnName, literalType);
+            this.literals = literals;
+        }
+
+        @Override
+        public SearchArgument.Builder add(SearchArgument.Builder builder) {
+            Object[] castedLiterals = new Object[literals.length];
+            for (int i = 0; i < literals.length; i++) {
+                castedLiterals[i] = castLiteral(literals[i]);
+            }
+            return builder.in(columnName, literalType, castedLiterals);
+        }
+
+        @Override
+        public String toString() {
+            return columnName + " IN " + Arrays.toString(literals);
+        }
+    }
+
+    /** A NOT predicate to negate a predicate that can be evaluated by the OrcInputFormat. */
+    public static class Not extends Predicate {
+        private final Predicate pred;
+
+        /**
+         * Creates a NOT predicate.
+         *
+         * @param predicate The predicate to negate.
+         */
+        public Not(Predicate predicate) {
+            this.pred = predicate;
+        }
+
+        @Override
+        public SearchArgument.Builder add(SearchArgument.Builder builder) {
+            return pred.add(builder.startNot()).end();
+        }
+
+        @Override
+        public String toString() {
+            return "NOT(" + pred.toString() + ")";
+        }
+    }
+
+    /** An OR predicate that can be evaluated by the OrcInputFormat. */
+    public static class Or extends Predicate {
+        private final Predicate[] preds;
+
+        /**
+         * Creates an OR predicate.
+         *
+         * @param predicates The disjunctive predicates.
+         */
+        public Or(Predicate... predicates) {
+            this.preds = predicates;
+        }
+
+        @Override
+        public SearchArgument.Builder add(SearchArgument.Builder builder) {
+            SearchArgument.Builder withOr = builder.startOr();
+            for (Predicate p : preds) {
+                withOr = p.add(withOr);
+            }
+            return withOr.end();
+        }
+
+        @Override
+        public String toString() {
+            return "OR(" + Arrays.toString(preds) + ")";
+        }
+    }
+}
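
To show how these predicate classes fit together, a minimal sketch of assembling two of them into a Hive SearchArgument, which is what ORC ultimately consumes for row-group filtering (the actual wiring into the reader lives in OrcReaderFactory and is not reproduced here; column names and values are made up):

    // assumes imports: org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf,
    //                  org.apache.hadoop.hive.ql.io.sarg.SearchArgument,
    //                  org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory
    OrcFilters.Predicate idEquals42 =
            new OrcFilters.Equals("id", PredicateLeaf.Type.LONG, 42L);
    OrcFilters.Predicate nameNotNull =
            new OrcFilters.Not(new OrcFilters.IsNull("name", PredicateLeaf.Type.STRING));

    SearchArgument.Builder builder = SearchArgumentFactory.newBuilder().startAnd();
    builder = idEquals42.add(builder);
    builder = nameNotNull.add(builder);
    SearchArgument sarg = builder.end().build();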
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcPredicateFunctionVisitor.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/filter/OrcPredicateFunctionVisitor.java
similarity index 98%
rename from flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcPredicateFunctionVisitor.java
rename to flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/filter/OrcPredicateFunctionVisitor.java
index 8e93a03c..ee29d5e2 100644
--- a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcPredicateFunctionVisitor.java
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/filter/OrcPredicateFunctionVisitor.java
@@ -16,9 +16,8 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.store.format.orc;
+package org.apache.flink.table.store.format.orc.filter;
 
-import org.apache.flink.orc.OrcFilters;
 import org.apache.flink.table.data.DecimalData;
 import org.apache.flink.table.data.TimestampData;
 import org.apache.flink.table.store.file.predicate.FieldRef;
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/reader/AbstractOrcColumnVector.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/reader/AbstractOrcColumnVector.java
new file mode 100644
index 00000000..61e771be
--- /dev/null
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/reader/AbstractOrcColumnVector.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.format.orc.reader;
+
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
+
+/** This column vector is used to adapt hive's ColumnVector to Flink's ColumnVector. */
+public abstract class AbstractOrcColumnVector
+        implements org.apache.flink.table.store.data.columnar.ColumnVector {
+
+    private final ColumnVector vector;
+
+    AbstractOrcColumnVector(ColumnVector vector) {
+        this.vector = vector;
+    }
+
+    @Override
+    public boolean isNullAt(int i) {
+        return !vector.noNulls && vector.isNull[vector.isRepeating ? 0 : i];
+    }
+
+    public static org.apache.flink.table.store.data.columnar.ColumnVector createFlinkVector(
+            ColumnVector vector, LogicalType logicalType) {
+        if (vector instanceof LongColumnVector) {
+            if (logicalType.getTypeRoot() == LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE) {
+                return new OrcLegacyTimestampColumnVector((LongColumnVector) vector);
+            } else {
+                return new OrcLongColumnVector((LongColumnVector) vector);
+            }
+        } else if (vector instanceof DoubleColumnVector) {
+            return new OrcDoubleColumnVector((DoubleColumnVector) vector);
+        } else if (vector instanceof BytesColumnVector) {
+            return new OrcBytesColumnVector((BytesColumnVector) vector);
+        } else if (vector instanceof DecimalColumnVector) {
+            return new OrcDecimalColumnVector((DecimalColumnVector) vector);
+        } else if (vector instanceof TimestampColumnVector) {
+            return new OrcTimestampColumnVector(vector);
+        } else if (vector instanceof ListColumnVector) {
+            return new OrcArrayColumnVector((ListColumnVector) vector, (ArrayType) logicalType);
+        } else if (vector instanceof StructColumnVector) {
+            return new OrcRowColumnVector((StructColumnVector) vector, (RowType) logicalType);
+        } else if (vector instanceof MapColumnVector) {
+            return new OrcMapColumnVector((MapColumnVector) vector, (MapType) logicalType);
+        } else {
+            throw new UnsupportedOperationException(
+                    "Unsupported vector: " + vector.getClass().getName());
+        }
+    }
+}
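
As an illustration of the dispatch above, a sketch that adapts a single INT column of an ORC VectorizedRowBatch. Integral ORC types are backed by a Hive LongColumnVector, so createFlinkVector returns an OrcLongColumnVector and the value is read through the table store's IntColumnVector view (the schema and values are made up for the example):

    // assumes imports: org.apache.orc.TypeDescription,
    //                  org.apache.hadoop.hive.ql.exec.vector.LongColumnVector,
    //                  org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch,
    //                  org.apache.flink.table.store.data.columnar.IntColumnVector,
    //                  org.apache.flink.table.types.logical.IntType
    TypeDescription schema = TypeDescription.fromString("struct<id:int>");
    VectorizedRowBatch batch = schema.createRowBatch();
    LongColumnVector ids = (LongColumnVector) batch.cols[0];
    ids.vector[0] = 7;
    batch.size = 1;

    org.apache.flink.table.store.data.columnar.ColumnVector flinkVector =
            AbstractOrcColumnVector.createFlinkVector(ids, new IntType());
    int value = ((IntColumnVector) flinkVector).getInt(0); // 7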
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/reader/OrcArrayColumnVector.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/reader/OrcArrayColumnVector.java
new file mode 100644
index 00000000..dacc8677
--- /dev/null
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/reader/OrcArrayColumnVector.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.format.orc.reader;
+
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.store.data.columnar.ColumnVector;
+import org.apache.flink.table.store.data.columnar.ColumnarArrayData;
+import org.apache.flink.table.types.logical.ArrayType;
+
+import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
+
+/** This column vector is used to adapt hive's ListColumnVector to Flink's ArrayColumnVector. */
+public class OrcArrayColumnVector extends AbstractOrcColumnVector
+        implements org.apache.flink.table.store.data.columnar.ArrayColumnVector {
+
+    private final ListColumnVector hiveVector;
+    private final ColumnVector flinkVector;
+
+    public OrcArrayColumnVector(ListColumnVector hiveVector, ArrayType type) {
+        super(hiveVector);
+        this.hiveVector = hiveVector;
+        this.flinkVector = createFlinkVector(hiveVector.child, type.getElementType());
+    }
+
+    @Override
+    public ArrayData getArray(int i) {
+        long offset = hiveVector.offsets[i];
+        long length = hiveVector.lengths[i];
+        return new ColumnarArrayData(flinkVector, (int) offset, (int) length);
+    }
+}
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/reader/OrcBytesColumnVector.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/reader/OrcBytesColumnVector.java
new file mode 100644
index 00000000..a2ef3ecf
--- /dev/null
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/reader/OrcBytesColumnVector.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.format.orc.reader;
+
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+
+/** This column vector is used to adapt hive's BytesColumnVector to Flink's BytesColumnVector. */
+public class OrcBytesColumnVector extends AbstractOrcColumnVector
+        implements org.apache.flink.table.store.data.columnar.BytesColumnVector {
+
+    private final BytesColumnVector vector;
+
+    public OrcBytesColumnVector(BytesColumnVector vector) {
+        super(vector);
+        this.vector = vector;
+    }
+
+    @Override
+    public Bytes getBytes(int i) {
+        int rowId = vector.isRepeating ? 0 : i;
+        byte[][] data = vector.vector;
+        int[] start = vector.start;
+        int[] length = vector.length;
+        return new Bytes(data[rowId], start[rowId], length[rowId]);
+    }
+}
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/reader/OrcDecimalColumnVector.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/reader/OrcDecimalColumnVector.java
new file mode 100644
index 00000000..4c8e539d
--- /dev/null
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/reader/OrcDecimalColumnVector.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.format.orc.reader;
+
+import org.apache.flink.table.data.DecimalData;
+
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+
+import java.math.BigDecimal;
+
+/**
+ * This column vector is used to adapt hive's DecimalColumnVector to Flink's DecimalColumnVector.
+ */
+public class OrcDecimalColumnVector extends AbstractOrcColumnVector
+        implements org.apache.flink.table.store.data.columnar.DecimalColumnVector {
+
+    private final DecimalColumnVector vector;
+
+    public OrcDecimalColumnVector(DecimalColumnVector vector) {
+        super(vector);
+        this.vector = vector;
+    }
+
+    @Override
+    public DecimalData getDecimal(int i, int precision, int scale) {
+        BigDecimal data =
+                vector.vector[vector.isRepeating ? 0 : i].getHiveDecimal().bigDecimalValue();
+        return DecimalData.fromBigDecimal(data, precision, scale);
+    }
+}
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/reader/OrcDoubleColumnVector.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/reader/OrcDoubleColumnVector.java
new file mode 100644
index 00000000..69c335b6
--- /dev/null
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/reader/OrcDoubleColumnVector.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.format.orc.reader;
+
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+
+/**
+ * This column vector is used to adapt hive's DoubleColumnVector to Flink's float and double
+ * ColumnVector.
+ */
+public class OrcDoubleColumnVector extends AbstractOrcColumnVector
+        implements org.apache.flink.table.store.data.columnar.DoubleColumnVector,
+                org.apache.flink.table.store.data.columnar.FloatColumnVector {
+
+    private final DoubleColumnVector vector;
+
+    public OrcDoubleColumnVector(DoubleColumnVector vector) {
+        super(vector);
+        this.vector = vector;
+    }
+
+    @Override
+    public double getDouble(int i) {
+        return vector.vector[vector.isRepeating ? 0 : i];
+    }
+
+    @Override
+    public float getFloat(int i) {
+        return (float) vector.vector[vector.isRepeating ? 0 : i];
+    }
+}
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/reader/OrcLegacyTimestampColumnVector.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/reader/OrcLegacyTimestampColumnVector.java
new file mode 100644
index 00000000..1b677d2e
--- /dev/null
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/reader/OrcLegacyTimestampColumnVector.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.format.orc.reader;
+
+import org.apache.flink.table.data.TimestampData;
+
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+
+import java.sql.Timestamp;
+import java.time.LocalDateTime;
+
+/**
+ * This class is used to adapt to Hive's legacy (2.0.x) timestamp column vector which is a
+ * LongColumnVector.
+ */
+public class OrcLegacyTimestampColumnVector extends AbstractOrcColumnVector
+        implements org.apache.flink.table.store.data.columnar.TimestampColumnVector {
+
+    private final LongColumnVector hiveVector;
+
+    OrcLegacyTimestampColumnVector(LongColumnVector vector) {
+        super(vector);
+        this.hiveVector = vector;
+    }
+
+    @Override
+    public TimestampData getTimestamp(int i, int precision) {
+        int index = hiveVector.isRepeating ? 0 : i;
+        Timestamp timestamp = toTimestamp(hiveVector.vector[index]);
+        return TimestampData.fromTimestamp(timestamp);
+    }
+
+    // creates a Hive ColumnVector of a constant timestamp value
+    public static ColumnVector createFromConstant(int batchSize, Object value) {
+        LongColumnVector res = new LongColumnVector(batchSize);
+        if (value == null) {
+            res.noNulls = false;
+            res.isNull[0] = true;
+            res.isRepeating = true;
+        } else {
+            Timestamp timestamp =
+                    value instanceof LocalDateTime
+                            ? Timestamp.valueOf((LocalDateTime) value)
+                            : (Timestamp) value;
+            res.fill(fromTimestamp(timestamp));
+            res.isNull[0] = false;
+        }
+        return res;
+    }
+
+    // converting from/to Timestamp is copied from Hive 2.0.0 TimestampUtils
+    private static long fromTimestamp(Timestamp timestamp) {
+        long time = timestamp.getTime();
+        int nanos = timestamp.getNanos();
+        return (time * 1000000) + (nanos % 1000000);
+    }
+
+    private static Timestamp toTimestamp(long timeInNanoSec) {
+        long integralSecInMillis =
+                (timeInNanoSec / 1000000000) * 1000; // Full seconds converted to millis.
+        long nanos = timeInNanoSec % 1000000000; // The nanoseconds.
+        if (nanos < 0) {
+            nanos =
+                    1000000000
+                            + nanos; // The positive nano-part that will be added to milliseconds.
+            integralSecInMillis =
+                    ((timeInNanoSec / 1000000000) - 1) * 1000; // Reduce by one second.
+        }
+        Timestamp res = new Timestamp(0);
+        res.setTime(integralSecInMillis);
+        res.setNanos((int) nanos);
+        return res;
+    }
+}
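
To make the legacy encoding concrete, a worked example (values chosen purely for illustration): for a Timestamp of 1970-01-01T00:00:01.234567891, getTime() is 1234 ms and getNanos() is 234_567_891, so

    fromTimestamp: 1234 * 1_000_000 + (234_567_891 % 1_000_000) = 1_234_567_891 ns
    toTimestamp(1_234_567_891):
        integralSecInMillis = (1_234_567_891 / 1_000_000_000) * 1000 = 1000 ms
        nanos               = 1_234_567_891 % 1_000_000_000         = 234_567_891

which reconstructs the original value; the nanos < 0 branch only kicks in for pre-epoch timestamps.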
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/reader/OrcLongColumnVector.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/reader/OrcLongColumnVector.java
new file mode 100644
index 00000000..27b9662f
--- /dev/null
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/reader/OrcLongColumnVector.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.format.orc.reader;
+
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+
+/**
+ * This column vector is used to adapt hive's LongColumnVector to Flink's boolean, byte, short, int
+ * and long ColumnVector.
+ */
+public class OrcLongColumnVector extends AbstractOrcColumnVector
+        implements org.apache.flink.table.store.data.columnar.LongColumnVector,
+                org.apache.flink.table.store.data.columnar.BooleanColumnVector,
+                org.apache.flink.table.store.data.columnar.ByteColumnVector,
+                org.apache.flink.table.store.data.columnar.ShortColumnVector,
+                org.apache.flink.table.store.data.columnar.IntColumnVector {
+
+    private final LongColumnVector vector;
+
+    public OrcLongColumnVector(LongColumnVector vector) {
+        super(vector);
+        this.vector = vector;
+    }
+
+    @Override
+    public long getLong(int i) {
+        return vector.vector[vector.isRepeating ? 0 : i];
+    }
+
+    @Override
+    public boolean getBoolean(int i) {
+        return vector.vector[vector.isRepeating ? 0 : i] == 1;
+    }
+
+    @Override
+    public byte getByte(int i) {
+        return (byte) vector.vector[vector.isRepeating ? 0 : i];
+    }
+
+    @Override
+    public int getInt(int i) {
+        return (int) vector.vector[vector.isRepeating ? 0 : i];
+    }
+
+    @Override
+    public short getShort(int i) {
+        return (short) vector.vector[vector.isRepeating ? 0 : i];
+    }
+}
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/reader/OrcMapColumnVector.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/reader/OrcMapColumnVector.java
new file mode 100644
index 00000000..a62fe9ce
--- /dev/null
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/reader/OrcMapColumnVector.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.format.orc.reader;
+
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.store.data.columnar.ColumnVector;
+import org.apache.flink.table.store.data.columnar.ColumnarMapData;
+import org.apache.flink.table.types.logical.MapType;
+
+import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
+
+/** This column vector is used to adapt hive's MapColumnVector to Flink's MapColumnVector. */
+public class OrcMapColumnVector extends AbstractOrcColumnVector
+        implements org.apache.flink.table.store.data.columnar.MapColumnVector {
+
+    private final MapColumnVector hiveVector;
+    private final ColumnVector keyFlinkVector;
+    private final ColumnVector valueFlinkVector;
+
+    public OrcMapColumnVector(MapColumnVector hiveVector, MapType type) {
+        super(hiveVector);
+        this.hiveVector = hiveVector;
+        this.keyFlinkVector = createFlinkVector(hiveVector.keys, type.getKeyType());
+        this.valueFlinkVector = createFlinkVector(hiveVector.values, type.getValueType());
+    }
+
+    @Override
+    public MapData getMap(int i) {
+        long offset = hiveVector.offsets[i];
+        long length = hiveVector.lengths[i];
+        return new ColumnarMapData(keyFlinkVector, valueFlinkVector, (int) offset, (int) length);
+    }
+}
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/reader/OrcRowColumnVector.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/reader/OrcRowColumnVector.java
new file mode 100644
index 00000000..12c55e57
--- /dev/null
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/reader/OrcRowColumnVector.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.format.orc.reader;
+
+import org.apache.flink.table.store.data.columnar.ColumnVector;
+import org.apache.flink.table.store.data.columnar.ColumnarRowData;
+import org.apache.flink.table.store.data.columnar.VectorizedColumnBatch;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
+
+/** This column vector is used to adapt hive's StructColumnVector to Flink's RowColumnVector. */
+public class OrcRowColumnVector extends AbstractOrcColumnVector
+        implements org.apache.flink.table.store.data.columnar.RowColumnVector {
+
+    private final ColumnarRowData columnarRowData;
+
+    public OrcRowColumnVector(StructColumnVector hiveVector, RowType type) {
+        super(hiveVector);
+        int len = hiveVector.fields.length;
+        ColumnVector[] flinkVectors = new ColumnVector[len];
+        for (int i = 0; i < len; i++) {
+            flinkVectors[i] = createFlinkVector(hiveVector.fields[i], type.getTypeAt(i));
+        }
+        this.columnarRowData = new ColumnarRowData(new VectorizedColumnBatch(flinkVectors));
+    }
+
+    @Override
+    public ColumnarRowData getRow(int i) {
+        this.columnarRowData.setRowId(i);
+        return this.columnarRowData;
+    }
+}
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/reader/OrcSplitReaderUtil.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/reader/OrcSplitReaderUtil.java
new file mode 100644
index 00000000..a06388a5
--- /dev/null
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/reader/OrcSplitReaderUtil.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.format.orc.reader;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+import org.apache.orc.TypeDescription;
+
+/** Util for converting Flink {@link LogicalType} to ORC {@link TypeDescription}. */
+public class OrcSplitReaderUtil {
+
+    /** See {@code org.apache.flink.table.catalog.hive.util.HiveTypeUtil}. */
+    public static TypeDescription logicalTypeToOrcType(LogicalType type) {
+        type = type.copy(true);
+        switch (type.getTypeRoot()) {
+            case CHAR:
+                return TypeDescription.createChar().withMaxLength(((CharType) type).getLength());
+            case VARCHAR:
+                int len = ((VarCharType) type).getLength();
+                if (len == VarCharType.MAX_LENGTH) {
+                    return TypeDescription.createString();
+                } else {
+                    return TypeDescription.createVarchar().withMaxLength(len);
+                }
+            case BOOLEAN:
+                return TypeDescription.createBoolean();
+            case VARBINARY:
+                if (type.equals(DataTypes.BYTES().getLogicalType())) {
+                    return TypeDescription.createBinary();
+                } else {
+                    throw new UnsupportedOperationException(
+                            "Not support other binary type: " + type);
+                }
+            case DECIMAL:
+                DecimalType decimalType = (DecimalType) type;
+                return TypeDescription.createDecimal()
+                        .withScale(decimalType.getScale())
+                        .withPrecision(decimalType.getPrecision());
+            case TINYINT:
+                return TypeDescription.createByte();
+            case SMALLINT:
+                return TypeDescription.createShort();
+            case INTEGER:
+                return TypeDescription.createInt();
+            case BIGINT:
+                return TypeDescription.createLong();
+            case FLOAT:
+                return TypeDescription.createFloat();
+            case DOUBLE:
+                return TypeDescription.createDouble();
+            case DATE:
+                return TypeDescription.createDate();
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+                return TypeDescription.createTimestamp();
+            case ARRAY:
+                ArrayType arrayType = (ArrayType) type;
+                return TypeDescription.createList(logicalTypeToOrcType(arrayType.getElementType()));
+            case MAP:
+                MapType mapType = (MapType) type;
+                return TypeDescription.createMap(
+                        logicalTypeToOrcType(mapType.getKeyType()),
+                        logicalTypeToOrcType(mapType.getValueType()));
+            case ROW:
+                RowType rowType = (RowType) type;
+                TypeDescription struct = TypeDescription.createStruct();
+                for (int i = 0; i < rowType.getFieldCount(); i++) {
+                    struct.addField(
+                            rowType.getFieldNames().get(i),
+                            logicalTypeToOrcType(rowType.getChildren().get(i)));
+                }
+                return struct;
+            default:
+                throw new UnsupportedOperationException("Unsupported type: " + type);
+        }
+    }
+}
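
For orientation, a small sketch (the row fields "id" and "name" are made up) of deriving an ORC schema from a Flink row type with the utility above; the string form of the result is what the writer-side Vectorizer expects:

    RowType rowType =
            RowType.of(
                    new LogicalType[] {new BigIntType(), new VarCharType(VarCharType.MAX_LENGTH)},
                    new String[] {"id", "name"});
    TypeDescription orcSchema = OrcSplitReaderUtil.logicalTypeToOrcType(rowType);
    String schemaString = orcSchema.toString();    // "struct<id:bigint,name:string>"
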
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/reader/OrcTimestampColumnVector.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/reader/OrcTimestampColumnVector.java
new file mode 100644
index 00000000..0dbba966
--- /dev/null
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/reader/OrcTimestampColumnVector.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.format.orc.reader;
+
+import org.apache.flink.table.data.TimestampData;
+
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
+
+import java.sql.Timestamp;
+
+/**
+ * This column vector is used to adapt hive's TimestampColumnVector to Flink's
+ * TimestampColumnVector.
+ */
+public class OrcTimestampColumnVector extends AbstractOrcColumnVector
+        implements org.apache.flink.table.store.data.columnar.TimestampColumnVector {
+
+    private final TimestampColumnVector vector;
+
+    public OrcTimestampColumnVector(ColumnVector vector) {
+        super(vector);
+        this.vector = (TimestampColumnVector) vector;
+    }
+
+    @Override
+    public TimestampData getTimestamp(int i, int precision) {
+        int index = vector.isRepeating ? 0 : i;
+        Timestamp timestamp = new Timestamp(vector.time[index]);
+        timestamp.setNanos(vector.nanos[index]);
+        return TimestampData.fromTimestamp(timestamp);
+    }
+}
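
Illustrative only: Hive's TimestampColumnVector keeps epoch milliseconds in time[] and the full nanos-of-second in nanos[], and Timestamp#setNanos replaces the fractional second, so the sub-second part of the result always comes from nanos[]. A short sketch with made-up values:

    TimestampColumnVector hiveVector = new TimestampColumnVector(1024);
    hiveVector.time[0] = 1_672_531_200_123L;       // some epoch-millis value
    hiveVector.nanos[0] = 123_456_789;             // fractional second = 0.123456789
    OrcTimestampColumnVector flinkVector = new OrcTimestampColumnVector(hiveVector);
    TimestampData ts = flinkVector.getTimestamp(0, 9); // fractional part is .123456789
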
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/writer/OrcBulkWriter.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/writer/OrcBulkWriter.java
new file mode 100644
index 00000000..e3a80a3b
--- /dev/null
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/writer/OrcBulkWriter.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.format.orc.writer;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.BulkWriter;
+
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.Writer;
+
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link BulkWriter} implementation that writes data in ORC format.
+ *
+ * @param <T> The type of element written.
+ */
+@Internal
+public class OrcBulkWriter<T> implements BulkWriter<T> {
+
+    private final Writer writer;
+    private final Vectorizer<T> vectorizer;
+    private final VectorizedRowBatch rowBatch;
+
+    public OrcBulkWriter(Vectorizer<T> vectorizer, Writer writer) {
+        this.vectorizer = checkNotNull(vectorizer);
+        this.writer = checkNotNull(writer);
+        this.rowBatch = vectorizer.getSchema().createRowBatch();
+
+        // Configure the vectorizer with the writer so that users can add
+        // metadata on the fly through the Vectorizer#vectorize(...) method.
+        this.vectorizer.setWriter(this.writer);
+    }
+
+    @Override
+    public void addElement(T element) throws IOException {
+        vectorizer.vectorize(element, rowBatch);
+        if (rowBatch.size == rowBatch.getMaxSize()) {
+            writer.addRowBatch(rowBatch);
+            rowBatch.reset();
+        }
+    }
+
+    @Override
+    public void flush() throws IOException {
+        if (rowBatch.size != 0) {
+            writer.addRowBatch(rowBatch);
+            rowBatch.reset();
+        }
+    }
+
+    @Override
+    public void finish() throws IOException {
+        flush();
+        writer.close();
+    }
+}
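
For context, a hedged usage sketch of the writer above; the ORC Writer here is created directly against a Hadoop path for brevity (file path, schema and values are made up — in this commit the factory classes assemble these pieces instead):

    TypeDescription schema = TypeDescription.fromString("struct<id:bigint,name:string>");
    Writer orcWriter =
            OrcFile.createWriter(
                    new org.apache.hadoop.fs.Path("/tmp/demo.orc"),
                    OrcFile.writerOptions(new org.apache.hadoop.conf.Configuration())
                            .setSchema(schema));
    Vectorizer<RowData> vectorizer =
            new RowDataVectorizer(
                    schema.toString(),
                    new LogicalType[] {new BigIntType(), new VarCharType(VarCharType.MAX_LENGTH)});
    OrcBulkWriter<RowData> bulkWriter = new OrcBulkWriter<>(vectorizer, orcWriter);
    bulkWriter.addElement(GenericRowData.of(1L, StringData.fromString("flink")));
    bulkWriter.finish();                           // flushes the last partial batch and closes the writer
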
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/writer/PhysicalWriterImpl.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/writer/PhysicalWriterImpl.java
new file mode 100644
index 00000000..03608ced
--- /dev/null
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/writer/PhysicalWriterImpl.java
@@ -0,0 +1,398 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.format.orc.writer;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.fs.FSDataOutputStream;
+
+import com.google.protobuf.CodedOutputStream;
+import org.apache.orc.CompressionCodec;
+import org.apache.orc.CompressionKind;
+import org.apache.orc.OrcFile;
+import org.apache.orc.OrcProto;
+import org.apache.orc.PhysicalWriter;
+import org.apache.orc.impl.HadoopShims;
+import org.apache.orc.impl.OrcCodecPool;
+import org.apache.orc.impl.OutStream;
+import org.apache.orc.impl.StreamName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import static org.apache.orc.impl.WriterImpl.getEstimatedBufferSize;
+
+/**
+ * A slightly customised clone of {@link org.apache.orc.impl.PhysicalFsWriter}.
+ *
+ * <p>Whereas the PhysicalFsWriter implementation works on the basis of a Path, this implementation
+ * leverages Flink's {@link FSDataOutputStream} to write the compressed data.
+ *
+ * <p>NOTE: If the ORC dependency version is updated, this file may have to be updated as well to be
+ * in sync with the new version's PhysicalFsWriter.
+ */
+@Internal
+public class PhysicalWriterImpl implements PhysicalWriter {
+
+    private static final Logger LOG = LoggerFactory.getLogger(PhysicalWriterImpl.class);
+    private static final byte[] ZEROS = new byte[64 * 1024];
+    private static final int HDFS_BUFFER_SIZE = 256 * 1024;
+
+    protected final OutStream writer;
+    private final CodedOutputStream protobufWriter;
+    private final CompressionKind compress;
+    private final Map<StreamName, BufferedStream> streams;
+    private final HadoopShims shims;
+    private final int maxPadding;
+    private final int bufferSize;
+    private final long blockSize;
+    private final boolean addBlockPadding;
+    private final boolean writeVariableLengthBlocks;
+
+    private CompressionCodec codec;
+    private FSDataOutputStream out;
+    private long headerLength;
+    private long stripeStart;
+    private long blockOffset;
+    private int metadataLength;
+    private int footerLength;
+
+    public PhysicalWriterImpl(FSDataOutputStream out, OrcFile.WriterOptions opts)
+            throws IOException {
+        if (opts.isEnforceBufferSize()) {
+            this.bufferSize = opts.getBufferSize();
+        } else {
+            this.bufferSize =
+                    getEstimatedBufferSize(
+                            opts.getStripeSize(),
+                            opts.getSchema().getMaximumId() + 1,
+                            opts.getBufferSize());
+        }
+
+        this.out = out;
+        this.blockOffset = 0;
+        this.blockSize = opts.getBlockSize();
+        this.maxPadding = (int) (opts.getPaddingTolerance() * (double) opts.getBufferSize());
+        this.compress = opts.getCompress();
+        this.codec = OrcCodecPool.getCodec(this.compress);
+        this.streams = new TreeMap<>();
+        this.writer =
+                new OutStream("metadata", this.bufferSize, this.codec, new DirectStream(this.out));
+        this.shims = opts.getHadoopShims();
+        this.addBlockPadding = opts.getBlockPadding();
+        this.protobufWriter = CodedOutputStream.newInstance(this.writer);
+        this.writeVariableLengthBlocks = opts.getWriteVariableLengthBlocks();
+    }
+
+    @Override
+    public void writeHeader() throws IOException {
+        this.out.write("ORC".getBytes());
+        this.headerLength = this.out.getPos();
+    }
+
+    @Override
+    public OutputReceiver createDataStream(StreamName name) throws IOException {
+        BufferedStream result = streams.get(name);
+
+        if (result == null) {
+            result = new BufferedStream();
+            streams.put(name, result);
+        }
+
+        return result;
+    }
+
+    @Override
+    public void writeIndex(StreamName name, OrcProto.RowIndex.Builder index, CompressionCodec codec)
+            throws IOException {
+        OutputStream stream =
+                new OutStream(this.toString(), bufferSize, codec, createDataStream(name));
+        index.build().writeTo(stream);
+        stream.flush();
+    }
+
+    @Override
+    public void writeBloomFilter(
+            StreamName name, OrcProto.BloomFilterIndex.Builder bloom, CompressionCodec codec)
+            throws IOException {
+        OutputStream stream =
+                new OutStream(this.toString(), bufferSize, codec, createDataStream(name));
+        bloom.build().writeTo(stream);
+        stream.flush();
+    }
+
+    @Override
+    public void finalizeStripe(
+            OrcProto.StripeFooter.Builder footerBuilder,
+            OrcProto.StripeInformation.Builder dirEntry)
+            throws IOException {
+        long indexSize = 0;
+        long dataSize = 0;
+
+        for (Map.Entry<StreamName, BufferedStream> pair : streams.entrySet()) {
+            BufferedStream receiver = pair.getValue();
+            if (!receiver.isSuppressed) {
+                long streamSize = receiver.getOutputSize();
+                StreamName name = pair.getKey();
+                footerBuilder.addStreams(
+                        OrcProto.Stream.newBuilder()
+                                .setColumn(name.getColumn())
+                                .setKind(name.getKind())
+                                .setLength(streamSize));
+                if (StreamName.Area.INDEX == name.getArea()) {
+                    indexSize += streamSize;
+                } else {
+                    dataSize += streamSize;
+                }
+            }
+        }
+
+        dirEntry.setIndexLength(indexSize).setDataLength(dataSize);
+        OrcProto.StripeFooter footer = footerBuilder.build();
+        // Do we need to pad the file so the stripe doesn't straddle a block boundary?
+        padStripe(indexSize + dataSize + footer.getSerializedSize());
+
+        // write out the data streams
+        for (Map.Entry<StreamName, BufferedStream> pair : streams.entrySet()) {
+            pair.getValue().spillToDiskAndClear(out);
+        }
+
+        // Write out the footer.
+        writeStripeFooter(footer, dataSize, indexSize, dirEntry);
+    }
+
+    @Override
+    public void writeFileMetadata(OrcProto.Metadata.Builder builder) throws IOException {
+        long startPosition = out.getPos();
+        OrcProto.Metadata metadata = builder.build();
+        writeMetadata(metadata);
+        this.metadataLength = (int) (out.getPos() - startPosition);
+    }
+
+    @Override
+    public void writeFileFooter(OrcProto.Footer.Builder builder) throws IOException {
+        long bodyLength = out.getPos() - metadataLength;
+        builder.setContentLength(bodyLength);
+        builder.setHeaderLength(headerLength);
+        long startPosition = out.getPos();
+        OrcProto.Footer footer = builder.build();
+        writeFileFooter(footer);
+        this.footerLength = (int) (out.getPos() - startPosition);
+    }
+
+    @Override
+    public long writePostScript(OrcProto.PostScript.Builder builder) throws IOException {
+        builder.setFooterLength(footerLength);
+        builder.setMetadataLength(metadataLength);
+
+        OrcProto.PostScript ps = builder.build();
+        // need to write this uncompressed
+        long startPosition = out.getPos();
+        ps.writeTo(out);
+        long length = out.getPos() - startPosition;
+
+        if (length > 255) {
+            throw new IllegalArgumentException("PostScript too large at " + length);
+        }
+
+        out.write((int) length);
+        return out.getPos();
+    }
+
+    @Override
+    public void close() {
+        // Just release the codec but don't close the internal stream here to avoid
+        // Stream Closed or ClosedChannelException when Flink performs checkpoint.
+        OrcCodecPool.returnCodec(compress, codec);
+        codec = null;
+    }
+
+    @Override
+    public void flush() throws IOException {
+        out.flush();
+    }
+
+    @Override
+    public void appendRawStripe(ByteBuffer buffer, OrcProto.StripeInformation.Builder dirEntry)
+            throws IOException {
+        long start = out.getPos();
+        int length = buffer.remaining();
+        long availBlockSpace = blockSize - (start % blockSize);
+
+        // see if stripe can fit in the current hdfs block, else pad the remaining
+        // space in the block
+        if (length < blockSize && length > availBlockSpace && addBlockPadding) {
+            byte[] pad = new byte[(int) Math.min(HDFS_BUFFER_SIZE, availBlockSpace)];
+            LOG.info(String.format("Padding ORC by %d bytes while merging..", availBlockSpace));
+            start += availBlockSpace;
+            while (availBlockSpace > 0) {
+                int writeLen = (int) Math.min(availBlockSpace, pad.length);
+                out.write(pad, 0, writeLen);
+                availBlockSpace -= writeLen;
+            }
+        }
+
+        out.write(buffer.array(), buffer.arrayOffset() + buffer.position(), length);
+        dirEntry.setOffset(start);
+    }
+
+    @Override
+    public CompressionCodec getCompressionCodec() {
+        return this.codec;
+    }
+
+    @Override
+    public long getFileBytes(int column) {
+        long size = 0;
+
+        for (final Map.Entry<StreamName, BufferedStream> pair : streams.entrySet()) {
+            final BufferedStream receiver = pair.getValue();
+            if (!receiver.isSuppressed) {
+
+                final StreamName name = pair.getKey();
+                if (name.getColumn() == column && name.getArea() != StreamName.Area.INDEX) {
+                    size += receiver.getOutputSize();
+                }
+            }
+        }
+
+        return size;
+    }
+
+    private void padStripe(long stripeSize) throws IOException {
+        this.stripeStart = out.getPos();
+        long previousBytesInBlock = (stripeStart - blockOffset) % blockSize;
+
+        // We only have options if this isn't the first stripe in the block
+        if (previousBytesInBlock > 0) {
+            if (previousBytesInBlock + stripeSize >= blockSize) {
+                // Try making a short block
+                if (writeVariableLengthBlocks && shims.endVariableLengthBlock(out)) {
+                    blockOffset = stripeStart;
+                } else if (addBlockPadding) {
+                    // if we cross the block boundary, figure out what we should do
+                    long padding = blockSize - previousBytesInBlock;
+                    if (padding <= maxPadding) {
+                        writeZeros(out, padding);
+                        stripeStart += padding;
+                    }
+                }
+            }
+        }
+    }
+
+    private void writeStripeFooter(
+            OrcProto.StripeFooter footer,
+            long dataSize,
+            long indexSize,
+            OrcProto.StripeInformation.Builder dirEntry)
+            throws IOException {
+        writeStripeFooter(footer);
+
+        dirEntry.setOffset(stripeStart);
+        dirEntry.setFooterLength(out.getPos() - stripeStart - dataSize - indexSize);
+    }
+
+    protected void writeMetadata(OrcProto.Metadata metadata) throws IOException {
+        metadata.writeTo(protobufWriter);
+        protobufWriter.flush();
+        writer.flush();
+    }
+
+    protected void writeFileFooter(OrcProto.Footer footer) throws IOException {
+        footer.writeTo(protobufWriter);
+        protobufWriter.flush();
+        writer.flush();
+    }
+
+    protected void writeStripeFooter(OrcProto.StripeFooter footer) throws IOException {
+        footer.writeTo(protobufWriter);
+        protobufWriter.flush();
+        writer.flush();
+    }
+
+    private static void writeZeros(OutputStream output, long remaining) throws IOException {
+        while (remaining > 0) {
+            long size = Math.min(ZEROS.length, remaining);
+            output.write(ZEROS, 0, (int) size);
+            remaining -= size;
+        }
+    }
+
+    private static class DirectStream implements OutputReceiver {
+        private final FSDataOutputStream output;
+
+        DirectStream(FSDataOutputStream output) {
+            this.output = output;
+        }
+
+        public void output(ByteBuffer buffer) throws IOException {
+            this.output.write(
+                    buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining());
+        }
+
+        public void suppress() {
+            throw new UnsupportedOperationException("Can't suppress direct stream");
+        }
+    }
+
+    private static final class BufferedStream implements OutputReceiver {
+        private boolean isSuppressed = false;
+        private final List<ByteBuffer> output = new ArrayList<>();
+
+        @Override
+        public void output(ByteBuffer buffer) {
+            if (!isSuppressed) {
+                output.add(buffer);
+            }
+        }
+
+        public void suppress() {
+            isSuppressed = true;
+            output.clear();
+        }
+
+        void spillToDiskAndClear(FSDataOutputStream raw) throws IOException {
+            if (!isSuppressed) {
+                for (ByteBuffer buffer : output) {
+                    raw.write(
+                            buffer.array(),
+                            buffer.arrayOffset() + buffer.position(),
+                            buffer.remaining());
+                }
+                output.clear();
+            }
+            isSuppressed = false;
+        }
+
+        public long getOutputSize() {
+            long result = 0;
+            for (ByteBuffer buffer : output) {
+                result += buffer.remaining();
+            }
+            return result;
+        }
+    }
+}
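
A sketch of how this writer is typically plugged in, assuming an already-open Flink FSDataOutputStream `out` (the schema string and the placeholder path are made up); OrcFile.WriterOptions#physicalWriter binds the ORC writer to this implementation so that all bytes go through Flink's file system abstraction rather than a Hadoop Path:

    OrcFile.WriterOptions opts =
            OrcFile.writerOptions(new org.apache.hadoop.conf.Configuration())
                    .setSchema(TypeDescription.fromString("struct<id:bigint,name:string>"));
    Writer writer =
            OrcFile.createWriter(
                    new org.apache.hadoop.fs.Path("/unused"),   // placeholder, bytes go to `out`
                    opts.physicalWriter(new PhysicalWriterImpl(out, opts)));
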
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/writer/RowDataVectorizer.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/writer/RowDataVectorizer.java
new file mode 100644
index 00000000..6e3f32dc
--- /dev/null
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/writer/RowDataVectorizer.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.format.orc.writer;
+
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TimestampType;
+
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+
+import java.sql.Timestamp;
+
+/** A {@link Vectorizer} of {@link RowData} type element. */
+public class RowDataVectorizer extends Vectorizer<RowData> {
+
+    private final LogicalType[] fieldTypes;
+
+    public RowDataVectorizer(String schema, LogicalType[] fieldTypes) {
+        super(schema);
+        this.fieldTypes = fieldTypes;
+    }
+
+    @Override
+    public void vectorize(RowData row, VectorizedRowBatch batch) {
+        int rowId = batch.size++;
+        for (int i = 0; i < row.getArity(); ++i) {
+            setColumn(rowId, batch.cols[i], fieldTypes[i], row, i);
+        }
+    }
+
+    private static void setColumn(
+            int rowId, ColumnVector column, LogicalType type, RowData row, int columnId) {
+        if (row.isNullAt(columnId)) {
+            column.noNulls = false;
+            column.isNull[rowId] = true;
+            return;
+        }
+
+        switch (type.getTypeRoot()) {
+            case CHAR:
+            case VARCHAR:
+                {
+                    BytesColumnVector vector = (BytesColumnVector) column;
+                    byte[] bytes = row.getString(columnId).toBytes();
+                    vector.setVal(rowId, bytes, 0, bytes.length);
+                    break;
+                }
+            case BOOLEAN:
+                {
+                    LongColumnVector vector = (LongColumnVector) column;
+                    vector.vector[rowId] = row.getBoolean(columnId) ? 1 : 0;
+                    break;
+                }
+            case BINARY:
+            case VARBINARY:
+                {
+                    BytesColumnVector vector = (BytesColumnVector) column;
+                    byte[] bytes = row.getBinary(columnId);
+                    vector.setVal(rowId, bytes, 0, bytes.length);
+                    break;
+                }
+            case DECIMAL:
+                {
+                    DecimalType dt = (DecimalType) type;
+                    DecimalColumnVector vector = (DecimalColumnVector) column;
+                    vector.set(
+                            rowId,
+                            HiveDecimal.create(
+                                    row.getDecimal(columnId, dt.getPrecision(), dt.getScale())
+                                            .toBigDecimal()));
+                    break;
+                }
+            case TINYINT:
+                {
+                    LongColumnVector vector = (LongColumnVector) column;
+                    vector.vector[rowId] = row.getByte(columnId);
+                    break;
+                }
+            case SMALLINT:
+                {
+                    LongColumnVector vector = (LongColumnVector) column;
+                    vector.vector[rowId] = row.getShort(columnId);
+                    break;
+                }
+            case DATE:
+            case TIME_WITHOUT_TIME_ZONE:
+            case INTEGER:
+                {
+                    LongColumnVector vector = (LongColumnVector) column;
+                    vector.vector[rowId] = row.getInt(columnId);
+                    break;
+                }
+            case BIGINT:
+                {
+                    LongColumnVector vector = (LongColumnVector) column;
+                    vector.vector[rowId] = row.getLong(columnId);
+                    break;
+                }
+            case FLOAT:
+                {
+                    DoubleColumnVector vector = (DoubleColumnVector) column;
+                    vector.vector[rowId] = row.getFloat(columnId);
+                    break;
+                }
+            case DOUBLE:
+                {
+                    DoubleColumnVector vector = (DoubleColumnVector) column;
+                    vector.vector[rowId] = row.getDouble(columnId);
+                    break;
+                }
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+                {
+                    TimestampType tt = (TimestampType) type;
+                    Timestamp timestamp =
+                            row.getTimestamp(columnId, tt.getPrecision()).toTimestamp();
+                    TimestampColumnVector vector = (TimestampColumnVector) column;
+                    vector.set(rowId, timestamp);
+                    break;
+                }
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                {
+                    LocalZonedTimestampType lt = (LocalZonedTimestampType) type;
+                    Timestamp timestamp =
+                            row.getTimestamp(columnId, lt.getPrecision()).toTimestamp();
+                    TimestampColumnVector vector = (TimestampColumnVector) column;
+                    vector.set(rowId, timestamp);
+                    break;
+                }
+            case ARRAY:
+                {
+                    ListColumnVector listColumnVector = (ListColumnVector) column;
+                    setColumn(rowId, listColumnVector, type, row, columnId);
+                    break;
+                }
+            case MAP:
+                {
+                    MapColumnVector mapColumnVector = (MapColumnVector) column;
+                    setColumn(rowId, mapColumnVector, type, row, columnId);
+                    break;
+                }
+            case ROW:
+                {
+                    StructColumnVector structColumnVector = (StructColumnVector) column;
+                    setColumn(rowId, structColumnVector, type, row, columnId);
+                    break;
+                }
+            default:
+                throw new UnsupportedOperationException("Unsupported type: " + type);
+        }
+    }
+
+    private static void setColumn(
+            int rowId,
+            ListColumnVector listColumnVector,
+            LogicalType type,
+            RowData row,
+            int columnId) {
+        ArrayData arrayData = row.getArray(columnId);
+        ArrayType arrayType = (ArrayType) type;
+        listColumnVector.lengths[rowId] = arrayData.size();
+        listColumnVector.offsets[rowId] = listColumnVector.childCount;
+        listColumnVector.childCount += listColumnVector.lengths[rowId];
+        listColumnVector.child.ensureSize(
+                listColumnVector.childCount, listColumnVector.offsets[rowId] != 0);
+
+        RowData convertedRowData = convert(arrayData, arrayType.getElementType());
+        for (int i = 0; i < arrayData.size(); i++) {
+            setColumn(
+                    (int) listColumnVector.offsets[rowId] + i,
+                    listColumnVector.child,
+                    arrayType.getElementType(),
+                    convertedRowData,
+                    i);
+        }
+    }
+
+    private static void setColumn(
+            int rowId,
+            MapColumnVector mapColumnVector,
+            LogicalType type,
+            RowData row,
+            int columnId) {
+        MapData mapData = row.getMap(columnId);
+        MapType mapType = (MapType) type;
+        ArrayData keyArray = mapData.keyArray();
+        ArrayData valueArray = mapData.valueArray();
+        mapColumnVector.lengths[rowId] = mapData.size();
+        mapColumnVector.offsets[rowId] = mapColumnVector.childCount;
+        mapColumnVector.childCount += mapColumnVector.lengths[rowId];
+        mapColumnVector.keys.ensureSize(
+                mapColumnVector.childCount, mapColumnVector.offsets[rowId] != 0);
+        mapColumnVector.values.ensureSize(
+                mapColumnVector.childCount, mapColumnVector.offsets[rowId] != 0);
+
+        RowData convertedKeyRowData = convert(keyArray, mapType.getKeyType());
+        RowData convertedValueRowData = convert(valueArray, mapType.getValueType());
+        for (int i = 0; i < keyArray.size(); i++) {
+            setColumn(
+                    (int) mapColumnVector.offsets[rowId] + i,
+                    mapColumnVector.keys,
+                    mapType.getKeyType(),
+                    convertedKeyRowData,
+                    i);
+            setColumn(
+                    (int) mapColumnVector.offsets[rowId] + i,
+                    mapColumnVector.values,
+                    mapType.getValueType(),
+                    convertedValueRowData,
+                    i);
+        }
+    }
+
+    private static void setColumn(
+            int rowId,
+            StructColumnVector structColumnVector,
+            LogicalType type,
+            RowData row,
+            int columnId) {
+        RowData structRow = row.getRow(columnId, structColumnVector.fields.length);
+        RowType rowType = (RowType) type;
+        for (int i = 0; i < structRow.getArity(); i++) {
+            ColumnVector cv = structColumnVector.fields[i];
+            setColumn(rowId, cv, rowType.getTypeAt(i), structRow, i);
+        }
+    }
+
+    /**
+     * Converts ArrayData to RowData so that {@link RowDataVectorizer#setColumn(int,
+     * ColumnVector, LogicalType, RowData, int)} can be called recursively on the array elements.
+     *
+     * @param arrayData input ArrayData.
+     * @param arrayFieldType LogicalType of input ArrayData.
+     * @return RowData.
+     */
+    private static RowData convert(ArrayData arrayData, LogicalType arrayFieldType) {
+        GenericRowData rowData = new GenericRowData(arrayData.size());
+        ArrayData.ElementGetter elementGetter = ArrayData.createElementGetter(arrayFieldType);
+        for (int i = 0; i < arrayData.size(); i++) {
+            rowData.setField(i, elementGetter.getElementOrNull(arrayData, i));
+        }
+        return rowData;
+    }
+}
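
Illustrative only (schema and values are made up): vectorizing a row with an ARRAY<INT> column, to show how the convert(...) helper above wraps the array elements into a temporary GenericRowData so that setColumn(...) can be reused element by element on the list vector's child:

    TypeDescription schema = TypeDescription.fromString("struct<ids:array<int>>");
    RowDataVectorizer vectorizer =
            new RowDataVectorizer(
                    schema.toString(), new LogicalType[] {new ArrayType(new IntType())});
    VectorizedRowBatch batch = schema.createRowBatch();
    vectorizer.vectorize(GenericRowData.of(new GenericArrayData(new int[] {1, 2, 3})), batch);
    // batch.cols[0] is a ListColumnVector whose child vector now holds 1, 2, 3
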
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/writer/Vectorizer.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/writer/Vectorizer.java
new file mode 100644
index 00000000..dbddc16e
--- /dev/null
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/writer/Vectorizer.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.format.orc.writer;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.Writer;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This class provides an abstracted set of methods to handle the lifecycle of {@link
+ * VectorizedRowBatch}.
+ *
+ * <p>Users have to extend this class and override the vectorize() method with the logic to
+ * transform the element to a {@link VectorizedRowBatch}.
+ *
+ * @param <T> The type of the element
+ */
+@PublicEvolving
+public abstract class Vectorizer<T> implements Serializable {
+
+    private final TypeDescription schema;
+
+    private transient Writer writer;
+
+    public Vectorizer(final String schema) {
+        checkNotNull(schema);
+        this.schema = TypeDescription.fromString(schema);
+    }
+
+    /**
+     * Provides the ORC schema.
+     *
+     * @return the ORC schema
+     */
+    public TypeDescription getSchema() {
+        return this.schema;
+    }
+
+    /**
+     * Users are not supposed to use this method since this is intended to be used only by the
+     * {@link OrcBulkWriter}.
+     *
+     * @param writer the underlying ORC Writer.
+     */
+    public void setWriter(Writer writer) {
+        this.writer = writer;
+    }
+
+    /**
+     * Adds arbitrary user metadata to the outgoing ORC file.
+     *
+     * <p>Users who want to dynamically add new metadata either based on the input or from an
+     * external system can do so by calling <code>addUserMetadata(...)</code> inside the overridden
+     * vectorize() method.
+     *
+     * @param key a key to label the data with.
+     * @param value the contents of the metadata.
+     */
+    public void addUserMetadata(String key, ByteBuffer value) {
+        this.writer.addUserMetadata(key, value);
+    }
+
+    /**
+     * Transforms the provided element to ColumnVectors and sets them in the exposed
+     * VectorizedRowBatch.
+     *
+     * @param element The input element
+     * @param batch The batch to write the ColumnVectors
+     * @throws IOException if there is an error while transforming the input.
+     */
+    public abstract void vectorize(T element, VectorizedRowBatch batch) throws IOException;
+}
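
A minimal sketch of the extension point described above, assuming a single BIGINT column (the class name, schema and metadata key are made up): a custom Vectorizer that also tags the ORC file with metadata derived from the input, as the javadoc of addUserMetadata(...) suggests:

    public class AuditedVectorizer extends Vectorizer<RowData> {

        public AuditedVectorizer(String schema) {
            super(schema);                         // e.g. "struct<id:bigint>"
        }

        @Override
        public void vectorize(RowData row, VectorizedRowBatch batch) {
            int rowId = batch.size++;
            ((LongColumnVector) batch.cols[0]).vector[rowId] = row.getLong(0);
            // metadata can be added on the fly from inside vectorize(...)
            addUserMetadata(
                    "last-seen-id",
                    ByteBuffer.wrap(String.valueOf(row.getLong(0)).getBytes()));
        }
    }
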
diff --git a/flink-table-store-format/src/main/resources/META-INF/NOTICE b/flink-table-store-format/src/main/resources/META-INF/NOTICE
index 4103b2d0..89edd908 100644
--- a/flink-table-store-format/src/main/resources/META-INF/NOTICE
+++ b/flink-table-store-format/src/main/resources/META-INF/NOTICE
@@ -6,6 +6,12 @@ The Apache Software Foundation (http://www.apache.org/).
 
 This project bundles the following dependencies under the Apache Software License 2.0. (http://www.apache.org/licenses/LICENSE-2.0.txt)
 
+- org.apache.avro:avro:1.11.1
+- com.fasterxml.jackson.core:jackson-core:2.13.4
+- com.fasterxml.jackson.core:jackson-databind:2.13.4.2
+- com.fasterxml.jackson.core:jackson-annotations:2.13.4
+- org.apache.commons:commons-compress:1.21
+
 - org.apache.parquet:parquet-avro:1.12.3
 - org.apache.parquet:parquet-hadoop:1.12.3
 - org.apache.parquet:parquet-column:1.12.3
diff --git a/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/avro/AvroBulkFormatTest.java b/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/avro/AvroBulkFormatTest.java
new file mode 100644
index 00000000..c6ef91de
--- /dev/null
+++ b/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/avro/AvroBulkFormatTest.java
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.format.avro;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.connector.file.src.util.CheckpointedPosition;
+import org.apache.flink.connector.file.src.util.RecordAndPosition;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileConstants;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumWriter;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+
+import static org.apache.flink.table.store.format.avro.AvroBulkFormatTestUtils.ROW_TYPE;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link AbstractAvroBulkFormat}. */
+class AvroBulkFormatTest {
+
+    private static final List<RowData> TEST_DATA =
+            Arrays.asList(
+                    // -------- batch 0, block start 232 --------
+                    GenericRowData.of(
+                            StringData.fromString("AvroBulk"), StringData.fromString("FormatTest")),
+                    GenericRowData.of(
+                            StringData.fromString("Apache"), StringData.fromString("Flink")),
+                    GenericRowData.of(
+                            StringData.fromString(
+                                    "永和九年,岁在癸丑,暮春之初,会于会稽山阴之兰亭,修禊事也。群贤毕至,少"
+                                            + "长咸集。此地有崇山峻岭,茂林修竹,又有清流激湍,映带左右。引"
+                                            + "以为流觞曲水,列坐其次。虽无丝竹管弦之盛,一觞一咏,亦足以畅"
+                                            + "叙幽情。"),
+                            StringData.fromString("")),
+                    // -------- batch 1, block start 593 --------
+                    GenericRowData.of(
+                            StringData.fromString("File"), StringData.fromString("Format")),
+                    GenericRowData.of(
+                            null,
+                            StringData.fromString(
+                                    "This is a string with English, 中文 and even 🍎🍌🍑🥝🍍🥭🍐")),
+                    // -------- batch 2, block start 705 --------
+                    GenericRowData.of(
+                            StringData.fromString("block with"),
+                            StringData.fromString("only one record"))
+                    // -------- file length 752 --------
+                    );
+    private static final List<Long> BLOCK_STARTS = Arrays.asList(232L, 593L, 705L);
+
+    private File tmpFile;
+
+    @BeforeEach
+    public void before() throws IOException {
+        tmpFile = Files.createTempFile("avro-bulk-format-test", ".avro").toFile();
+        tmpFile.createNewFile();
+        FileOutputStream out = new FileOutputStream(tmpFile);
+
+        Schema schema = AvroSchemaConverter.convertToSchema(ROW_TYPE);
+        RowDataToAvroConverters.RowDataToAvroConverter converter =
+                RowDataToAvroConverters.createConverter(ROW_TYPE);
+
+        DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
+        DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter);
+        dataFileWriter.create(schema, out);
+
+        //  Generate the sync points manually in order to test blocks.
+        long syncBlock1 = dataFileWriter.sync();
+        dataFileWriter.append((GenericRecord) converter.convert(schema, TEST_DATA.get(0)));
+        dataFileWriter.append((GenericRecord) converter.convert(schema, TEST_DATA.get(1)));
+        dataFileWriter.append((GenericRecord) converter.convert(schema, TEST_DATA.get(2)));
+        long syncBlock2 = dataFileWriter.sync();
+        dataFileWriter.append((GenericRecord) converter.convert(schema, TEST_DATA.get(3)));
+        dataFileWriter.append((GenericRecord) converter.convert(schema, TEST_DATA.get(4)));
+        long syncBlock3 = dataFileWriter.sync();
+        dataFileWriter.append((GenericRecord) converter.convert(schema, TEST_DATA.get(5)));
+        long syncEnd = dataFileWriter.sync();
+        dataFileWriter.close();
+
+        // These values should be constant if nothing else changes with the file.
+        assertThat(BLOCK_STARTS).isEqualTo(Arrays.asList(syncBlock1, syncBlock2, syncBlock3));
+        assertThat(tmpFile).hasSize(syncEnd);
+    }
+
+    @AfterEach
+    public void after() throws IOException {
+        FileUtils.deleteFileOrDirectory(tmpFile);
+    }
+
+    @Test
+    void testReadWholeFileWithOneSplit() throws IOException {
+        AvroBulkFormatTestUtils.TestingAvroBulkFormat bulkFormat =
+                new AvroBulkFormatTestUtils.TestingAvroBulkFormat();
+        assertSplit(
+                bulkFormat,
+                Collections.singletonList(
+                        new SplitInfo(
+                                0,
+                                tmpFile.length(),
+                                Arrays.asList(
+                                        new BatchInfo(0, 3),
+                                        new BatchInfo(3, 5),
+                                        new BatchInfo(5, 6)))));
+    }
+
+    @Test
+    void testReadWholeFileWithMultipleSplits() throws IOException {
+        AvroBulkFormatTestUtils.TestingAvroBulkFormat bulkFormat =
+                new AvroBulkFormatTestUtils.TestingAvroBulkFormat();
+        long splitLength = tmpFile.length() / 3;
+        assertSplit(
+                bulkFormat,
+                Arrays.asList(
+                        new SplitInfo(
+                                0, splitLength, Collections.singletonList(new BatchInfo(0, 3))),
+                        new SplitInfo(splitLength, splitLength * 2, Collections.emptyList()),
+                        new SplitInfo(
+                                splitLength * 2,
+                                tmpFile.length(),
+                                Arrays.asList(new BatchInfo(3, 5), new BatchInfo(5, 6)))));
+    }
+
+    @Test
+    void testSplitsAtCriticalLocations() throws IOException {
+        AvroBulkFormatTestUtils.TestingAvroBulkFormat bulkFormat =
+                new AvroBulkFormatTestUtils.TestingAvroBulkFormat();
+        assertSplit(
+                bulkFormat,
+                Arrays.asList(
+                        // ends just before the new block
+                        new SplitInfo(
+                                BLOCK_STARTS.get(0) - DataFileConstants.SYNC_SIZE,
+                                BLOCK_STARTS.get(1) - DataFileConstants.SYNC_SIZE,
+                                Collections.singletonList(new BatchInfo(0, 3))),
+                        // ends just at the beginning of new block
+                        new SplitInfo(
+                                BLOCK_STARTS.get(1) - DataFileConstants.SYNC_SIZE,
+                                BLOCK_STARTS.get(2) - DataFileConstants.SYNC_SIZE + 1,
+                                Arrays.asList(new BatchInfo(3, 5), new BatchInfo(5, 6)))));
+    }
+
+    @Test
+    void testRestoreReader() throws IOException {
+        AvroBulkFormatTestUtils.TestingAvroBulkFormat bulkFormat =
+                new AvroBulkFormatTestUtils.TestingAvroBulkFormat();
+        long splitLength = tmpFile.length() / 3;
+        String splitId = UUID.randomUUID().toString();
+
+        FileSourceSplit split =
+                new FileSourceSplit(
+                        splitId, new Path(tmpFile.toString()), splitLength * 2, tmpFile.length());
+        BulkFormat.Reader<RowData> reader = bulkFormat.createReader(new Configuration(), split);
+        long offset1 = assertBatch(reader, new BatchInfo(3, 5));
+        assertBatch(reader, new BatchInfo(5, 6));
+        assertThat(reader.readBatch()).isNull();
+        reader.close();
+
+        split =
+                new FileSourceSplit(
+                        splitId,
+                        new Path(tmpFile.toString()),
+                        splitLength * 2,
+                        tmpFile.length(),
+                        StringUtils.EMPTY_STRING_ARRAY,
+                        new CheckpointedPosition(offset1, 1));
+        reader = bulkFormat.restoreReader(new Configuration(), split);
+        long offset2 = assertBatch(reader, new BatchInfo(3, 5), 1);
+        assertBatch(reader, new BatchInfo(5, 6));
+        assertThat(reader.readBatch()).isNull();
+        reader.close();
+
+        assertThat(offset2).isEqualTo(offset1);
+    }
+
+    private void assertSplit(
+            AvroBulkFormatTestUtils.TestingAvroBulkFormat bulkFormat, List<SplitInfo> splitInfos)
+            throws IOException {
+        for (SplitInfo splitInfo : splitInfos) {
+            FileSourceSplit split =
+                    new FileSourceSplit(
+                            UUID.randomUUID().toString(),
+                            new Path(tmpFile.toString()),
+                            splitInfo.start,
+                            splitInfo.end - splitInfo.start);
+            BulkFormat.Reader<RowData> reader = bulkFormat.createReader(new Configuration(), split);
+            List<Long> offsets = new ArrayList<>();
+            for (BatchInfo batch : splitInfo.batches) {
+                offsets.add(assertBatch(reader, batch));
+            }
+            assertThat(reader.readBatch()).isNull();
+            for (int j = 1; j < offsets.size(); j++) {
+                assertThat(offsets.get(j - 1) < offsets.get(j)).isTrue();
+            }
+            reader.close();
+        }
+    }
+
+    private long assertBatch(BulkFormat.Reader<RowData> reader, BatchInfo batchInfo)
+            throws IOException {
+        return assertBatch(reader, batchInfo, 0);
+    }
+
+    private long assertBatch(
+            BulkFormat.Reader<RowData> reader, BatchInfo batchInfo, int initialSkipCount)
+            throws IOException {
+        long ret = -1;
+        int skipCount = initialSkipCount;
+        BulkFormat.RecordIterator<RowData> iterator = reader.readBatch();
+        for (RecordAndPosition<RowData> recordAndPos = iterator.next();
+                recordAndPos != null;
+                recordAndPos = iterator.next()) {
+            if (ret == -1) {
+                ret = recordAndPos.getOffset();
+            }
+            assertThat(recordAndPos.getRecord())
+                    .isEqualTo(TEST_DATA.get(batchInfo.start + skipCount));
+            assertThat(recordAndPos.getOffset()).isEqualTo(ret);
+            skipCount++;
+            assertThat(recordAndPos.getRecordSkipCount()).isEqualTo(skipCount);
+        }
+        assertThat(skipCount).isEqualTo(batchInfo.end - batchInfo.start);
+        iterator.releaseBatch();
+        return ret;
+    }
+
+    private static class SplitInfo {
+        private final long start;
+        private final long end;
+        private final List<BatchInfo> batches;
+
+        private SplitInfo(long start, long end, List<BatchInfo> batches) {
+            this.start = start;
+            this.end = end;
+            this.batches = batches;
+        }
+    }
+
+    private static class BatchInfo {
+        private final int start;
+        private final int end;
+
+        private BatchInfo(int start, int end) {
+            this.start = start;
+            this.end = end;
+        }
+    }
+}
diff --git a/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/avro/AvroBulkFormatTestUtils.java b/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/avro/AvroBulkFormatTestUtils.java
new file mode 100644
index 00000000..57279a11
--- /dev/null
+++ b/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/avro/AvroBulkFormatTestUtils.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.format.avro;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+
+import java.util.function.Function;
+
+/** Test utilities for {@link AbstractAvroBulkFormat}. */
+public class AvroBulkFormatTestUtils {
+
+    public static final RowType ROW_TYPE =
+            RowType.of(
+                    false,
+                    new LogicalType[] {
+                        DataTypes.STRING().getLogicalType(), DataTypes.STRING().getLogicalType()
+                    },
+                    new String[] {"a", "b"});
+
+    /** {@link AbstractAvroBulkFormat} for tests. */
+    public static class TestingAvroBulkFormat
+            extends AbstractAvroBulkFormat<GenericRecord, RowData, FileSourceSplit> {
+
+        protected TestingAvroBulkFormat() {
+            super(AvroSchemaConverter.convertToSchema(ROW_TYPE));
+        }
+
+        @Override
+        protected GenericRecord createReusedAvroRecord() {
+            return new GenericData.Record(readerSchema);
+        }
+
+        @Override
+        protected Function<GenericRecord, RowData> createConverter() {
+            AvroToRowDataConverters.AvroToRowDataConverter converter =
+                    AvroToRowDataConverters.createRowConverter(ROW_TYPE);
+            return record -> record == null ? null : (RowData) converter.convert(record);
+        }
+
+        @Override
+        public TypeInformation<RowData> getProducedType() {
+            return InternalTypeInfo.of(ROW_TYPE);
+        }
+    }
+}
diff --git a/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcBulkWriterTestUtil.java b/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcBulkWriterTestUtil.java
new file mode 100644
index 00000000..b1e1071b
--- /dev/null
+++ b/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcBulkWriterTestUtil.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.format.orc;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.CompressionKind;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.RecordReader;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Util class for the OrcBulkWriter tests. */
+public class OrcBulkWriterTestUtil {
+
+    public static final String USER_METADATA_KEY = "userKey";
+    public static final ByteBuffer USER_METADATA_VALUE =
+            ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8));
+
+    public static void validate(File directory, List<Record> expected) throws IOException {
+        final File[] buckets = directory.listFiles();
+        assertThat(buckets).isNotNull();
+        assertThat(buckets).hasSize(1);
+
+        final File[] partFiles = buckets[0].listFiles();
+        assertThat(partFiles).isNotNull();
+
+        for (File partFile : partFiles) {
+            assertThat(partFile.length()).isGreaterThan(0);
+
+            OrcFile.ReaderOptions readerOptions = OrcFile.readerOptions(new Configuration());
+            Reader reader =
+                    OrcFile.createReader(
+                            new org.apache.hadoop.fs.Path(partFile.toURI()), readerOptions);
+
+            assertThat(reader.getNumberOfRows()).isEqualTo(3);
+            assertThat(reader.getSchema().getFieldNames()).hasSize(2);
+            assertThat(reader.getCompressionKind()).isSameAs(CompressionKind.LZ4);
+            assertThat(reader.hasMetadataValue(USER_METADATA_KEY)).isTrue();
+            assertThat(reader.getMetadataKeys()).contains(USER_METADATA_KEY);
+
+            List<Record> results = getResults(reader);
+
+            assertThat(results).hasSize(3).isEqualTo(expected);
+        }
+    }
+
+    private static List<Record> getResults(Reader reader) throws IOException {
+        List<Record> results = new ArrayList<>();
+
+        RecordReader recordReader = reader.rows();
+        VectorizedRowBatch batch = reader.getSchema().createRowBatch();
+
+        while (recordReader.nextBatch(batch)) {
+            BytesColumnVector stringVector = (BytesColumnVector) batch.cols[0];
+            LongColumnVector intVector = (LongColumnVector) batch.cols[1];
+            for (int r = 0; r < batch.size; r++) {
+                String name =
+                        new String(
+                                stringVector.vector[r],
+                                stringVector.start[r],
+                                stringVector.length[r]);
+                int age = (int) intVector.vector[r];
+
+                results.add(new Record(name, age));
+            }
+        }
+        // close the reader only after all batches have been consumed
+        recordReader.close();
+
+        return results;
+    }
+}
diff --git a/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcFileStatsExtractorTest.java b/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcFileStatsExtractorTest.java
index 03d2a69f..2a3e2912 100644
--- a/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcFileStatsExtractorTest.java
+++ b/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcFileStatsExtractorTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.store.format.orc;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.store.format.FileFormat;
 import org.apache.flink.table.store.format.FileStatsExtractorTestBase;
+import org.apache.flink.table.store.format.orc.filter.OrcFileStatsExtractor;
 import org.apache.flink.table.types.logical.ArrayType;
 import org.apache.flink.table.types.logical.BigIntType;
 import org.apache.flink.table.types.logical.BinaryType;
diff --git a/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcFilterConverterTest.java b/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcFilterConverterTest.java
index 73555294..c0e927bc 100644
--- a/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcFilterConverterTest.java
+++ b/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcFilterConverterTest.java
@@ -18,9 +18,10 @@
 
 package org.apache.flink.table.store.format.orc;
 
-import org.apache.flink.orc.OrcFilters;
 import org.apache.flink.table.store.file.predicate.Predicate;
 import org.apache.flink.table.store.file.predicate.PredicateBuilder;
+import org.apache.flink.table.store.format.orc.filter.OrcFilters;
+import org.apache.flink.table.store.format.orc.filter.OrcPredicateFunctionVisitor;
 import org.apache.flink.table.types.logical.BigIntType;
 import org.apache.flink.table.types.logical.RowType;
 
diff --git a/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcReaderFactoryTest.java b/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcReaderFactoryTest.java
new file mode 100644
index 00000000..41a61684
--- /dev/null
+++ b/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcReaderFactoryTest.java
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.format.orc;
+
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.connector.file.src.util.Utils;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.DecimalDataUtils;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.format.orc.filter.OrcFilters;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link OrcReaderFactory}. */
+class OrcReaderFactoryTest {
+
+    /** Small batch size to exercise more boundary conditions. */
+    protected static final int BATCH_SIZE = 9;
+
+    private static final RowType FLAT_FILE_TYPE =
+            RowType.of(
+                    new LogicalType[] {
+                        DataTypes.INT().getLogicalType(),
+                        DataTypes.STRING().getLogicalType(),
+                        DataTypes.STRING().getLogicalType(),
+                        DataTypes.STRING().getLogicalType(),
+                        DataTypes.INT().getLogicalType(),
+                        DataTypes.STRING().getLogicalType(),
+                        DataTypes.INT().getLogicalType(),
+                        DataTypes.INT().getLogicalType(),
+                        DataTypes.INT().getLogicalType()
+                    },
+                    new String[] {
+                        "_col0", "_col1", "_col2", "_col3", "_col4", "_col5", "_col6", "_col7",
+                        "_col8"
+                    });
+
+    private static final RowType DECIMAL_FILE_TYPE =
+            RowType.of(new LogicalType[] {new DecimalType(10, 5)}, new String[] {"_col0"});
+
+    private static Path flatFile;
+    private static Path decimalFile;
+
+    @BeforeAll
+    static void setupFiles(@TempDir java.nio.file.Path tmpDir) {
+        flatFile = copyFileFromResource("test-data-flat.orc", tmpDir.resolve("test-data-flat.orc"));
+        decimalFile =
+                copyFileFromResource(
+                        "test-data-decimal.orc", tmpDir.resolve("test-data-decimal.orc"));
+    }
+
+    @Test
+    void testReadFileInSplits() throws IOException {
+        OrcReaderFactory format = createFormat(FLAT_FILE_TYPE, new int[] {0, 1});
+
+        AtomicInteger cnt = new AtomicInteger(0);
+        AtomicLong totalF0 = new AtomicLong(0);
+
+        // read all splits
+        for (FileSourceSplit split : createSplits(flatFile, 4)) {
+            forEach(
+                    format,
+                    split,
+                    row -> {
+                        assertThat(row.isNullAt(0)).isFalse();
+                        assertThat(row.isNullAt(1)).isFalse();
+                        totalF0.addAndGet(row.getInt(0));
+                        assertThat(row.getString(1).toString()).isNotNull();
+                        cnt.incrementAndGet();
+                    });
+        }
+
+        // check that all rows have been read
+        assertThat(cnt.get()).isEqualTo(1920800);
+        assertThat(totalF0.get()).isEqualTo(1844737280400L);
+    }
+
+    @Test
+    void testReadFileWithSelectFields() throws IOException {
+        OrcReaderFactory format = createFormat(FLAT_FILE_TYPE, new int[] {2, 0, 1});
+
+        AtomicInteger cnt = new AtomicInteger(0);
+        AtomicLong totalF0 = new AtomicLong(0);
+
+        // read all splits
+        for (FileSourceSplit split : createSplits(flatFile, 4)) {
+            forEach(
+                    format,
+                    split,
+                    row -> {
+                        assertThat(row.isNullAt(0)).isFalse();
+                        assertThat(row.isNullAt(1)).isFalse();
+                        assertThat(row.isNullAt(2)).isFalse();
+                        assertThat(row.getString(0).toString()).isNotNull();
+                        totalF0.addAndGet(row.getInt(1));
+                        assertThat(row.getString(2).toString()).isNotNull();
+                        cnt.incrementAndGet();
+                    });
+        }
+
+        // check that all rows have been read
+        assertThat(cnt.get()).isEqualTo(1920800);
+        assertThat(totalF0.get()).isEqualTo(1844737280400L);
+    }
+
+    @Test
+    void testReadDecimalTypeFile() throws IOException {
+        OrcReaderFactory format = createFormat(DECIMAL_FILE_TYPE, new int[] {0});
+
+        AtomicInteger cnt = new AtomicInteger(0);
+        AtomicInteger nullCount = new AtomicInteger(0);
+
+        // read all splits
+        for (FileSourceSplit split : createSplits(decimalFile, 4)) {
+            forEach(
+                    format,
+                    split,
+                    row -> {
+                        if (cnt.get() == 0) {
+                            // validate first row
+                            assertThat(row).isNotNull();
+                            assertThat(row.getArity()).isEqualTo(1);
+                            assertThat(row.getDecimal(0, 10, 5))
+                                    .isEqualTo(DecimalDataUtils.castFrom(-1000.5d, 10, 5));
+                        } else {
+                            if (!row.isNullAt(0)) {
+                                assertThat(row.getDecimal(0, 10, 5)).isNotNull();
+                            } else {
+                                nullCount.incrementAndGet();
+                            }
+                        }
+                        cnt.incrementAndGet();
+                    });
+        }
+
+        assertThat(cnt.get()).isEqualTo(6000);
+        assertThat(nullCount.get()).isEqualTo(2000);
+    }
+
+    @Test
+    void testReadFileAndRestore() throws IOException {
+        OrcReaderFactory format = createFormat(FLAT_FILE_TYPE, new int[] {0, 1});
+
+        // pick a middle split
+        FileSourceSplit split = createSplits(flatFile, 3).get(1);
+
+        int expectedCnt = 660000;
+
+        innerTestRestore(format, split, expectedCnt, 656700330000L);
+    }
+
+    @Test
+    void testReadFileAndRestoreWithFilter() throws IOException {
+        List<OrcFilters.Predicate> filter =
+                Collections.singletonList(
+                        new OrcFilters.Or(
+                                new OrcFilters.Between(
+                                        "_col0", PredicateLeaf.Type.LONG, 0L, 975000L),
+                                new OrcFilters.Equals("_col0", PredicateLeaf.Type.LONG, 980001L),
+                                new OrcFilters.Between(
+                                        "_col0", PredicateLeaf.Type.LONG, 990000L, 1800000L)));
+        OrcReaderFactory format = createFormat(FLAT_FILE_TYPE, new int[] {0, 1}, filter);
+
+        // pick a middle split
+        FileSourceSplit split = createSplits(flatFile, 1).get(0);
+
+        int expectedCnt = 1795000;
+        long expectedTotalF0 = 1615113397500L;
+
+        innerTestRestore(format, split, expectedCnt, expectedTotalF0);
+    }
+
+    private void innerTestRestore(
+            OrcReaderFactory format, FileSourceSplit split, int expectedCnt, long expectedTotalF0)
+            throws IOException {
+        AtomicInteger cnt = new AtomicInteger(0);
+        AtomicLong totalF0 = new AtomicLong(0);
+
+        Consumer<RowData> consumer =
+                row -> {
+                    assertThat(row.isNullAt(0)).isFalse();
+                    assertThat(row.isNullAt(1)).isFalse();
+                    totalF0.addAndGet(row.getInt(0));
+                    assertThat(row.getString(1).toString()).isNotNull();
+                    cnt.incrementAndGet();
+                };
+
+        Utils.forEachRemaining(createReader(format, split), consumer);
+
+        // check that all rows have been read
+        assertThat(cnt.get()).isEqualTo(expectedCnt);
+        assertThat(totalF0.get()).isEqualTo(expectedTotalF0);
+    }
+
+    protected OrcReaderFactory createFormat(RowType formatType, int[] selectedFields) {
+        return createFormat(formatType, selectedFields, new ArrayList<>());
+    }
+
+    protected OrcReaderFactory createFormat(
+            RowType formatType,
+            int[] selectedFields,
+            List<OrcFilters.Predicate> conjunctPredicates) {
+        return new OrcReaderFactory(
+                new Configuration(), formatType, selectedFields, conjunctPredicates, BATCH_SIZE);
+    }
+
+    private BulkFormat.Reader<RowData> createReader(OrcReaderFactory format, FileSourceSplit split)
+            throws IOException {
+        return format.createReader(new org.apache.flink.configuration.Configuration(), split);
+    }
+
+    private void forEach(OrcReaderFactory format, FileSourceSplit split, Consumer<RowData> action)
+            throws IOException {
+        Utils.forEachRemaining(createReader(format, split), action);
+    }
+
+    static Path copyFileFromResource(String resourceName, java.nio.file.Path file) {
+        try (InputStream resource =
+                OrcReaderFactoryTest.class
+                        .getClassLoader()
+                        .getResource(resourceName)
+                        .openStream()) {
+            Files.createDirectories(file.getParent());
+            Files.copy(resource, file);
+            return new Path(file.toString());
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static List<FileSourceSplit> createSplits(Path path, int minNumSplits)
+            throws IOException {
+        final List<FileSourceSplit> splits = new ArrayList<>(minNumSplits);
+        final FileStatus fileStatus = path.getFileSystem().getFileStatus(path);
+        final long len = fileStatus.getLen();
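+        // ceiling division: cover the whole file with at most minNumSplits roughly equal splits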
+        final long preferSplitSize = len / minNumSplits + (len % minNumSplits == 0 ? 0 : 1);
+        int splitNum = 0;
+        long position = 0;
+        while (position < len) {
+            long splitLen = Math.min(preferSplitSize, len - position);
+            splits.add(
+                    new FileSourceSplit(
+                            String.valueOf(splitNum++),
+                            path,
+                            position,
+                            splitLen,
+                            fileStatus.getModificationTime(),
+                            len));
+            position += splitLen;
+        }
+        return splits;
+    }
+}
diff --git a/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcSplitReaderUtilTest.java b/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcSplitReaderUtilTest.java
new file mode 100644
index 00000000..af8cd66a
--- /dev/null
+++ b/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcSplitReaderUtilTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.format.orc;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.store.format.orc.reader.OrcSplitReaderUtil;
+import org.apache.flink.table.types.DataType;
+
+import org.junit.jupiter.api.Test;
+
+import static org.apache.flink.table.store.format.orc.reader.OrcSplitReaderUtil.logicalTypeToOrcType;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link OrcSplitReaderUtil}. */
+class OrcSplitReaderUtilTest {
+
+    @Test
+    void testLogicalTypeToOrcType() {
+        test("boolean", DataTypes.BOOLEAN());
+        test("char(123)", DataTypes.CHAR(123));
+        test("varchar(123)", DataTypes.VARCHAR(123));
+        test("string", DataTypes.STRING());
+        test("binary", DataTypes.BYTES());
+        test("tinyint", DataTypes.TINYINT());
+        test("smallint", DataTypes.SMALLINT());
+        test("int", DataTypes.INT());
+        test("bigint", DataTypes.BIGINT());
+        test("float", DataTypes.FLOAT());
+        test("double", DataTypes.DOUBLE());
+        test("date", DataTypes.DATE());
+        test("timestamp", DataTypes.TIMESTAMP());
+        test("array<float>", DataTypes.ARRAY(DataTypes.FLOAT()));
+        test("map<float,bigint>", DataTypes.MAP(DataTypes.FLOAT(), DataTypes.BIGINT()));
+        test(
+                "struct<int0:int,str1:string,double2:double,row3:struct<int0:int,int1:int>>",
+                DataTypes.ROW(
+                        DataTypes.FIELD("int0", DataTypes.INT()),
+                        DataTypes.FIELD("str1", DataTypes.STRING()),
+                        DataTypes.FIELD("double2", DataTypes.DOUBLE()),
+                        DataTypes.FIELD(
+                                "row3",
+                                DataTypes.ROW(
+                                        DataTypes.FIELD("int0", DataTypes.INT()),
+                                        DataTypes.FIELD("int1", DataTypes.INT())))));
+        test("decimal(4,2)", DataTypes.DECIMAL(4, 2));
+    }
+
+    private void test(String expected, DataType type) {
+        assertThat(logicalTypeToOrcType(type.getLogicalType())).hasToString(expected);
+    }
+}
diff --git a/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcWriterFactoryTest.java b/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcWriterFactoryTest.java
new file mode 100644
index 00000000..e222e828
--- /dev/null
+++ b/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcWriterFactoryTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.format.orc;
+
+import org.apache.flink.core.fs.local.LocalDataOutputStream;
+import org.apache.flink.table.store.format.orc.writer.Vectorizer;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.orc.MemoryManager;
+import org.apache.orc.OrcFile;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests the behavior of {@link OrcWriterFactory}. */
+class OrcWriterFactoryTest {
+
+    @Test
+    void testNotOverrideInMemoryManager(@TempDir java.nio.file.Path tmpDir) throws IOException {
+        TestMemoryManager memoryManager = new TestMemoryManager();
+        OrcWriterFactory<Record> factory =
+                new TestOrcWriterFactory<>(
+                        new RecordVectorizer("struct<_col0:string,_col1:int>"), memoryManager);
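+        // each create() call should register its own writer path with the externally supplied memory manager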
+        factory.create(new LocalDataOutputStream(tmpDir.resolve("file1").toFile()));
+        factory.create(new LocalDataOutputStream(tmpDir.resolve("file2").toFile()));
+
+        List<Path> addedWriterPath = memoryManager.getAddedWriterPath();
+        assertThat(addedWriterPath).hasSize(2);
+        assertThat(addedWriterPath.get(1)).isNotEqualTo(addedWriterPath.get(0));
+    }
+
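+    /** {@link OrcWriterFactory} that plugs the given {@link MemoryManager} into its writer options. */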
+    private static class TestOrcWriterFactory<T> extends OrcWriterFactory<T> {
+
+        private final MemoryManager memoryManager;
+
+        public TestOrcWriterFactory(Vectorizer<T> vectorizer, MemoryManager memoryManager) {
+            super(vectorizer);
+            this.memoryManager = checkNotNull(memoryManager);
+        }
+
+        @Override
+        protected OrcFile.WriterOptions getWriterOptions() {
+            OrcFile.WriterOptions options = super.getWriterOptions();
+            options.memory(memoryManager);
+            return options;
+        }
+    }
+
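+    /** {@link MemoryManager} that records the path of every writer added to it. */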
+    private static class TestMemoryManager implements MemoryManager {
+        private final List<Path> addedWriterPath = new ArrayList<>();
+
+        @Override
+        public void addWriter(Path path, long requestedAllocation, Callback callback) {
+            addedWriterPath.add(path);
+        }
+
+        public List<Path> getAddedWriterPath() {
+            return addedWriterPath;
+        }
+
+        @Override
+        public void removeWriter(Path path) {}
+
+        @Override
+        public void addedRow(int rows) {}
+    }
+}
diff --git a/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/Record.java b/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/Record.java
new file mode 100644
index 00000000..6f1e7633
--- /dev/null
+++ b/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/Record.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.format.orc;
+
+import java.io.Serializable;
+
+/** A sample record type used by the ORC writer tests. */
+public class Record implements Serializable {
+    private final String name;
+    private final int age;
+
+    public Record(String name, int age) {
+        this.name = name;
+        this.age = age;
+    }
+
+    public String getName() {
+        return this.name;
+    }
+
+    public int getAge() {
+        return this.age;
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        if (this == other) {
+            return true;
+        }
+
+        if (!(other instanceof Record)) {
+            return false;
+        }
+
+        Record otherRecord = (Record) other;
+
+        return this.name.equals(otherRecord.getName()) && this.age == otherRecord.getAge();
+    }
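+
+    @Override
+    public int hashCode() {
+        // keep hashCode consistent with the equals implementation above (name and age)
+        return 31 * name.hashCode() + age;
+    }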
+}
diff --git a/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/RecordVectorizer.java b/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/RecordVectorizer.java
new file mode 100644
index 00000000..d2c0485d
--- /dev/null
+++ b/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/RecordVectorizer.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.format.orc;
+
+import org.apache.flink.table.store.format.orc.writer.Vectorizer;
+
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * A Vectorizer implementation used for tests.
+ *
+ * <p>It transforms an input element of type {@link Record} into a VectorizedRowBatch.
+ */
+public class RecordVectorizer extends Vectorizer<Record> implements Serializable {
+
+    public RecordVectorizer(String schema) {
+        super(schema);
+    }
+
+    @Override
+    public void vectorize(Record element, VectorizedRowBatch batch) throws IOException {
+        BytesColumnVector stringVector = (BytesColumnVector) batch.cols[0];
+        LongColumnVector intColVector = (LongColumnVector) batch.cols[1];
+
+        int row = batch.size++;
+
+        stringVector.setVal(row, element.getName().getBytes(StandardCharsets.UTF_8));
+        intColVector.vector[row] = element.getAge();
+
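+        // attach user metadata so that OrcBulkWriterTestUtil can verify it is written to the file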
+        this.addUserMetadata(
+                OrcBulkWriterTestUtil.USER_METADATA_KEY, OrcBulkWriterTestUtil.USER_METADATA_VALUE);
+    }
+}
diff --git a/flink-table-store-format/src/test/resources/test-data-decimal.orc b/flink-table-store-format/src/test/resources/test-data-decimal.orc
new file mode 100644
index 00000000..cb0f7b9d
Binary files /dev/null and b/flink-table-store-format/src/test/resources/test-data-decimal.orc differ
diff --git a/flink-table-store-format/src/test/resources/test-data-flat.orc b/flink-table-store-format/src/test/resources/test-data-flat.orc
new file mode 100644
index 00000000..db0ff15e
Binary files /dev/null and b/flink-table-store-format/src/test/resources/test-data-flat.orc differ
diff --git a/flink-table-store-hive/flink-table-store-hive-catalog/pom.xml b/flink-table-store-hive/flink-table-store-hive-catalog/pom.xml
index 3f9be225..0af1dcad 100644
--- a/flink-table-store-hive/flink-table-store-hive-catalog/pom.xml
+++ b/flink-table-store-hive/flink-table-store-hive-catalog/pom.xml
@@ -201,19 +201,6 @@ under the License.
             </exclusions>
         </dependency>
 
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-avro</artifactId>
-            <version>${flink.version}</version>
-            <scope>test</scope>
-            <exclusions>
-                <exclusion>
-                    <groupId>com.fasterxml.jackson.core</groupId>
-                    <artifactId>jackson-databind</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-
         <dependency>
             <groupId>com.klarna</groupId>
             <artifactId>hiverunner</artifactId>
diff --git a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkS3ITCase.java b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkS3ITCase.java
index 88af7f2e..6c13899d 100644
--- a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkS3ITCase.java
+++ b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkS3ITCase.java
@@ -22,17 +22,22 @@ import org.apache.flink.core.fs.Path;
 
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
-import org.junit.AfterClass;
+import org.junit.After;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.List;
 import java.util.UUID;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** ITCase for using S3 in Spark. */
+@RunWith(Parameterized.class)
 public class SparkS3ITCase {
 
     @ClassRule public static final MinioTestContainer MINIO_CONTAINER = new MinioTestContainer();
@@ -54,16 +59,29 @@ public class SparkS3ITCase {
         spark.sql("USE tablestore.db");
     }
 
-    @AfterClass
-    public static void afterEach() {
+    @Parameterized.Parameters(name = "{0}")
+    public static Collection<String> parameters() {
+        return Arrays.asList("avro", "orc", "parquet");
+    }
+
+    private final String format;
+
+    public SparkS3ITCase(String format) {
+        this.format = format;
+    }
+
+    @After
+    public void afterEach() {
         spark.sql("DROP TABLE T");
     }
 
     @Test
     public void testWriteRead() {
         spark.sql(
-                "CREATE TABLE T (a INT, b INT, c STRING) TBLPROPERTIES"
-                        + " ('primary-key'='a', 'bucket'='4', 'file.format'='avro')");
+                String.format(
+                        "CREATE TABLE T (a INT, b INT, c STRING) TBLPROPERTIES"
+                                + " ('primary-key'='a', 'bucket'='4', 'file.format'='%s')",
+                        format));
         spark.sql("INSERT INTO T VALUES (1, 2, '3')").collectAsList();
         List<Row> rows = spark.sql("SELECT * FROM T").collectAsList();
         assertThat(rows.toString()).isEqualTo("[[1,2,3]]");
diff --git a/pom.xml b/pom.xml
index 2d51715d..28a13c1b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -120,13 +120,9 @@ under the License.
         <flink.table.api.java.bridge>flink-table-api-java-bridge</flink.table.api.java.bridge>
         <flink.table.runtime>flink-table-runtime</flink.table.runtime>
         <flink.streaming.java>flink-streaming-java</flink.streaming.java>
-        <flink.sql.orc>flink-sql-orc</flink.sql.orc>
-        <flink.orc>flink-orc</flink.orc>
-        <flink.parquet>flink-parquet</flink.parquet>
         <flink.connector.kafka>flink-connector-kafka</flink.connector.kafka>
         <flink.sql.connector.kafka>flink-sql-connector-kafka</flink.sql.connector.kafka>
         <flink.test.utils>flink-test-utils</flink.test.utils>
-        <flink.sql.parquet>flink-sql-parquet</flink.sql.parquet>
         <janino.version>3.0.11</janino.version>
         <mockito.version>3.4.6</mockito.version>
     </properties>
@@ -315,13 +311,9 @@ under the License.
                 <flink.table.api.java.bridge>flink-table-api-java-bridge_${scala.binary.version}</flink.table.api.java.bridge>
                 <flink.table.runtime>flink-table-runtime_${scala.binary.version}</flink.table.runtime>
                 <flink.streaming.java>flink-streaming-java_${scala.binary.version}</flink.streaming.java>
-                <flink.sql.orc>flink-sql-orc_${scala.binary.version}</flink.sql.orc>
-                <flink.orc>flink-orc_${scala.binary.version}</flink.orc>
-                <flink.parquet>flink-parquet_${scala.binary.version}</flink.parquet>
                 <flink.connector.kafka>flink-connector-kafka_${scala.binary.version}</flink.connector.kafka>
                 <flink.sql.connector.kafka>flink-sql-connector-kafka_${scala.binary.version}</flink.sql.connector.kafka>
                 <flink.test.utils>flink-test-utils_${scala.binary.version}</flink.test.utils>
-                <flink.sql.parquet>flink-sql-parquet_${scala.binary.version}</flink.sql.parquet>
             </properties>
         </profile>