Posted to commits@flink.apache.org by lz...@apache.org on 2022/03/23 06:23:15 UTC

[flink-table-store] branch master updated: [FLINK-26703] Introduce flink-table-store-dist module for packaging

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 4a2d004  [FLINK-26703] Introduce flink-table-store-dist module for packaging
4a2d004 is described below

commit 4a2d004c960694481a91ea02f2f0fecc36335e94
Author: tsreaper <ts...@gmail.com>
AuthorDate: Wed Mar 23 14:23:11 2022 +0800

    [FLINK-26703] Introduce flink-table-store-dist module for packaging
    
    This closes #50
---
 .../flink/table/store/file/format/FileFormat.java  |  97 ++++++++-
 .../table/store/file/format/FileFormatFactory.java |   2 +-
 .../table/store/file/format/FileFormatImpl.java    |  87 +--------
 .../file/format/FileStatsExtractingAvroFormat.java |  27 +--
 .../FileStatsExtractingAvroFormatFactory.java      |   2 +-
 .../store/file/format/FlushingFileFormat.java      |  20 +-
 flink-table-store-dist/pom.xml                     | 217 +++++++++++++++++++++
 .../pom.xml                                        |  11 +-
 .../table/store/format/avro/AvroFileFormat.java    |  48 +++++
 .../store/format/avro/AvroFileFormatFactory.java   |  12 +-
 .../table/store/format}/orc/OrcFileFormat.java     |  28 ++-
 .../store/format}/orc/OrcFileFormatFactory.java    |   6 +-
 .../store/format}/orc/OrcFileStatsExtractor.java   |   2 +-
 ...flink.table.store.file.format.FileFormatFactory |   3 +-
 .../format}/orc/OrcFileStatsExtractorTest.java     |   2 +-
 pom.xml                                            |   5 +-
 16 files changed, 412 insertions(+), 157 deletions(-)

diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/format/FileFormat.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/format/FileFormat.java
index a403ac2..fde866a 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/format/FileFormat.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/format/FileFormat.java
@@ -19,25 +19,41 @@
 package org.apache.flink.table.store.file.format;
 
 import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DelegatingConfiguration;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.connector.file.src.FileSourceSplit;
 import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.connector.file.table.format.BulkDecodingFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.format.ProjectableDecodingFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.expressions.ResolvedExpression;
 import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.store.file.stats.FileStatsExtractor;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Preconditions;
 
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
 import java.util.ServiceLoader;
 
+import static org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType;
+
 /** Factory class which creates reader and writer factories for specific file format. */
-public interface FileFormat {
+public abstract class FileFormat {
+
+    protected abstract BulkDecodingFormat<RowData> getDecodingFormat();
+
+    protected abstract EncodingFormat<BulkWriter.Factory<RowData>> getEncodingFormat();
 
     /**
      * Create a {@link BulkFormat} from the type, with projection pushed down.
@@ -46,13 +62,27 @@ public interface FileFormat {
      * @param projection See {@link org.apache.flink.table.connector.Projection#toNestedIndexes()}.
      * @param filters A list of filters in conjunctive form for filtering on a best-effort basis.
      */
-    BulkFormat<RowData, FileSourceSplit> createReaderFactory(
-            RowType type, int[][] projection, List<ResolvedExpression> filters);
+    @SuppressWarnings("unchecked")
+    public BulkFormat<RowData, FileSourceSplit> createReaderFactory(
+            RowType type, int[][] projection, List<ResolvedExpression> filters) {
+        BulkDecodingFormat<RowData> decodingFormat = getDecodingFormat();
+        // TODO use ProjectingBulkFormat if not supported
+        Preconditions.checkState(
+                decodingFormat instanceof ProjectableDecodingFormat,
+                "Format "
+                        + decodingFormat.getClass().getName()
+                        + " does not support projection push down");
+        decodingFormat.applyFilters(filters);
+        return ((ProjectableDecodingFormat<BulkFormat<RowData, FileSourceSplit>>) decodingFormat)
+                .createRuntimeDecoder(SOURCE_CONTEXT, fromLogicalToDataType(type), projection);
+    }
 
     /** Create a {@link BulkWriter.Factory} from the type. */
-    BulkWriter.Factory<RowData> createWriterFactory(RowType type);
+    public BulkWriter.Factory<RowData> createWriterFactory(RowType type) {
+        return getEncodingFormat().createRuntimeEncoder(SINK_CONTEXT, fromLogicalToDataType(type));
+    }
 
-    default BulkFormat<RowData, FileSourceSplit> createReaderFactory(RowType rowType) {
+    public BulkFormat<RowData, FileSourceSplit> createReaderFactory(RowType rowType) {
         int[][] projection = new int[rowType.getFieldCount()][];
         for (int i = 0; i < projection.length; i++) {
             projection[i] = new int[] {i};
@@ -60,17 +90,17 @@ public interface FileFormat {
         return createReaderFactory(rowType, projection);
     }
 
-    default BulkFormat<RowData, FileSourceSplit> createReaderFactory(
+    public BulkFormat<RowData, FileSourceSplit> createReaderFactory(
             RowType rowType, int[][] projection) {
         return createReaderFactory(rowType, projection, new ArrayList<>());
     }
 
-    default Optional<FileStatsExtractor> createStatsExtractor(RowType type) {
+    public Optional<FileStatsExtractor> createStatsExtractor(RowType type) {
         return Optional.empty();
     }
 
     /** Create a {@link FileFormatImpl} from table options. */
-    static FileFormat fromTableOptions(
+    public static FileFormat fromTableOptions(
             ClassLoader classLoader,
             Configuration tableOptions,
             ConfigOption<String> formatOption) {
@@ -81,15 +111,62 @@ public interface FileFormat {
     }
 
     /** Create a {@link FileFormatImpl} from format identifier and format options. */
-    static FileFormat fromIdentifier(
+    public static FileFormat fromIdentifier(
             ClassLoader classLoader, String formatIdentifier, ReadableConfig formatOptions) {
         ServiceLoader<FileFormatFactory> serviceLoader =
                 ServiceLoader.load(FileFormatFactory.class);
         for (FileFormatFactory factory : serviceLoader) {
             if (factory.identifier().equals(formatIdentifier.toLowerCase())) {
-                return factory.create(classLoader, formatOptions);
+                return factory.create(formatOptions);
             }
         }
         return new FileFormatImpl(classLoader, formatIdentifier, formatOptions);
     }
+
+    private static final DynamicTableSink.Context SINK_CONTEXT =
+            new DynamicTableSink.Context() {
+
+                @Override
+                public boolean isBounded() {
+                    return false;
+                }
+
+                @Override
+                public <T> TypeInformation<T> createTypeInformation(DataType consumedDataType) {
+                    return InternalTypeInfo.of(consumedDataType.getLogicalType());
+                }
+
+                @Override
+                public <T> TypeInformation<T> createTypeInformation(
+                        LogicalType consumedLogicalType) {
+                    return InternalTypeInfo.of(consumedLogicalType);
+                }
+
+                @Override
+                public DynamicTableSink.DataStructureConverter createDataStructureConverter(
+                        DataType consumedDataType) {
+                    throw new UnsupportedOperationException();
+                }
+            };
+
+    private static final DynamicTableSource.Context SOURCE_CONTEXT =
+            new DynamicTableSource.Context() {
+
+                @Override
+                public <T> TypeInformation<T> createTypeInformation(DataType consumedDataType) {
+                    return InternalTypeInfo.of(consumedDataType.getLogicalType());
+                }
+
+                @Override
+                public <T> TypeInformation<T> createTypeInformation(
+                        LogicalType producedLogicalType) {
+                    return InternalTypeInfo.of(producedLogicalType);
+                }
+
+                @Override
+                public DynamicTableSource.DataStructureConverter createDataStructureConverter(
+                        DataType consumedDataType) {
+                    throw new UnsupportedOperationException();
+                }
+            };
 }
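
The refactor above turns FileFormat from an interface into an abstract class built on the
template method pattern: subclasses supply getDecodingFormat() and getEncodingFormat(),
and the base class derives reader and writer factories from them. A minimal usage sketch
of the resulting public API (hypothetical caller code, not part of this commit; it
assumes a RowType is already at hand):

    import org.apache.flink.api.common.serialization.BulkWriter;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.connector.file.src.FileSourceSplit;
    import org.apache.flink.connector.file.src.reader.BulkFormat;
    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.store.file.format.FileFormat;
    import org.apache.flink.table.types.logical.RowType;

    public class FileFormatUsageSketch {
        public static void example(RowType rowType) {
            // Discover a format by identifier; falls back to FileFormatImpl
            // when no FileFormatFactory matches the identifier.
            FileFormat format =
                    FileFormat.fromIdentifier(
                            FileFormatUsageSketch.class.getClassLoader(),
                            "avro",
                            new Configuration());

            // Writer side: encodes RowData into files of the chosen format.
            BulkWriter.Factory<RowData> writerFactory = format.createWriterFactory(rowType);

            // Reader side: the full-row projection is derived automatically.
            BulkFormat<RowData, FileSourceSplit> readerFactory =
                    format.createReaderFactory(rowType);
        }
    }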
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/format/FileFormatFactory.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/format/FileFormatFactory.java
index 58204df..f103d2d 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/format/FileFormatFactory.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/format/FileFormatFactory.java
@@ -25,5 +25,5 @@ public interface FileFormatFactory {
 
     String identifier();
 
-    FileFormat create(ClassLoader classLoader, ReadableConfig formatOptions);
+    FileFormat create(ReadableConfig formatOptions);
 }
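
The simplified factory interface drops the ClassLoader parameter, since concrete formats
now reference their Flink format factories directly. Under the new signature, a
third-party factory would look roughly like this (MyFileFormatFactory and MyFileFormat
are illustrative names, not part of this commit):

    import org.apache.flink.configuration.ReadableConfig;
    import org.apache.flink.table.store.file.format.FileFormat;
    import org.apache.flink.table.store.file.format.FileFormatFactory;

    /** Illustrative custom factory under the new one-argument create(). */
    public class MyFileFormatFactory implements FileFormatFactory {

        @Override
        public String identifier() {
            // FileFormat.fromIdentifier lowercases the requested identifier
            // before matching, so identifiers should be lowercase.
            return "myformat";
        }

        @Override
        public FileFormat create(ReadableConfig formatOptions) {
            // MyFileFormat: hypothetical FileFormat subclass implementing
            // getDecodingFormat() and getEncodingFormat().
            return new MyFileFormat(formatOptions);
        }
    }

For ServiceLoader discovery the class name must also be listed in
META-INF/services/org.apache.flink.table.store.file.format.FileFormatFactory, as the
renamed flink-table-store-format module does further below.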
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/format/FileFormatImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/format/FileFormatImpl.java
index 1846e04..0f8777c 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/format/FileFormatImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/format/FileFormatImpl.java
@@ -19,31 +19,16 @@
 package org.apache.flink.table.store.file.format;
 
 import org.apache.flink.api.common.serialization.BulkWriter;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.ReadableConfig;
-import org.apache.flink.connector.file.src.FileSourceSplit;
-import org.apache.flink.connector.file.src.reader.BulkFormat;
 import org.apache.flink.connector.file.table.factories.BulkReaderFormatFactory;
 import org.apache.flink.connector.file.table.factories.BulkWriterFormatFactory;
 import org.apache.flink.connector.file.table.format.BulkDecodingFormat;
-import org.apache.flink.table.connector.format.ProjectableDecodingFormat;
-import org.apache.flink.table.connector.sink.DynamicTableSink;
-import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.format.EncodingFormat;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.expressions.ResolvedExpression;
 import org.apache.flink.table.factories.FactoryUtil;
-import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
-import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.util.Preconditions;
-
-import java.util.List;
-
-import static org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType;
 
 /** A {@link FileFormat} which discovers reader and writer from format identifier. */
-public class FileFormatImpl implements FileFormat {
+public class FileFormatImpl extends FileFormat {
 
     private final ClassLoader classLoader;
     private final String formatIdentifier;
@@ -56,78 +41,16 @@ public class FileFormatImpl implements FileFormat {
         this.formatOptions = formatOptions;
     }
 
-    @SuppressWarnings("unchecked")
-    @Override
-    public BulkFormat<RowData, FileSourceSplit> createReaderFactory(
-            RowType rowType, int[][] projection, List<ResolvedExpression> filters) {
-        BulkDecodingFormat<RowData> decodingFormat = getDecodingFormat();
-        // TODO use ProjectingBulkFormat if not supported
-        Preconditions.checkState(
-                decodingFormat instanceof ProjectableDecodingFormat,
-                "Format " + formatIdentifier + " does not support projection push down");
-        decodingFormat.applyFilters(filters);
-        return ((ProjectableDecodingFormat<BulkFormat<RowData, FileSourceSplit>>) decodingFormat)
-                .createRuntimeDecoder(SOURCE_CONTEXT, fromLogicalToDataType(rowType), projection);
-    }
-
-    private BulkDecodingFormat<RowData> getDecodingFormat() {
+    protected BulkDecodingFormat<RowData> getDecodingFormat() {
         return FactoryUtil.discoverFactory(
                         classLoader, BulkReaderFormatFactory.class, formatIdentifier)
                 .createDecodingFormat(null, formatOptions); // context is useless
     }
 
     @Override
-    public BulkWriter.Factory<RowData> createWriterFactory(RowType rowType) {
+    protected EncodingFormat<BulkWriter.Factory<RowData>> getEncodingFormat() {
         return FactoryUtil.discoverFactory(
                         classLoader, BulkWriterFormatFactory.class, formatIdentifier)
-                .createEncodingFormat(null, formatOptions) // context is useless
-                .createRuntimeEncoder(SINK_CONTEXT, fromLogicalToDataType(rowType));
+                .createEncodingFormat(null, formatOptions); // context is useless
     }
-
-    private static final DynamicTableSink.Context SINK_CONTEXT =
-            new DynamicTableSink.Context() {
-
-                @Override
-                public boolean isBounded() {
-                    return false;
-                }
-
-                @Override
-                public <T> TypeInformation<T> createTypeInformation(DataType consumedDataType) {
-                    return InternalTypeInfo.of(consumedDataType.getLogicalType());
-                }
-
-                @Override
-                public <T> TypeInformation<T> createTypeInformation(
-                        LogicalType consumedLogicalType) {
-                    return InternalTypeInfo.of(consumedLogicalType);
-                }
-
-                @Override
-                public DynamicTableSink.DataStructureConverter createDataStructureConverter(
-                        DataType consumedDataType) {
-                    throw new UnsupportedOperationException();
-                }
-            };
-
-    private static final DynamicTableSource.Context SOURCE_CONTEXT =
-            new DynamicTableSource.Context() {
-
-                @Override
-                public <T> TypeInformation<T> createTypeInformation(DataType consumedDataType) {
-                    return InternalTypeInfo.of(consumedDataType.getLogicalType());
-                }
-
-                @Override
-                public <T> TypeInformation<T> createTypeInformation(
-                        LogicalType producedLogicalType) {
-                    return InternalTypeInfo.of(producedLogicalType);
-                }
-
-                @Override
-                public DynamicTableSource.DataStructureConverter createDataStructureConverter(
-                        DataType consumedDataType) {
-                    throw new UnsupportedOperationException();
-                }
-            };
 }
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FileStatsExtractingAvroFormat.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FileStatsExtractingAvroFormat.java
index 77e9807..0bbec2e 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FileStatsExtractingAvroFormat.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FileStatsExtractingAvroFormat.java
@@ -18,41 +18,22 @@
 
 package org.apache.flink.table.store.file.format;
 
-import org.apache.flink.api.common.serialization.BulkWriter;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.connector.file.src.FileSourceSplit;
-import org.apache.flink.connector.file.src.reader.BulkFormat;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.expressions.ResolvedExpression;
 import org.apache.flink.table.store.file.stats.FileStatsExtractor;
 import org.apache.flink.table.store.file.stats.TestFileStatsExtractor;
 import org.apache.flink.table.types.logical.RowType;
 
-import java.util.List;
 import java.util.Optional;
 
 /** An avro {@link FileFormat} for test. It provides a {@link FileStatsExtractor}. */
-public class FileStatsExtractingAvroFormat implements FileFormat {
+public class FileStatsExtractingAvroFormat extends FileFormatImpl {
 
-    private final FileFormat avro =
-            FileFormat.fromIdentifier(
-                    FileStatsExtractingAvroFormat.class.getClassLoader(),
-                    "avro",
-                    new Configuration());
-
-    @Override
-    public BulkFormat<RowData, FileSourceSplit> createReaderFactory(
-            RowType type, int[][] projection, List<ResolvedExpression> filters) {
-        return avro.createReaderFactory(type, projection, filters);
-    }
-
-    @Override
-    public BulkWriter.Factory<RowData> createWriterFactory(RowType type) {
-        return avro.createWriterFactory(type);
+    public FileStatsExtractingAvroFormat() {
+        super(FileStatsExtractingAvroFormat.class.getClassLoader(), "avro", new Configuration());
     }
 
     @Override
     public Optional<FileStatsExtractor> createStatsExtractor(RowType type) {
-        return Optional.of(new TestFileStatsExtractor(avro, type));
+        return Optional.of(new TestFileStatsExtractor(this, type));
     }
 }
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FileStatsExtractingAvroFormatFactory.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FileStatsExtractingAvroFormatFactory.java
index 1b8fdef..656fb0b 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FileStatsExtractingAvroFormatFactory.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FileStatsExtractingAvroFormatFactory.java
@@ -29,7 +29,7 @@ public class FileStatsExtractingAvroFormatFactory implements FileFormatFactory {
     }
 
     @Override
-    public FileFormat create(ClassLoader classLoader, ReadableConfig formatOptions) {
+    public FileFormat create(ReadableConfig formatOptions) {
         return new FileStatsExtractingAvroFormat();
     }
 }
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FlushingFileFormat.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FlushingFileFormat.java
index fb8f90c..2eebb6f 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FlushingFileFormat.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FlushingFileFormat.java
@@ -20,17 +20,15 @@ package org.apache.flink.table.store.file.format;
 
 import org.apache.flink.api.common.serialization.BulkWriter;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.connector.file.src.FileSourceSplit;
-import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.connector.file.table.format.BulkDecodingFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.expressions.ResolvedExpression;
 import org.apache.flink.table.types.logical.RowType;
 
 import java.io.IOException;
-import java.util.List;
 
 /** A special {@link FileFormat} which flushes for every added element. */
-public class FlushingFileFormat implements FileFormat {
+public class FlushingFileFormat extends FileFormat {
 
     private final FileFormat format;
 
@@ -41,16 +39,20 @@ public class FlushingFileFormat implements FileFormat {
     }
 
     @Override
-    public BulkFormat<RowData, FileSourceSplit> createReaderFactory(
-            RowType type, int[][] projection, List<ResolvedExpression> filters) {
-        return format.createReaderFactory(type, projection, filters);
+    protected BulkDecodingFormat<RowData> getDecodingFormat() {
+        return format.getDecodingFormat();
+    }
+
+    @Override
+    protected EncodingFormat<BulkWriter.Factory<RowData>> getEncodingFormat() {
+        return format.getEncodingFormat();
     }
 
     @Override
     public BulkWriter.Factory<RowData> createWriterFactory(RowType type) {
         return fsDataOutputStream -> {
             BulkWriter<RowData> wrapped =
-                    format.createWriterFactory(type).create(fsDataOutputStream);
+                    super.createWriterFactory(type).create(fsDataOutputStream);
             return new BulkWriter<RowData>() {
                 @Override
                 public void addElement(RowData rowData) throws IOException {
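
FlushingFileFormat wraps the writer produced by the base class so that every addElement
is followed by a flush. A self-contained sketch of that decorator idea, assuming the
standard three-method BulkWriter contract (illustrative, not the commit's exact code):

    import org.apache.flink.api.common.serialization.BulkWriter;
    import org.apache.flink.table.data.RowData;

    import java.io.IOException;

    /**
     * Illustrative decorator: forces a flush after each element, trading
     * throughput for immediately visible output (useful in tests).
     */
    public class FlushingBulkWriter implements BulkWriter<RowData> {

        private final BulkWriter<RowData> wrapped;

        public FlushingBulkWriter(BulkWriter<RowData> wrapped) {
            this.wrapped = wrapped;
        }

        @Override
        public void addElement(RowData rowData) throws IOException {
            wrapped.addElement(rowData);
            wrapped.flush(); // flush per element, as FlushingFileFormat does
        }

        @Override
        public void flush() throws IOException {
            wrapped.flush();
        }

        @Override
        public void finish() throws IOException {
            wrapped.finish();
        }
    }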
diff --git a/flink-table-store-dist/pom.xml b/flink-table-store-dist/pom.xml
new file mode 100644
index 0000000..c8837ca
--- /dev/null
+++ b/flink-table-store-dist/pom.xml
@@ -0,0 +1,217 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <artifactId>flink-table-store-parent</artifactId>
+        <groupId>org.apache.flink</groupId>
+        <version>0.1-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>flink-table-store-dist</artifactId>
+    <name>Flink Table Store : Dist</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-store-codegen-loader</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-store-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-store-connector</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-store-core</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-store-format</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-store-kafka</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <!-- used flink connectors -->
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-files</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-sql-connector-kafka</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+
+        <!-- default formats -->
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-sql-avro</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-sql-orc</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>shade-flink</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <artifactSet>
+                                <includes combine.children="append">
+                                    <include>org.apache.flink:flink-table-store-codegen-loader</include>
+                                    <include>org.apache.flink:flink-table-store-common</include>
+                                    <include>org.apache.flink:flink-table-store-connector</include>
+                                    <include>org.apache.flink:flink-table-store-core</include>
+                                    <include>org.apache.flink:flink-table-store-format</include>
+                                    <include>org.apache.flink:flink-table-store-kafka</include>
+                                    <include>org.apache.flink:flink-connector-files</include>
+                                    <include>org.apache.flink:flink-sql-connector-kafka</include>
+                                    <include>org.apache.flink:flink-sql-avro</include>
+                                    <include>org.apache.flink:flink-sql-orc</include>
+                                </includes>
+                            </artifactSet>
+                            <filters>
+                                <!--
+                                Throw away all META-INF/services,
+                                otherwise if user has the same format/connector jar in the classpath,
+                                FactoryUtil will complain about multiple matching factories.
+                                -->
+                                <filter>
+                                    <artifact>org.apache.flink:flink-connector-files</artifact>
+                                    <excludes>
+                                        <exclude>META-INF/services/**</exclude>
+                                    </excludes>
+                                </filter>
+                                <filter>
+                                    <artifact>org.apache.flink:flink-sql-connector-kafka</artifact>
+                                    <excludes>
+                                        <exclude>META-INF/services/**</exclude>
+                                    </excludes>
+                                </filter>
+                                <filter>
+                                    <artifact>org.apache.flink:flink-sql-avro</artifact>
+                                    <excludes>
+                                        <exclude>META-INF/services/**</exclude>
+                                    </excludes>
+                                </filter>
+                                <filter>
+                                    <artifact>org.apache.flink:flink-sql-orc</artifact>
+                                    <excludes>
+                                        <exclude>META-INF/services/**</exclude>
+                                    </excludes>
+                                </filter>
+                                <!-- Another copy of the Apache license, which we don't need. -->
+                                <filter>
+                                    <artifact>*</artifact>
+                                    <excludes>
+                                        <exclude>META-INF/LICENSE.txt</exclude>
+                                    </excludes>
+                                </filter>
+                            </filters>
+                            <relocations>
+                                <relocation>
+                                    <pattern>org.apache.flink.connector</pattern>
+                                    <shadedPattern>org.apache.flink.table.store.shaded.org.apache.flink.connector</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>org.apache.flink.kafka</pattern>
+                                    <shadedPattern>org.apache.flink.table.store.shaded.org.apache.flink.kafka</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>org.apache.flink.avro</pattern>
+                                    <shadedPattern>org.apache.flink.table.store.shaded.org.apache.flink.avro</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>org.apache.flink.orc</pattern>
+                                    <shadedPattern>org.apache.flink.table.store.shaded.org.apache.flink.orc</shadedPattern>
+                                </relocation>
+                                <!--
+                                flink-sql-orc module does not shade its dependencies, so we shade here.
+                                See maven-shade-plugin usage of flink-sql-orc for detailed dependency list.
+                                -->
+                                <relocation>
+                                    <pattern>org.apache.orc</pattern>
+                                    <shadedPattern>org.apache.flink.table.store.shaded.org.apache.orc</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>org.apache.hive</pattern>
+                                    <shadedPattern>org.apache.flink.table.store.shaded.org.apache.hive</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>org.apache.hadoop.hive</pattern>
+                                    <shadedPattern>org.apache.flink.table.store.shaded.org.apache.hadoop.hive</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>io.airlift</pattern>
+                                    <shadedPattern>org.apache.flink.table.store.shaded.io.airlift</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>org.apache.commons</pattern>
+                                    <shadedPattern>org.apache.flink.table.store.shaded.org.apache.commons</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>com.google.protobuf</pattern>
+                                    <shadedPattern>org.apache.flink.table.store.shaded.com.google.protobuf</shadedPattern>
+                                </relocation>
+                            </relocations>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file
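
The shade configuration above relocates all bundled Flink connector and format classes
under org.apache.flink.table.store.shaded so they cannot clash with copies already on a
user's classpath, while the table store's own classes keep their names. A hypothetical
smoke test against the packaged dist jar (class names chosen for illustration;
org.apache.orc.OrcFile ships with the ORC dependency relocated above):

    /** Illustrative check that relocation took effect in the packaged dist jar. */
    public class RelocationSmokeTest {
        public static void main(String[] args) throws ClassNotFoundException {
            // Bundled ORC classes should resolve only under the shaded prefix.
            Class.forName("org.apache.flink.table.store.shaded.org.apache.orc.OrcFile");
            // Table store classes are not relocated and keep their names.
            Class.forName("org.apache.flink.table.store.file.format.FileFormat");
            System.out.println("relocation looks good");
        }
    }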
diff --git a/flink-table-store-orc/pom.xml b/flink-table-store-format/pom.xml
similarity index 92%
rename from flink-table-store-orc/pom.xml
rename to flink-table-store-format/pom.xml
index a7a6da8..88535e2 100644
--- a/flink-table-store-orc/pom.xml
+++ b/flink-table-store-format/pom.xml
@@ -28,8 +28,8 @@ under the License.
         <version>0.1-SNAPSHOT</version>
     </parent>
 
-    <artifactId>flink-table-store-orc</artifactId>
-    <name>Flink Table Store : Orc</name>
+    <artifactId>flink-table-store-format</artifactId>
+    <name>Flink Table Store : Format</name>
 
     <dependencies>
         <dependency>
@@ -57,6 +57,13 @@ under the License.
 
         <dependency>
             <groupId>org.apache.flink</groupId>
+            <artifactId>flink-avro</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
             <artifactId>flink-orc</artifactId>
             <version>${flink.version}</version>
             <scope>provided</scope>
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/AvroFileFormat.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/AvroFileFormat.java
new file mode 100644
index 0000000..10a6fa9
--- /dev/null
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/AvroFileFormat.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.format.avro;
+
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connector.file.table.format.BulkDecodingFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.format.FileFormat;
+
+/** Avro {@link FileFormat}. */
+public class AvroFileFormat extends FileFormat {
+
+    private final ReadableConfig formatOptions;
+
+    public AvroFileFormat(ReadableConfig formatOptions) {
+        this.formatOptions = formatOptions;
+    }
+
+    @Override
+    protected BulkDecodingFormat<RowData> getDecodingFormat() {
+        return new org.apache.flink.formats.avro.AvroFileFormatFactory()
+                .createDecodingFormat(null, formatOptions); // context is useless
+    }
+
+    @Override
+    protected EncodingFormat<BulkWriter.Factory<RowData>> getEncodingFormat() {
+        return new org.apache.flink.formats.avro.AvroFileFormatFactory()
+                .createEncodingFormat(null, formatOptions); // context is useless
+    }
+}
diff --git a/flink-table-store-orc/src/main/java/org/apache/flink/table/store/orc/OrcFileFormatFactory.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/AvroFileFormatFactory.java
similarity index 76%
copy from flink-table-store-orc/src/main/java/org/apache/flink/table/store/orc/OrcFileFormatFactory.java
copy to flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/AvroFileFormatFactory.java
index c31dd10..63a0e45 100644
--- a/flink-table-store-orc/src/main/java/org/apache/flink/table/store/orc/OrcFileFormatFactory.java
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/AvroFileFormatFactory.java
@@ -16,22 +16,22 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.store.orc;
+package org.apache.flink.table.store.format.avro;
 
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.table.store.file.format.FileFormat;
 import org.apache.flink.table.store.file.format.FileFormatFactory;
 
-/** Factory to create {@link OrcFileFormat}. */
-public class OrcFileFormatFactory implements FileFormatFactory {
+/** Factory to create {@link AvroFileFormat}. */
+public class AvroFileFormatFactory implements FileFormatFactory {
 
     @Override
     public String identifier() {
-        return "orc";
+        return "avro";
     }
 
     @Override
-    public FileFormat create(ClassLoader classLoader, ReadableConfig formatOptions) {
-        return new OrcFileFormat(classLoader, formatOptions);
+    public FileFormat create(ReadableConfig formatOptions) {
+        return new AvroFileFormat(formatOptions);
     }
 }
diff --git a/flink-table-store-orc/src/main/java/org/apache/flink/table/store/orc/OrcFileFormat.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileFormat.java
similarity index 60%
rename from flink-table-store-orc/src/main/java/org/apache/flink/table/store/orc/OrcFileFormat.java
rename to flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileFormat.java
index 0087cc3..ddb6a97 100644
--- a/flink-table-store-orc/src/main/java/org/apache/flink/table/store/orc/OrcFileFormat.java
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileFormat.java
@@ -16,40 +16,38 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.store.orc;
+package org.apache.flink.table.store.format.orc;
 
 import org.apache.flink.api.common.serialization.BulkWriter;
 import org.apache.flink.configuration.ReadableConfig;
-import org.apache.flink.connector.file.src.FileSourceSplit;
-import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.connector.file.table.format.BulkDecodingFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.expressions.ResolvedExpression;
 import org.apache.flink.table.store.file.format.FileFormat;
-import org.apache.flink.table.store.file.format.FileFormatImpl;
 import org.apache.flink.table.store.file.stats.FileStatsExtractor;
 import org.apache.flink.table.types.logical.RowType;
 
-import java.util.List;
 import java.util.Optional;
 
 /** Orc {@link FileFormat}. */
-public class OrcFileFormat implements FileFormat {
+public class OrcFileFormat extends FileFormat {
 
-    private final FileFormatImpl format;
+    private final ReadableConfig formatOptions;
 
-    public OrcFileFormat(ClassLoader classLoader, ReadableConfig formatOptions) {
-        this.format = new FileFormatImpl(classLoader, "orc", formatOptions);
+    public OrcFileFormat(ReadableConfig formatOptions) {
+        this.formatOptions = formatOptions;
     }
 
     @Override
-    public BulkFormat<RowData, FileSourceSplit> createReaderFactory(
-            RowType type, int[][] projection, List<ResolvedExpression> filters) {
-        return format.createReaderFactory(type, projection, filters);
+    protected BulkDecodingFormat<RowData> getDecodingFormat() {
+        return new org.apache.flink.orc.OrcFileFormatFactory()
+                .createDecodingFormat(null, formatOptions); // context is useless
     }
 
     @Override
-    public BulkWriter.Factory<RowData> createWriterFactory(RowType type) {
-        return format.createWriterFactory(type);
+    protected EncodingFormat<BulkWriter.Factory<RowData>> getEncodingFormat() {
+        return new org.apache.flink.orc.OrcFileFormatFactory()
+                .createEncodingFormat(null, formatOptions); // context is useless
     }
 
     @Override
diff --git a/flink-table-store-orc/src/main/java/org/apache/flink/table/store/orc/OrcFileFormatFactory.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileFormatFactory.java
similarity index 86%
rename from flink-table-store-orc/src/main/java/org/apache/flink/table/store/orc/OrcFileFormatFactory.java
rename to flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileFormatFactory.java
index c31dd10..b0902be 100644
--- a/flink-table-store-orc/src/main/java/org/apache/flink/table/store/orc/OrcFileFormatFactory.java
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileFormatFactory.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.store.orc;
+package org.apache.flink.table.store.format.orc;
 
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.table.store.file.format.FileFormat;
@@ -31,7 +31,7 @@ public class OrcFileFormatFactory implements FileFormatFactory {
     }
 
     @Override
-    public FileFormat create(ClassLoader classLoader, ReadableConfig formatOptions) {
-        return new OrcFileFormat(classLoader, formatOptions);
+    public FileFormat create(ReadableConfig formatOptions) {
+        return new OrcFileFormat(formatOptions);
     }
 }
diff --git a/flink-table-store-orc/src/main/java/org/apache/flink/table/store/orc/OrcFileStatsExtractor.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileStatsExtractor.java
similarity index 99%
rename from flink-table-store-orc/src/main/java/org/apache/flink/table/store/orc/OrcFileStatsExtractor.java
rename to flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileStatsExtractor.java
index 8d321af..1acd68d 100644
--- a/flink-table-store-orc/src/main/java/org/apache/flink/table/store/orc/OrcFileStatsExtractor.java
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileStatsExtractor.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.store.orc;
+package org.apache.flink.table.store.format.orc;
 
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.data.DecimalData;
diff --git a/flink-table-store-orc/src/main/resources/META-INF/services/org.apache.flink.table.store.file.format.FileFormatFactory b/flink-table-store-format/src/main/resources/META-INF/services/org.apache.flink.table.store.file.format.FileFormatFactory
similarity index 86%
rename from flink-table-store-orc/src/main/resources/META-INF/services/org.apache.flink.table.store.file.format.FileFormatFactory
rename to flink-table-store-format/src/main/resources/META-INF/services/org.apache.flink.table.store.file.format.FileFormatFactory
index 354bcc0..e0d5457 100644
--- a/flink-table-store-orc/src/main/resources/META-INF/services/org.apache.flink.table.store.file.format.FileFormatFactory
+++ b/flink-table-store-format/src/main/resources/META-INF/services/org.apache.flink.table.store.file.format.FileFormatFactory
@@ -13,4 +13,5 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-org.apache.flink.table.store.orc.OrcFileFormatFactory
\ No newline at end of file
+org.apache.flink.table.store.format.avro.AvroFileFormatFactory
+org.apache.flink.table.store.format.orc.OrcFileFormatFactory
\ No newline at end of file
diff --git a/flink-table-store-orc/src/test/java/org/apache/flink/table/store/orc/OrcFileStatsExtractorTest.java b/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcFileStatsExtractorTest.java
similarity index 98%
rename from flink-table-store-orc/src/test/java/org/apache/flink/table/store/orc/OrcFileStatsExtractorTest.java
rename to flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcFileStatsExtractorTest.java
index 82d9271..1ee12cc 100644
--- a/flink-table-store-orc/src/test/java/org/apache/flink/table/store/orc/OrcFileStatsExtractorTest.java
+++ b/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcFileStatsExtractorTest.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.store.orc;
+package org.apache.flink.table.store.format.orc;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.store.file.format.FileFormat;
diff --git a/pom.xml b/pom.xml
index 6ed400e..344ba14 100644
--- a/pom.xml
+++ b/pom.xml
@@ -57,8 +57,9 @@ under the License.
         <module>flink-table-store-common</module>
         <module>flink-table-store-connector</module>
         <module>flink-table-store-core</module>
+        <module>flink-table-store-dist</module>
+        <module>flink-table-store-format</module>
         <module>flink-table-store-kafka</module>
-        <module>flink-table-store-orc</module>
     </modules>
 
     <properties>
@@ -486,7 +487,7 @@ under the License.
                                     </includes>
                                     <message>
                                         Direct dependencies on flink-table-planner are not allowed.
-                                        You should depend on either Table API modules, flink-table-planner-loader or flink-table-store-delegate-loader.
+                                        You should depend on either Table API modules, flink-table-planner-loader or flink-table-store-codegen-loader.
                                     </message>
                                 </bannedDependencies>
                             </rules>