You are viewing a plain-text version of this content. The canonical link to the original message was a hyperlink and is not preserved in this plain-text copy.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/03/23 06:23:15 UTC
[flink-table-store] branch master updated: [FLINK-26703] Introduce flink-table-store-dist module for packaging
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new 4a2d004 [FLINK-26703] Introduce flink-table-store-dist module for packaging
4a2d004 is described below
commit 4a2d004c960694481a91ea02f2f0fecc36335e94
Author: tsreaper <ts...@gmail.com>
AuthorDate: Wed Mar 23 14:23:11 2022 +0800
[FLINK-26703] Introduce flink-table-store-dist module for packaging
This closes #50
---
.../flink/table/store/file/format/FileFormat.java | 97 ++++++++-
.../table/store/file/format/FileFormatFactory.java | 2 +-
.../table/store/file/format/FileFormatImpl.java | 87 +--------
.../file/format/FileStatsExtractingAvroFormat.java | 27 +--
.../FileStatsExtractingAvroFormatFactory.java | 2 +-
.../store/file/format/FlushingFileFormat.java | 20 +-
flink-table-store-dist/pom.xml | 217 +++++++++++++++++++++
.../pom.xml | 11 +-
.../table/store/format/avro/AvroFileFormat.java | 48 +++++
.../store/format/avro/AvroFileFormatFactory.java | 12 +-
.../table/store/format}/orc/OrcFileFormat.java | 28 ++-
.../store/format}/orc/OrcFileFormatFactory.java | 6 +-
.../store/format}/orc/OrcFileStatsExtractor.java | 2 +-
...flink.table.store.file.format.FileFormatFactory | 3 +-
.../format}/orc/OrcFileStatsExtractorTest.java | 2 +-
pom.xml | 5 +-
16 files changed, 412 insertions(+), 157 deletions(-)
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/format/FileFormat.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/format/FileFormat.java
index a403ac2..fde866a 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/format/FileFormat.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/format/FileFormat.java
@@ -19,25 +19,41 @@
package org.apache.flink.table.store.file.format;
import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DelegatingConfiguration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.connector.file.table.format.BulkDecodingFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.format.ProjectableDecodingFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.store.file.stats.FileStatsExtractor;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Preconditions;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.ServiceLoader;
+import static org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType;
+
/** Factory class which creates reader and writer factories for specific file format. */
-public interface FileFormat {
+public abstract class FileFormat {
+
+ protected abstract BulkDecodingFormat<RowData> getDecodingFormat();
+
+ protected abstract EncodingFormat<BulkWriter.Factory<RowData>> getEncodingFormat();
/**
* Create a {@link BulkFormat} from the type, with projection pushed down.
@@ -46,13 +62,27 @@ public interface FileFormat {
* @param projection See {@link org.apache.flink.table.connector.Projection#toNestedIndexes()}.
* @param filters A list of filters in conjunctive form for filtering on a best-effort basis.
*/
- BulkFormat<RowData, FileSourceSplit> createReaderFactory(
- RowType type, int[][] projection, List<ResolvedExpression> filters);
+ @SuppressWarnings("unchecked")
+ public BulkFormat<RowData, FileSourceSplit> createReaderFactory(
+ RowType type, int[][] projection, List<ResolvedExpression> filters) {
+ BulkDecodingFormat<RowData> decodingFormat = getDecodingFormat();
+ // TODO use ProjectingBulkFormat if not supported
+ Preconditions.checkState(
+ decodingFormat instanceof ProjectableDecodingFormat,
+ "Format "
+ + decodingFormat.getClass().getName()
+ + " does not support projection push down");
+ decodingFormat.applyFilters(filters);
+ return ((ProjectableDecodingFormat<BulkFormat<RowData, FileSourceSplit>>) decodingFormat)
+ .createRuntimeDecoder(SOURCE_CONTEXT, fromLogicalToDataType(type), projection);
+ }
/** Create a {@link BulkWriter.Factory} from the type. */
- BulkWriter.Factory<RowData> createWriterFactory(RowType type);
+ public BulkWriter.Factory<RowData> createWriterFactory(RowType type) {
+ return getEncodingFormat().createRuntimeEncoder(SINK_CONTEXT, fromLogicalToDataType(type));
+ }
- default BulkFormat<RowData, FileSourceSplit> createReaderFactory(RowType rowType) {
+ public BulkFormat<RowData, FileSourceSplit> createReaderFactory(RowType rowType) {
int[][] projection = new int[rowType.getFieldCount()][];
for (int i = 0; i < projection.length; i++) {
projection[i] = new int[] {i};
@@ -60,17 +90,17 @@ public interface FileFormat {
return createReaderFactory(rowType, projection);
}
- default BulkFormat<RowData, FileSourceSplit> createReaderFactory(
+ public BulkFormat<RowData, FileSourceSplit> createReaderFactory(
RowType rowType, int[][] projection) {
return createReaderFactory(rowType, projection, new ArrayList<>());
}
- default Optional<FileStatsExtractor> createStatsExtractor(RowType type) {
+ public Optional<FileStatsExtractor> createStatsExtractor(RowType type) {
return Optional.empty();
}
/** Create a {@link FileFormatImpl} from table options. */
- static FileFormat fromTableOptions(
+ public static FileFormat fromTableOptions(
ClassLoader classLoader,
Configuration tableOptions,
ConfigOption<String> formatOption) {
@@ -81,15 +111,62 @@ public interface FileFormat {
}
/** Create a {@link FileFormatImpl} from format identifier and format options. */
- static FileFormat fromIdentifier(
+ public static FileFormat fromIdentifier(
ClassLoader classLoader, String formatIdentifier, ReadableConfig formatOptions) {
ServiceLoader<FileFormatFactory> serviceLoader =
ServiceLoader.load(FileFormatFactory.class);
for (FileFormatFactory factory : serviceLoader) {
if (factory.identifier().equals(formatIdentifier.toLowerCase())) {
- return factory.create(classLoader, formatOptions);
+ return factory.create(formatOptions);
}
}
return new FileFormatImpl(classLoader, formatIdentifier, formatOptions);
}
+
+ private static final DynamicTableSink.Context SINK_CONTEXT =
+ new DynamicTableSink.Context() {
+
+ @Override
+ public boolean isBounded() {
+ return false;
+ }
+
+ @Override
+ public <T> TypeInformation<T> createTypeInformation(DataType consumedDataType) {
+ return InternalTypeInfo.of(consumedDataType.getLogicalType());
+ }
+
+ @Override
+ public <T> TypeInformation<T> createTypeInformation(
+ LogicalType consumedLogicalType) {
+ return InternalTypeInfo.of(consumedLogicalType);
+ }
+
+ @Override
+ public DynamicTableSink.DataStructureConverter createDataStructureConverter(
+ DataType consumedDataType) {
+ throw new UnsupportedOperationException();
+ }
+ };
+
+ private static final DynamicTableSource.Context SOURCE_CONTEXT =
+ new DynamicTableSource.Context() {
+
+ @Override
+ public <T> TypeInformation<T> createTypeInformation(DataType consumedDataType) {
+ return InternalTypeInfo.of(consumedDataType.getLogicalType());
+ }
+
+ @Override
+ public <T> TypeInformation<T> createTypeInformation(
+ LogicalType producedLogicalType) {
+ return InternalTypeInfo.of(producedLogicalType);
+ }
+
+ @Override
+ public DynamicTableSource.DataStructureConverter createDataStructureConverter(
+ DataType consumedDataType) {
+ throw new UnsupportedOperationException();
+ }
+ };
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/format/FileFormatFactory.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/format/FileFormatFactory.java
index 58204df..f103d2d 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/format/FileFormatFactory.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/format/FileFormatFactory.java
@@ -25,5 +25,5 @@ public interface FileFormatFactory {
String identifier();
- FileFormat create(ClassLoader classLoader, ReadableConfig formatOptions);
+ FileFormat create(ReadableConfig formatOptions);
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/format/FileFormatImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/format/FileFormatImpl.java
index 1846e04..0f8777c 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/format/FileFormatImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/format/FileFormatImpl.java
@@ -19,31 +19,16 @@
package org.apache.flink.table.store.file.format;
import org.apache.flink.api.common.serialization.BulkWriter;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.ReadableConfig;
-import org.apache.flink.connector.file.src.FileSourceSplit;
-import org.apache.flink.connector.file.src.reader.BulkFormat;
import org.apache.flink.connector.file.table.factories.BulkReaderFormatFactory;
import org.apache.flink.connector.file.table.factories.BulkWriterFormatFactory;
import org.apache.flink.connector.file.table.format.BulkDecodingFormat;
-import org.apache.flink.table.connector.format.ProjectableDecodingFormat;
-import org.apache.flink.table.connector.sink.DynamicTableSink;
-import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.format.EncodingFormat;
import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.factories.FactoryUtil;
-import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
-import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.util.Preconditions;
-
-import java.util.List;
-
-import static org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType;
/** A {@link FileFormat} which discovers reader and writer from format identifier. */
-public class FileFormatImpl implements FileFormat {
+public class FileFormatImpl extends FileFormat {
private final ClassLoader classLoader;
private final String formatIdentifier;
@@ -56,78 +41,16 @@ public class FileFormatImpl implements FileFormat {
this.formatOptions = formatOptions;
}
- @SuppressWarnings("unchecked")
- @Override
- public BulkFormat<RowData, FileSourceSplit> createReaderFactory(
- RowType rowType, int[][] projection, List<ResolvedExpression> filters) {
- BulkDecodingFormat<RowData> decodingFormat = getDecodingFormat();
- // TODO use ProjectingBulkFormat if not supported
- Preconditions.checkState(
- decodingFormat instanceof ProjectableDecodingFormat,
- "Format " + formatIdentifier + " does not support projection push down");
- decodingFormat.applyFilters(filters);
- return ((ProjectableDecodingFormat<BulkFormat<RowData, FileSourceSplit>>) decodingFormat)
- .createRuntimeDecoder(SOURCE_CONTEXT, fromLogicalToDataType(rowType), projection);
- }
-
- private BulkDecodingFormat<RowData> getDecodingFormat() {
+ protected BulkDecodingFormat<RowData> getDecodingFormat() {
return FactoryUtil.discoverFactory(
classLoader, BulkReaderFormatFactory.class, formatIdentifier)
.createDecodingFormat(null, formatOptions); // context is useless
}
@Override
- public BulkWriter.Factory<RowData> createWriterFactory(RowType rowType) {
+ protected EncodingFormat<BulkWriter.Factory<RowData>> getEncodingFormat() {
return FactoryUtil.discoverFactory(
classLoader, BulkWriterFormatFactory.class, formatIdentifier)
- .createEncodingFormat(null, formatOptions) // context is useless
- .createRuntimeEncoder(SINK_CONTEXT, fromLogicalToDataType(rowType));
+ .createEncodingFormat(null, formatOptions); // context is useless
}
-
- private static final DynamicTableSink.Context SINK_CONTEXT =
- new DynamicTableSink.Context() {
-
- @Override
- public boolean isBounded() {
- return false;
- }
-
- @Override
- public <T> TypeInformation<T> createTypeInformation(DataType consumedDataType) {
- return InternalTypeInfo.of(consumedDataType.getLogicalType());
- }
-
- @Override
- public <T> TypeInformation<T> createTypeInformation(
- LogicalType consumedLogicalType) {
- return InternalTypeInfo.of(consumedLogicalType);
- }
-
- @Override
- public DynamicTableSink.DataStructureConverter createDataStructureConverter(
- DataType consumedDataType) {
- throw new UnsupportedOperationException();
- }
- };
-
- private static final DynamicTableSource.Context SOURCE_CONTEXT =
- new DynamicTableSource.Context() {
-
- @Override
- public <T> TypeInformation<T> createTypeInformation(DataType consumedDataType) {
- return InternalTypeInfo.of(consumedDataType.getLogicalType());
- }
-
- @Override
- public <T> TypeInformation<T> createTypeInformation(
- LogicalType producedLogicalType) {
- return InternalTypeInfo.of(producedLogicalType);
- }
-
- @Override
- public DynamicTableSource.DataStructureConverter createDataStructureConverter(
- DataType consumedDataType) {
- throw new UnsupportedOperationException();
- }
- };
}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FileStatsExtractingAvroFormat.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FileStatsExtractingAvroFormat.java
index 77e9807..0bbec2e 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FileStatsExtractingAvroFormat.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FileStatsExtractingAvroFormat.java
@@ -18,41 +18,22 @@
package org.apache.flink.table.store.file.format;
-import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.connector.file.src.FileSourceSplit;
-import org.apache.flink.connector.file.src.reader.BulkFormat;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.store.file.stats.FileStatsExtractor;
import org.apache.flink.table.store.file.stats.TestFileStatsExtractor;
import org.apache.flink.table.types.logical.RowType;
-import java.util.List;
import java.util.Optional;
/** An avro {@link FileFormat} for test. It provides a {@link FileStatsExtractor}. */
-public class FileStatsExtractingAvroFormat implements FileFormat {
+public class FileStatsExtractingAvroFormat extends FileFormatImpl {
- private final FileFormat avro =
- FileFormat.fromIdentifier(
- FileStatsExtractingAvroFormat.class.getClassLoader(),
- "avro",
- new Configuration());
-
- @Override
- public BulkFormat<RowData, FileSourceSplit> createReaderFactory(
- RowType type, int[][] projection, List<ResolvedExpression> filters) {
- return avro.createReaderFactory(type, projection, filters);
- }
-
- @Override
- public BulkWriter.Factory<RowData> createWriterFactory(RowType type) {
- return avro.createWriterFactory(type);
+ public FileStatsExtractingAvroFormat() {
+ super(FileStatsExtractingAvroFormat.class.getClassLoader(), "avro", new Configuration());
}
@Override
public Optional<FileStatsExtractor> createStatsExtractor(RowType type) {
- return Optional.of(new TestFileStatsExtractor(avro, type));
+ return Optional.of(new TestFileStatsExtractor(this, type));
}
}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FileStatsExtractingAvroFormatFactory.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FileStatsExtractingAvroFormatFactory.java
index 1b8fdef..656fb0b 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FileStatsExtractingAvroFormatFactory.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FileStatsExtractingAvroFormatFactory.java
@@ -29,7 +29,7 @@ public class FileStatsExtractingAvroFormatFactory implements FileFormatFactory {
}
@Override
- public FileFormat create(ClassLoader classLoader, ReadableConfig formatOptions) {
+ public FileFormat create(ReadableConfig formatOptions) {
return new FileStatsExtractingAvroFormat();
}
}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FlushingFileFormat.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FlushingFileFormat.java
index fb8f90c..2eebb6f 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FlushingFileFormat.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FlushingFileFormat.java
@@ -20,17 +20,15 @@ package org.apache.flink.table.store.file.format;
import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.connector.file.src.FileSourceSplit;
-import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.connector.file.table.format.BulkDecodingFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.types.logical.RowType;
import java.io.IOException;
-import java.util.List;
/** A special {@link FileFormat} which flushes for every added element. */
-public class FlushingFileFormat implements FileFormat {
+public class FlushingFileFormat extends FileFormat {
private final FileFormat format;
@@ -41,16 +39,20 @@ public class FlushingFileFormat implements FileFormat {
}
@Override
- public BulkFormat<RowData, FileSourceSplit> createReaderFactory(
- RowType type, int[][] projection, List<ResolvedExpression> filters) {
- return format.createReaderFactory(type, projection, filters);
+ protected BulkDecodingFormat<RowData> getDecodingFormat() {
+ return format.getDecodingFormat();
+ }
+
+ @Override
+ protected EncodingFormat<BulkWriter.Factory<RowData>> getEncodingFormat() {
+ return format.getEncodingFormat();
}
@Override
public BulkWriter.Factory<RowData> createWriterFactory(RowType type) {
return fsDataOutputStream -> {
BulkWriter<RowData> wrapped =
- format.createWriterFactory(type).create(fsDataOutputStream);
+ super.createWriterFactory(type).create(fsDataOutputStream);
return new BulkWriter<RowData>() {
@Override
public void addElement(RowData rowData) throws IOException {
diff --git a/flink-table-store-dist/pom.xml b/flink-table-store-dist/pom.xml
new file mode 100644
index 0000000..c8837ca
--- /dev/null
+++ b/flink-table-store-dist/pom.xml
@@ -0,0 +1,217 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <artifactId>flink-table-store-parent</artifactId>
+ <groupId>org.apache.flink</groupId>
+ <version>0.1-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>flink-table-store-dist</artifactId>
+ <name>Flink Table Store : Dist</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-store-codegen-loader</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-store-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-store-connector</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-store-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-store-format</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-store-kafka</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <!-- used flink connectors -->
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-files</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-sql-connector-kafka</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+
+ <!-- default formats -->
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-sql-avro</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-sql-orc</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>shade-flink</id>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <artifactSet>
+ <includes combine.children="append">
+ <include>org.apache.flink:flink-table-store-codegen-loader</include>
+ <include>org.apache.flink:flink-table-store-common</include>
+ <include>org.apache.flink:flink-table-store-connector</include>
+ <include>org.apache.flink:flink-table-store-core</include>
+ <include>org.apache.flink:flink-table-store-format</include>
+ <include>org.apache.flink:flink-table-store-kafka</include>
+ <include>org.apache.flink:flink-connector-files</include>
+ <include>org.apache.flink:flink-sql-connector-kafka</include>
+ <include>org.apache.flink:flink-sql-avro</include>
+ <include>org.apache.flink:flink-sql-orc</include>
+ </includes>
+ </artifactSet>
+ <filters>
+ <!--
+ Throw away all META-INF/services,
+ otherwise if user has the same format/connector jar in the classpath,
+ FactoryUtil will complain about multiple matching factories.
+ -->
+ <filter>
+ <artifact>org.apache.flink:flink-connector-files</artifact>
+ <excludes>
+ <exclude>META-INF/services/**</exclude>
+ </excludes>
+ </filter>
+ <filter>
+ <artifact>org.apache.flink:flink-sql-connector-kafka</artifact>
+ <excludes>
+ <exclude>META-INF/services/**</exclude>
+ </excludes>
+ </filter>
+ <filter>
+ <artifact>org.apache.flink:flink-sql-avro</artifact>
+ <excludes>
+ <exclude>META-INF/services/**</exclude>
+ </excludes>
+ </filter>
+ <filter>
+ <artifact>org.apache.flink:flink-sql-orc</artifact>
+ <excludes>
+ <exclude>META-INF/services/**</exclude>
+ </excludes>
+ </filter>
+ <!-- Another copy of the Apache license, which we don't need. -->
+ <filter>
+ <artifact>*</artifact>
+ <excludes>
+ <exclude>META-INF/LICENSE.txt</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ <relocations>
+ <relocation>
+ <pattern>org.apache.flink.connector</pattern>
+ <shadedPattern>org.apache.flink.table.store.shaded.org.apache.flink.connector</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.apache.flink.kafka</pattern>
+ <shadedPattern>org.apache.flink.table.store.shaded.org.apache.flink.kafka</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.apache.flink.avro</pattern>
+ <shadedPattern>org.apache.flink.table.store.shaded.org.apache.flink.avro</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.apache.flink.orc</pattern>
+ <shadedPattern>org.apache.flink.table.store.shaded.org.apache.flink.orc</shadedPattern>
+ </relocation>
+ <!--
+ flink-sql-orc module does not shade its dependencies, so we shade here.
+ See maven-shade-plugin usage of flink-sql-orc for detailed dependency list.
+ -->
+ <relocation>
+ <pattern>org.apache.orc</pattern>
+ <shadedPattern>org.apache.flink.table.store.shaded.org.apache.orc</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.apache.hive</pattern>
+ <shadedPattern>org.apache.flink.table.store.shaded.org.apache.hive</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.apache.hadoop.hive</pattern>
+ <shadedPattern>org.apache.flink.table.store.shaded.org.apache.hadoop.hive</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>io.airlift</pattern>
+ <shadedPattern>org.apache.flink.table.store.shaded.io.airlift</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.apache.commons</pattern>
+ <shadedPattern>org.apache.flink.table.store.shaded.org.apache.commons</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>com.google.protobuf</pattern>
+ <shadedPattern>org.apache.flink.table.store.shaded.com.google.protobuf</shadedPattern>
+ </relocation>
+ </relocations>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
\ No newline at end of file
diff --git a/flink-table-store-orc/pom.xml b/flink-table-store-format/pom.xml
similarity index 92%
rename from flink-table-store-orc/pom.xml
rename to flink-table-store-format/pom.xml
index a7a6da8..88535e2 100644
--- a/flink-table-store-orc/pom.xml
+++ b/flink-table-store-format/pom.xml
@@ -28,8 +28,8 @@ under the License.
<version>0.1-SNAPSHOT</version>
</parent>
- <artifactId>flink-table-store-orc</artifactId>
- <name>Flink Table Store : Orc</name>
+ <artifactId>flink-table-store-format</artifactId>
+ <name>Flink Table Store : Format</name>
<dependencies>
<dependency>
@@ -57,6 +57,13 @@ under the License.
<dependency>
<groupId>org.apache.flink</groupId>
+ <artifactId>flink-avro</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
<artifactId>flink-orc</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/AvroFileFormat.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/AvroFileFormat.java
new file mode 100644
index 0000000..10a6fa9
--- /dev/null
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/AvroFileFormat.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.format.avro;
+
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connector.file.table.format.BulkDecodingFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.format.FileFormat;
+
+/** Avro {@link FileFormat}. */
+public class AvroFileFormat extends FileFormat {
+
+ private final ReadableConfig formatOptions;
+
+ public AvroFileFormat(ReadableConfig formatOptions) {
+ this.formatOptions = formatOptions;
+ }
+
+ @Override
+ protected BulkDecodingFormat<RowData> getDecodingFormat() {
+ return new org.apache.flink.formats.avro.AvroFileFormatFactory()
+ .createDecodingFormat(null, formatOptions); // context is useless
+ }
+
+ @Override
+ protected EncodingFormat<BulkWriter.Factory<RowData>> getEncodingFormat() {
+ return new org.apache.flink.formats.avro.AvroFileFormatFactory()
+ .createEncodingFormat(null, formatOptions); // context is useless
+ }
+}
diff --git a/flink-table-store-orc/src/main/java/org/apache/flink/table/store/orc/OrcFileFormatFactory.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/AvroFileFormatFactory.java
similarity index 76%
copy from flink-table-store-orc/src/main/java/org/apache/flink/table/store/orc/OrcFileFormatFactory.java
copy to flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/AvroFileFormatFactory.java
index c31dd10..63a0e45 100644
--- a/flink-table-store-orc/src/main/java/org/apache/flink/table/store/orc/OrcFileFormatFactory.java
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/AvroFileFormatFactory.java
@@ -16,22 +16,22 @@
* limitations under the License.
*/
-package org.apache.flink.table.store.orc;
+package org.apache.flink.table.store.format.avro;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.store.file.format.FileFormat;
import org.apache.flink.table.store.file.format.FileFormatFactory;
-/** Factory to create {@link OrcFileFormat}. */
-public class OrcFileFormatFactory implements FileFormatFactory {
+/** Factory to create {@link AvroFileFormat}. */
+public class AvroFileFormatFactory implements FileFormatFactory {
@Override
public String identifier() {
- return "orc";
+ return "avro";
}
@Override
- public FileFormat create(ClassLoader classLoader, ReadableConfig formatOptions) {
- return new OrcFileFormat(classLoader, formatOptions);
+ public FileFormat create(ReadableConfig formatOptions) {
+ return new AvroFileFormat(formatOptions);
}
}
diff --git a/flink-table-store-orc/src/main/java/org/apache/flink/table/store/orc/OrcFileFormat.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileFormat.java
similarity index 60%
rename from flink-table-store-orc/src/main/java/org/apache/flink/table/store/orc/OrcFileFormat.java
rename to flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileFormat.java
index 0087cc3..ddb6a97 100644
--- a/flink-table-store-orc/src/main/java/org/apache/flink/table/store/orc/OrcFileFormat.java
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileFormat.java
@@ -16,40 +16,38 @@
* limitations under the License.
*/
-package org.apache.flink.table.store.orc;
+package org.apache.flink.table.store.format.orc;
import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.configuration.ReadableConfig;
-import org.apache.flink.connector.file.src.FileSourceSplit;
-import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.connector.file.table.format.BulkDecodingFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.store.file.format.FileFormat;
-import org.apache.flink.table.store.file.format.FileFormatImpl;
import org.apache.flink.table.store.file.stats.FileStatsExtractor;
import org.apache.flink.table.types.logical.RowType;
-import java.util.List;
import java.util.Optional;
/** Orc {@link FileFormat}. */
-public class OrcFileFormat implements FileFormat {
+public class OrcFileFormat extends FileFormat {
- private final FileFormatImpl format;
+ private final ReadableConfig formatOptions;
- public OrcFileFormat(ClassLoader classLoader, ReadableConfig formatOptions) {
- this.format = new FileFormatImpl(classLoader, "orc", formatOptions);
+ public OrcFileFormat(ReadableConfig formatOptions) {
+ this.formatOptions = formatOptions;
}
@Override
- public BulkFormat<RowData, FileSourceSplit> createReaderFactory(
- RowType type, int[][] projection, List<ResolvedExpression> filters) {
- return format.createReaderFactory(type, projection, filters);
+ protected BulkDecodingFormat<RowData> getDecodingFormat() {
+ return new org.apache.flink.orc.OrcFileFormatFactory()
+ .createDecodingFormat(null, formatOptions); // context is useless
}
@Override
- public BulkWriter.Factory<RowData> createWriterFactory(RowType type) {
- return format.createWriterFactory(type);
+ protected EncodingFormat<BulkWriter.Factory<RowData>> getEncodingFormat() {
+ return new org.apache.flink.orc.OrcFileFormatFactory()
+ .createEncodingFormat(null, formatOptions); // context is useless
}
@Override
diff --git a/flink-table-store-orc/src/main/java/org/apache/flink/table/store/orc/OrcFileFormatFactory.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileFormatFactory.java
similarity index 86%
rename from flink-table-store-orc/src/main/java/org/apache/flink/table/store/orc/OrcFileFormatFactory.java
rename to flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileFormatFactory.java
index c31dd10..b0902be 100644
--- a/flink-table-store-orc/src/main/java/org/apache/flink/table/store/orc/OrcFileFormatFactory.java
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileFormatFactory.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.flink.table.store.orc;
+package org.apache.flink.table.store.format.orc;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.store.file.format.FileFormat;
@@ -31,7 +31,7 @@ public class OrcFileFormatFactory implements FileFormatFactory {
}
@Override
- public FileFormat create(ClassLoader classLoader, ReadableConfig formatOptions) {
- return new OrcFileFormat(classLoader, formatOptions);
+ public FileFormat create(ReadableConfig formatOptions) {
+ return new OrcFileFormat(formatOptions);
}
}
diff --git a/flink-table-store-orc/src/main/java/org/apache/flink/table/store/orc/OrcFileStatsExtractor.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileStatsExtractor.java
similarity index 99%
rename from flink-table-store-orc/src/main/java/org/apache/flink/table/store/orc/OrcFileStatsExtractor.java
rename to flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileStatsExtractor.java
index 8d321af..1acd68d 100644
--- a/flink-table-store-orc/src/main/java/org/apache/flink/table/store/orc/OrcFileStatsExtractor.java
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileStatsExtractor.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.flink.table.store.orc;
+package org.apache.flink.table.store.format.orc;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.DecimalData;
diff --git a/flink-table-store-orc/src/main/resources/META-INF/services/org.apache.flink.table.store.file.format.FileFormatFactory b/flink-table-store-format/src/main/resources/META-INF/services/org.apache.flink.table.store.file.format.FileFormatFactory
similarity index 86%
rename from flink-table-store-orc/src/main/resources/META-INF/services/org.apache.flink.table.store.file.format.FileFormatFactory
rename to flink-table-store-format/src/main/resources/META-INF/services/org.apache.flink.table.store.file.format.FileFormatFactory
index 354bcc0..e0d5457 100644
--- a/flink-table-store-orc/src/main/resources/META-INF/services/org.apache.flink.table.store.file.format.FileFormatFactory
+++ b/flink-table-store-format/src/main/resources/META-INF/services/org.apache.flink.table.store.file.format.FileFormatFactory
@@ -13,4 +13,5 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-org.apache.flink.table.store.orc.OrcFileFormatFactory
\ No newline at end of file
+org.apache.flink.table.store.format.avro.AvroFileFormatFactory
+org.apache.flink.table.store.format.orc.OrcFileFormatFactory
\ No newline at end of file
diff --git a/flink-table-store-orc/src/test/java/org/apache/flink/table/store/orc/OrcFileStatsExtractorTest.java b/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcFileStatsExtractorTest.java
similarity index 98%
rename from flink-table-store-orc/src/test/java/org/apache/flink/table/store/orc/OrcFileStatsExtractorTest.java
rename to flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcFileStatsExtractorTest.java
index 82d9271..1ee12cc 100644
--- a/flink-table-store-orc/src/test/java/org/apache/flink/table/store/orc/OrcFileStatsExtractorTest.java
+++ b/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcFileStatsExtractorTest.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.flink.table.store.orc;
+package org.apache.flink.table.store.format.orc;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.store.file.format.FileFormat;
diff --git a/pom.xml b/pom.xml
index 6ed400e..344ba14 100644
--- a/pom.xml
+++ b/pom.xml
@@ -57,8 +57,9 @@ under the License.
<module>flink-table-store-common</module>
<module>flink-table-store-connector</module>
<module>flink-table-store-core</module>
+ <module>flink-table-store-dist</module>
+ <module>flink-table-store-format</module>
<module>flink-table-store-kafka</module>
- <module>flink-table-store-orc</module>
</modules>
<properties>
@@ -486,7 +487,7 @@ under the License.
</includes>
<message>
Direct dependencies on flink-table-planner are not allowed.
- You should depend on either Table API modules, flink-table-planner-loader or flink-table-store-delegate-loader.
+ You should depend on either Table API modules, flink-table-planner-loader or flink-table-store-codegen-loader.
</message>
</bannedDependencies>
</rules>