You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/06/30 06:41:50 UTC
[flink-table-store] branch master updated: [FLINK-28292] Get rid of BulkReaderFormatFactory and BulkWriterFormatFactory
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new c8c3cd1d [FLINK-28292] Get rid of BulkReaderFormatFactory and BulkWriterFormatFactory
c8c3cd1d is described below
commit c8c3cd1d93012d6eba6a7f7639435d489e0cffd8
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Thu Jun 30 14:41:47 2022 +0800
[FLINK-28292] Get rid of BulkReaderFormatFactory and BulkWriterFormatFactory
This closes #180
---
flink-table-store-common/pom.xml | 23 +++
.../flink/table/store/format}/FieldStats.java | 2 +-
.../table/store/format}/FieldStatsCollector.java | 16 +-
.../flink/table/store/format/FileFormat.java | 107 ++++++++++++
.../table/store}/format/FileFormatFactory.java | 2 +-
.../table/store/format}/FileStatsExtractor.java | 2 +-
.../utils/RowDataToObjectArrayConverter.java | 2 +-
.../store/format}/FileStatsExtractorTestBase.java | 7 +-
.../flink/table/store}/utils/RowDataUtilsTest.java | 0
flink-table-store-connector/pom.xml | 55 ++----
.../source/TestChangelogDataReadWrite.java | 8 +-
flink-table-store-core/pom.xml | 4 +-
.../flink/table/store/file/FileStoreOptions.java | 8 +-
.../table/store/file/data/AppendOnlyWriter.java | 2 +-
.../table/store/file/data/DataFileReader.java | 2 +-
.../table/store/file/data/DataFileWriter.java | 6 +-
.../flink/table/store/file/format/FileFormat.java | 184 ---------------------
.../table/store/file/format/FileFormatImpl.java | 57 -------
.../table/store/file/manifest/ManifestFile.java | 11 +-
.../table/store/file/manifest/ManifestList.java | 2 +-
.../file/operation/AbstractFileStoreScan.java | 2 +-
.../file/operation/AppendOnlyFileStoreRead.java | 2 +-
.../file/operation/AppendOnlyFileStoreWrite.java | 2 +-
.../store/file/operation/FileStoreCommitImpl.java | 2 +-
.../file/operation/KeyValueFileStoreRead.java | 2 +-
.../file/operation/KeyValueFileStoreWrite.java | 2 +-
.../flink/table/store/file/predicate/And.java | 2 +-
.../store/file/predicate/CompoundPredicate.java | 2 +-
.../flink/table/store/file/predicate/Equal.java | 2 +-
.../table/store/file/predicate/GreaterOrEqual.java | 2 +-
.../table/store/file/predicate/GreaterThan.java | 2 +-
.../table/store/file/predicate/IsNotNull.java | 2 +-
.../flink/table/store/file/predicate/IsNull.java | 2 +-
.../store/file/predicate/LeafBinaryFunction.java | 2 +-
.../table/store/file/predicate/LeafFunction.java | 2 +-
.../table/store/file/predicate/LeafPredicate.java | 2 +-
.../store/file/predicate/LeafUnaryFunction.java | 2 +-
.../table/store/file/predicate/LessOrEqual.java | 2 +-
.../flink/table/store/file/predicate/LessThan.java | 2 +-
.../flink/table/store/file/predicate/NotEqual.java | 2 +-
.../flink/table/store/file/predicate/Or.java | 2 +-
.../table/store/file/predicate/Predicate.java | 2 +-
.../table/store/file/predicate/StartsWith.java | 2 +-
.../table/store/file/stats/BinaryTableStats.java | 1 +
.../file/stats/FieldStatsArraySerializer.java | 3 +-
.../flink/table/store/file/writer/Metric.java | 2 +-
.../table/store/file/writer/MetricFileWriter.java | 8 +-
.../flink/table/store/file/FileFormatTest.java | 11 +-
.../store/file/data/AppendOnlyWriterTest.java | 8 +-
.../store/file/data/DataFileTestDataGenerator.java | 12 +-
.../store/file}/format/FileFormatSuffixTest.java | 20 +--
.../file/format/FileStatsExtractingAvroFormat.java | 27 ++-
.../FileStatsExtractingAvroFormatFactory.java | 2 +
.../store/file/format/FlushingFileFormat.java | 23 ++-
.../store/file/manifest/ManifestFileMetaTest.java | 6 +-
.../store/file/manifest/ManifestFileTest.java | 6 +-
.../store/file/manifest/ManifestListTest.java | 6 +-
.../file/manifest/ManifestTestDataGenerator.java | 7 +-
.../table/store/file/mergetree/MergeTreeTest.java | 2 +-
.../table/store/file/predicate/PredicateTest.java | 2 +-
.../store/file/stats/FieldStatsCollectorTest.java | 10 +-
.../table/store/file/stats/StatsTestUtils.java | 1 +
.../store/file/stats/TestFileStatsExtractor.java | 7 +-
...che.flink.table.store.format.FileFormatFactory} | 0
.../store/tests/FileStoreFlinkFormatE2eTest.java | 63 -------
flink-table-store-format/pom.xml | 30 ++--
.../table/store/format/avro/AvroFileFormat.java | 150 +++++++++++++++--
.../store/format/avro/AvroFileFormatFactory.java | 4 +-
.../table/store/format/orc/OrcFileFormat.java | 85 ++++++++--
.../store/format/orc/OrcFileFormatFactory.java | 6 +-
.../store/format/orc/OrcFileStatsExtractor.java | 4 +-
...che.flink.table.store.format.FileFormatFactory} | 0
.../table/store/format/BulkFileFormatTest.java | 16 +-
.../format/orc/OrcFileStatsExtractorTest.java | 7 +-
74 files changed, 531 insertions(+), 544 deletions(-)
diff --git a/flink-table-store-common/pom.xml b/flink-table-store-common/pom.xml
index c6f5cbbf..f9e8a905 100644
--- a/flink-table-store-common/pom.xml
+++ b/flink-table-store-common/pom.xml
@@ -38,5 +38,28 @@ under the License.
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-files</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
</dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
</project>
\ No newline at end of file
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FieldStats.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/format/FieldStats.java
similarity index 97%
rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FieldStats.java
rename to flink-table-store-common/src/main/java/org/apache/flink/table/store/format/FieldStats.java
index 55a07424..d2ec5467 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FieldStats.java
+++ b/flink-table-store-common/src/main/java/org/apache/flink/table/store/format/FieldStats.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.flink.table.store.file.stats;
+package org.apache.flink.table.store.format;
import javax.annotation.Nullable;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FieldStatsCollector.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/format/FieldStatsCollector.java
similarity index 85%
rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FieldStatsCollector.java
rename to flink-table-store-common/src/main/java/org/apache/flink/table/store/format/FieldStatsCollector.java
index 5611a16f..e86fa406 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FieldStatsCollector.java
+++ b/flink-table-store-common/src/main/java/org/apache/flink/table/store/format/FieldStatsCollector.java
@@ -16,12 +16,12 @@
* limitations under the License.
*/
-package org.apache.flink.table.store.file.stats;
+package org.apache.flink.table.store.format;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalSerializers;
-import org.apache.flink.table.store.file.utils.RowDataToObjectArrayConverter;
+import org.apache.flink.table.store.utils.RowDataToObjectArrayConverter;
import org.apache.flink.table.types.logical.RowType;
/** Collector to extract statistics of each fields from a series of records. */
@@ -32,7 +32,6 @@ public class FieldStatsCollector {
private final long[] nullCounts;
private final RowDataToObjectArrayConverter converter;
private final TypeSerializer<Object>[] fieldSerializers;
- private final FieldStatsArraySerializer fieldStatsArraySerializer;
public FieldStatsCollector(RowType rowType) {
int numFields = rowType.getFieldCount();
@@ -44,7 +43,6 @@ public class FieldStatsCollector {
for (int i = 0; i < numFields; i++) {
fieldSerializers[i] = InternalSerializers.create(rowType.getTypeAt(i));
}
- this.fieldStatsArraySerializer = new FieldStatsArraySerializer(rowType);
}
/**
@@ -76,15 +74,7 @@ public class FieldStatsCollector {
}
}
- public BinaryTableStats extract() {
- return toBinary(extractFieldStats());
- }
-
- public BinaryTableStats toBinary(FieldStats[] stats) {
- return fieldStatsArraySerializer.toBinary(stats);
- }
-
- public FieldStats[] extractFieldStats() {
+ public FieldStats[] extract() {
FieldStats[] stats = new FieldStats[nullCounts.length];
for (int i = 0; i < stats.length; i++) {
stats[i] = new FieldStats(minValues[i], maxValues[i], nullCounts[i]);
diff --git a/flink-table-store-common/src/main/java/org/apache/flink/table/store/format/FileFormat.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/format/FileFormat.java
new file mode 100644
index 00000000..8bfcd2dd
--- /dev/null
+++ b/flink-table-store-common/src/main/java/org/apache/flink/table/store/format/FileFormat.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.format;
+
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DelegatingConfiguration;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.ServiceLoader;
+
+/**
+ * Factory class which creates reader and writer factories for specific file format.
+ *
+ * <p>NOTE: This class must be thread safe.
+ */
+public abstract class FileFormat {
+
+ protected String formatIdentifier;
+
+ protected FileFormat(String formatIdentifier) {
+ this.formatIdentifier = formatIdentifier;
+ }
+
+ public String getFormatIdentifier() {
+ return formatIdentifier;
+ }
+
+ /**
+ * Create a {@link BulkFormat} from the type, with projection pushed down.
+ *
+ * @param type Type without projection.
+ * @param projection See {@link org.apache.flink.table.connector.Projection#toNestedIndexes()}.
+ * @param filters A list of filters in conjunctive form for filtering on a best-effort basis.
+ */
+ public abstract BulkFormat<RowData, FileSourceSplit> createReaderFactory(
+ RowType type, int[][] projection, List<ResolvedExpression> filters);
+
+ /** Create a {@link BulkWriter.Factory} from the type. */
+ public abstract BulkWriter.Factory<RowData> createWriterFactory(RowType type);
+
+ public BulkFormat<RowData, FileSourceSplit> createReaderFactory(RowType rowType) {
+ int[][] projection = new int[rowType.getFieldCount()][];
+ for (int i = 0; i < projection.length; i++) {
+ projection[i] = new int[] {i};
+ }
+ return createReaderFactory(rowType, projection);
+ }
+
+ public BulkFormat<RowData, FileSourceSplit> createReaderFactory(
+ RowType rowType, int[][] projection) {
+ return createReaderFactory(rowType, projection, new ArrayList<>());
+ }
+
+ public Optional<FileStatsExtractor> createStatsExtractor(RowType type) {
+ return Optional.empty();
+ }
+
+ /** Create a {@link FileFormat} from table options. */
+ public static FileFormat fromTableOptions(
+ Configuration tableOptions, ConfigOption<String> formatOption) {
+ String formatIdentifier = tableOptions.get(formatOption);
+ DelegatingConfiguration formatOptions =
+ new DelegatingConfiguration(tableOptions, formatIdentifier + ".");
+ return fromIdentifier(formatIdentifier, formatOptions);
+ }
+
+ /** Create a {@link FileFormat} from format identifier and format options. */
+ public static FileFormat fromIdentifier(String formatIdentifier, Configuration formatOptions) {
+ ServiceLoader<FileFormatFactory> serviceLoader =
+ ServiceLoader.load(FileFormatFactory.class);
+ for (FileFormatFactory factory : serviceLoader) {
+ if (factory.identifier().equals(formatIdentifier.toLowerCase())) {
+ return factory.create(formatOptions);
+ }
+ }
+ throw new ValidationException(
+ String.format(
+ "Could not find any factories that implement '%s' in the classpath.",
+ FileFormatFactory.class.getName()));
+ }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/format/FileFormatFactory.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/format/FileFormatFactory.java
similarity index 95%
rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/format/FileFormatFactory.java
rename to flink-table-store-common/src/main/java/org/apache/flink/table/store/format/FileFormatFactory.java
index 088c11cd..f0ff0c01 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/format/FileFormatFactory.java
+++ b/flink-table-store-common/src/main/java/org/apache/flink/table/store/format/FileFormatFactory.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.flink.table.store.file.format;
+package org.apache.flink.table.store.format;
import org.apache.flink.configuration.Configuration;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FileStatsExtractor.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/format/FileStatsExtractor.java
similarity index 95%
rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FileStatsExtractor.java
rename to flink-table-store-common/src/main/java/org/apache/flink/table/store/format/FileStatsExtractor.java
index 3f56d3b3..384a2737 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FileStatsExtractor.java
+++ b/flink-table-store-common/src/main/java/org/apache/flink/table/store/format/FileStatsExtractor.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.flink.table.store.file.stats;
+package org.apache.flink.table.store.format;
import org.apache.flink.core.fs.Path;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RowDataToObjectArrayConverter.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/RowDataToObjectArrayConverter.java
similarity index 97%
rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RowDataToObjectArrayConverter.java
rename to flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/RowDataToObjectArrayConverter.java
index 0def1c1c..ad708451 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RowDataToObjectArrayConverter.java
+++ b/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/RowDataToObjectArrayConverter.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.flink.table.store.file.utils;
+package org.apache.flink.table.store.utils;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/FileStatsExtractorTestBase.java b/flink-table-store-common/src/test/java/org/apache/flink/table/store/format/FileStatsExtractorTestBase.java
similarity index 97%
rename from flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/FileStatsExtractorTestBase.java
rename to flink-table-store-common/src/test/java/org/apache/flink/table/store/format/FileStatsExtractorTestBase.java
index 42b06c67..90a53f7f 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/FileStatsExtractorTestBase.java
+++ b/flink-table-store-common/src/test/java/org/apache/flink/table/store/format/FileStatsExtractorTestBase.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.flink.table.store.file.stats;
+package org.apache.flink.table.store.format;
import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.core.fs.FSDataOutputStream;
@@ -29,7 +29,6 @@ import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.data.binary.BinaryStringData;
-import org.apache.flink.table.store.file.format.FileFormat;
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.CharType;
import org.apache.flink.table.types.logical.DecimalType;
@@ -50,7 +49,7 @@ import java.util.concurrent.ThreadLocalRandom;
import static org.assertj.core.api.Assertions.assertThat;
-/** Tests for {@link FileStatsExtractor}. */
+/** Tests for {@link org.apache.flink.table.store.format.FileStatsExtractor}. */
public abstract class FileStatsExtractorTestBase {
@TempDir java.nio.file.Path tempDir;
@@ -76,7 +75,7 @@ public abstract class FileStatsExtractorTestBase {
for (GenericRowData row : data) {
collector.collect(row);
}
- FieldStats[] expected = collector.extractFieldStats();
+ FieldStats[] expected = collector.extract();
FileStatsExtractor extractor = format.createStatsExtractor(rowType).get();
assertThat(extractor).isNotNull();
diff --git a/flink-table-store-common/src/test/java/org.apache.flink.table.store/utils/RowDataUtilsTest.java b/flink-table-store-common/src/test/java/org/apache/flink/table/store/utils/RowDataUtilsTest.java
similarity index 100%
rename from flink-table-store-common/src/test/java/org.apache.flink.table.store/utils/RowDataUtilsTest.java
rename to flink-table-store-common/src/test/java/org/apache/flink/table/store/utils/RowDataUtilsTest.java
diff --git a/flink-table-store-connector/pom.xml b/flink-table-store-connector/pom.xml
index dfde4ed4..6af82508 100644
--- a/flink-table-store-connector/pom.xml
+++ b/flink-table-store-connector/pom.xml
@@ -70,6 +70,21 @@ under the License.
<!-- test dependencies -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-store-format</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <!-- Tests: Hadoop required by ORC -->
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs-client</artifactId>
+ <version>${hadoop.version}</version>
+ <scope>test</scope>
+ </dependency>
+
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
@@ -135,13 +150,6 @@ under the License.
</exclusions>
</dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-avro</artifactId>
- <version>${flink.version}</version>
- <scope>test</scope>
- </dependency>
-
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
@@ -157,38 +165,6 @@ under the License.
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-orc</artifactId>
- <version>${flink.version}</version>
- <scope>test</scope>
- </dependency>
-
- <!-- Tests: Hadoop required by ORC -->
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
- <version>${hadoop.version}</version>
- <scope>test</scope>
- <exclusions>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
-
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-hdfs-client</artifactId>
- <version>${hadoop.version}</version>
- <scope>test</scope>
- </dependency>
-
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
@@ -208,6 +184,7 @@ under the License.
</exclusions>
<scope>test</scope>
</dependency>
+
<dependency>
<!-- include 2.0 server for tests -->
<groupId>org.apache.kafka</groupId>
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java
index feba9da2..a99d05c0 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java
@@ -27,7 +27,6 @@ import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.store.file.FileStoreOptions;
import org.apache.flink.table.store.file.KeyValue;
import org.apache.flink.table.store.file.data.DataFileMeta;
-import org.apache.flink.table.store.file.format.FileFormat;
import org.apache.flink.table.store.file.memory.HeapMemorySegmentPool;
import org.apache.flink.table.store.file.memory.MemoryOwner;
import org.apache.flink.table.store.file.mergetree.MergeTreeOptions;
@@ -39,6 +38,7 @@ import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.apache.flink.table.store.file.utils.RecordReader;
import org.apache.flink.table.store.file.utils.SnapshotManager;
import org.apache.flink.table.store.file.writer.RecordWriter;
+import org.apache.flink.table.store.format.FileFormat;
import org.apache.flink.table.store.table.source.KeyValueTableRead;
import org.apache.flink.table.store.table.source.TableRead;
import org.apache.flink.table.store.table.source.ValueContentRowDataRecordIterator;
@@ -74,11 +74,7 @@ public class TestChangelogDataReadWrite {
private final ExecutorService service;
public TestChangelogDataReadWrite(String root, ExecutorService service) {
- this.avro =
- FileFormat.fromIdentifier(
- Thread.currentThread().getContextClassLoader(),
- "avro",
- new Configuration());
+ this.avro = FileFormat.fromIdentifier("avro", new Configuration());
this.tablePath = new Path(root);
this.pathFactory =
new FileStorePathFactory(
diff --git a/flink-table-store-core/pom.xml b/flink-table-store-core/pom.xml
index b880507d..fbabafff 100644
--- a/flink-table-store-core/pom.xml
+++ b/flink-table-store-core/pom.xml
@@ -85,8 +85,8 @@ under the License.
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-avro</artifactId>
- <version>${flink.version}</version>
+ <artifactId>flink-table-store-format</artifactId>
+ <version>${project.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java
index 0d01be5d..43065064 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java
@@ -27,8 +27,8 @@ import org.apache.flink.configuration.description.Description;
import org.apache.flink.configuration.description.InlineElement;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.catalog.ObjectIdentifier;
-import org.apache.flink.table.store.file.format.FileFormat;
import org.apache.flink.table.store.file.mergetree.MergeTreeOptions;
+import org.apache.flink.table.store.format.FileFormat;
import org.apache.flink.util.Preconditions;
import java.io.Serializable;
@@ -219,13 +219,11 @@ public class FileStoreOptions implements Serializable {
}
public FileFormat fileFormat() {
- return FileFormat.fromTableOptions(
- Thread.currentThread().getContextClassLoader(), options, FILE_FORMAT);
+ return FileFormat.fromTableOptions(options, FILE_FORMAT);
}
public FileFormat manifestFormat() {
- return FileFormat.fromTableOptions(
- Thread.currentThread().getContextClassLoader(), options, MANIFEST_FORMAT);
+ return FileFormat.fromTableOptions(options, MANIFEST_FORMAT);
}
public MemorySize manifestTargetSize() {
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyWriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyWriter.java
index 8451064d..8a7105e7 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyWriter.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyWriter.java
@@ -21,7 +21,6 @@ package org.apache.flink.table.store.file.data;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.store.file.format.FileFormat;
import org.apache.flink.table.store.file.mergetree.Increment;
import org.apache.flink.table.store.file.stats.BinaryTableStats;
import org.apache.flink.table.store.file.stats.FieldStatsArraySerializer;
@@ -32,6 +31,7 @@ import org.apache.flink.table.store.file.writer.Metric;
import org.apache.flink.table.store.file.writer.MetricFileWriter;
import org.apache.flink.table.store.file.writer.RecordWriter;
import org.apache.flink.table.store.file.writer.RollingFileWriter;
+import org.apache.flink.table.store.format.FileFormat;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Preconditions;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileReader.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileReader.java
index 098cf430..45344989 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileReader.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileReader.java
@@ -27,11 +27,11 @@ import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.store.file.KeyValue;
import org.apache.flink.table.store.file.KeyValueSerializer;
-import org.apache.flink.table.store.file.format.FileFormat;
import org.apache.flink.table.store.file.schema.SchemaManager;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.apache.flink.table.store.file.utils.FileUtils;
import org.apache.flink.table.store.file.utils.RecordReader;
+import org.apache.flink.table.store.format.FileFormat;
import org.apache.flink.table.types.logical.RowType;
import javax.annotation.Nullable;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileWriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileWriter.java
index 91ac96d9..4f348cc9 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileWriter.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileWriter.java
@@ -26,11 +26,8 @@ import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.store.file.KeyValue;
import org.apache.flink.table.store.file.KeyValueSerializer;
-import org.apache.flink.table.store.file.format.FileFormat;
import org.apache.flink.table.store.file.stats.BinaryTableStats;
-import org.apache.flink.table.store.file.stats.FieldStats;
import org.apache.flink.table.store.file.stats.FieldStatsArraySerializer;
-import org.apache.flink.table.store.file.stats.FileStatsExtractor;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.apache.flink.table.store.file.utils.FileUtils;
import org.apache.flink.table.store.file.writer.BaseFileWriter;
@@ -38,6 +35,9 @@ import org.apache.flink.table.store.file.writer.FileWriter;
import org.apache.flink.table.store.file.writer.Metric;
import org.apache.flink.table.store.file.writer.MetricFileWriter;
import org.apache.flink.table.store.file.writer.RollingFileWriter;
+import org.apache.flink.table.store.format.FieldStats;
+import org.apache.flink.table.store.format.FileFormat;
+import org.apache.flink.table.store.format.FileStatsExtractor;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.CloseableIterator;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/format/FileFormat.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/format/FileFormat.java
deleted file mode 100644
index ce7a698f..00000000
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/format/FileFormat.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.file.format;
-
-import org.apache.flink.api.common.serialization.BulkWriter;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.DelegatingConfiguration;
-import org.apache.flink.connector.file.src.FileSourceSplit;
-import org.apache.flink.connector.file.src.reader.BulkFormat;
-import org.apache.flink.connector.file.table.format.BulkDecodingFormat;
-import org.apache.flink.table.connector.format.EncodingFormat;
-import org.apache.flink.table.connector.format.ProjectableDecodingFormat;
-import org.apache.flink.table.connector.sink.DynamicTableSink;
-import org.apache.flink.table.connector.source.DynamicTableSource;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.expressions.ResolvedExpression;
-import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
-import org.apache.flink.table.store.file.stats.FileStatsExtractor;
-import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.util.Preconditions;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Optional;
-import java.util.ServiceLoader;
-
-import static org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType;
-
-/**
- * Factory class which creates reader and writer factories for specific file format.
- *
- * <p>NOTE: This class must be thread safe.
- */
-public abstract class FileFormat {
-
- protected String formatIdentifier;
-
- protected FileFormat(String formatIdentifier) {
- this.formatIdentifier = formatIdentifier;
- }
-
- protected abstract BulkDecodingFormat<RowData> getDecodingFormat();
-
- protected abstract EncodingFormat<BulkWriter.Factory<RowData>> getEncodingFormat();
-
- public String getFormatIdentifier() {
- return formatIdentifier;
- }
-
- /**
- * Create a {@link BulkFormat} from the type, with projection pushed down.
- *
- * @param type Type without projection.
- * @param projection See {@link org.apache.flink.table.connector.Projection#toNestedIndexes()}.
- * @param filters A list of filters in conjunctive form for filtering on a best-effort basis.
- */
- @SuppressWarnings("unchecked")
- public BulkFormat<RowData, FileSourceSplit> createReaderFactory(
- RowType type, int[][] projection, List<ResolvedExpression> filters) {
- BulkDecodingFormat<RowData> decodingFormat = getDecodingFormat();
- // TODO use ProjectingBulkFormat if not supported
- Preconditions.checkState(
- decodingFormat instanceof ProjectableDecodingFormat,
- "Format "
- + decodingFormat.getClass().getName()
- + " does not support projection push down");
- decodingFormat.applyFilters(filters);
- return ((ProjectableDecodingFormat<BulkFormat<RowData, FileSourceSplit>>) decodingFormat)
- .createRuntimeDecoder(SOURCE_CONTEXT, fromLogicalToDataType(type), projection);
- }
-
- /** Create a {@link BulkWriter.Factory} from the type. */
- public BulkWriter.Factory<RowData> createWriterFactory(RowType type) {
- return getEncodingFormat().createRuntimeEncoder(SINK_CONTEXT, fromLogicalToDataType(type));
- }
-
- public BulkFormat<RowData, FileSourceSplit> createReaderFactory(RowType rowType) {
- int[][] projection = new int[rowType.getFieldCount()][];
- for (int i = 0; i < projection.length; i++) {
- projection[i] = new int[] {i};
- }
- return createReaderFactory(rowType, projection);
- }
-
- public BulkFormat<RowData, FileSourceSplit> createReaderFactory(
- RowType rowType, int[][] projection) {
- return createReaderFactory(rowType, projection, new ArrayList<>());
- }
-
- public Optional<FileStatsExtractor> createStatsExtractor(RowType type) {
- return Optional.empty();
- }
-
- /** Create a {@link FileFormatImpl} from table options. */
- public static FileFormat fromTableOptions(
- ClassLoader classLoader,
- Configuration tableOptions,
- ConfigOption<String> formatOption) {
- String formatIdentifier = tableOptions.get(formatOption);
- DelegatingConfiguration formatOptions =
- new DelegatingConfiguration(tableOptions, formatIdentifier + ".");
- return fromIdentifier(classLoader, formatIdentifier, formatOptions);
- }
-
- /** Create a {@link FileFormatImpl} from format identifier and format options. */
- public static FileFormat fromIdentifier(
- ClassLoader classLoader, String formatIdentifier, Configuration formatOptions) {
- ServiceLoader<FileFormatFactory> serviceLoader =
- ServiceLoader.load(FileFormatFactory.class);
- for (FileFormatFactory factory : serviceLoader) {
- if (factory.identifier().equals(formatIdentifier.toLowerCase())) {
- return factory.create(formatOptions);
- }
- }
- return new FileFormatImpl(classLoader, formatIdentifier, formatOptions);
- }
-
- private static final DynamicTableSink.Context SINK_CONTEXT =
- new DynamicTableSink.Context() {
-
- @Override
- public boolean isBounded() {
- return false;
- }
-
- @Override
- public <T> TypeInformation<T> createTypeInformation(DataType consumedDataType) {
- return InternalTypeInfo.of(consumedDataType.getLogicalType());
- }
-
- @Override
- public <T> TypeInformation<T> createTypeInformation(
- LogicalType consumedLogicalType) {
- return InternalTypeInfo.of(consumedLogicalType);
- }
-
- @Override
- public DynamicTableSink.DataStructureConverter createDataStructureConverter(
- DataType consumedDataType) {
- throw new UnsupportedOperationException();
- }
- };
-
- private static final DynamicTableSource.Context SOURCE_CONTEXT =
- new DynamicTableSource.Context() {
-
- @Override
- public <T> TypeInformation<T> createTypeInformation(DataType consumedDataType) {
- return InternalTypeInfo.of(consumedDataType.getLogicalType());
- }
-
- @Override
- public <T> TypeInformation<T> createTypeInformation(
- LogicalType producedLogicalType) {
- return InternalTypeInfo.of(producedLogicalType);
- }
-
- @Override
- public DynamicTableSource.DataStructureConverter createDataStructureConverter(
- DataType consumedDataType) {
- throw new UnsupportedOperationException();
- }
- };
-}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/format/FileFormatImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/format/FileFormatImpl.java
deleted file mode 100644
index ae2e27a0..00000000
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/format/FileFormatImpl.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.file.format;
-
-import org.apache.flink.api.common.serialization.BulkWriter;
-import org.apache.flink.configuration.ReadableConfig;
-import org.apache.flink.connector.file.table.factories.BulkReaderFormatFactory;
-import org.apache.flink.connector.file.table.factories.BulkWriterFormatFactory;
-import org.apache.flink.connector.file.table.format.BulkDecodingFormat;
-import org.apache.flink.table.connector.format.EncodingFormat;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.factories.FactoryUtil;
-
-/** A {@link FileFormat} which discovers reader and writer from format identifier. */
-public class FileFormatImpl extends FileFormat {
-
- private final BulkReaderFormatFactory readerFactory;
- private final BulkWriterFormatFactory writerFactory;
- private final ReadableConfig formatOptions;
-
- public FileFormatImpl(
- ClassLoader classLoader, String formatIdentifier, ReadableConfig formatOptions) {
- super(formatIdentifier);
- this.readerFactory =
- FactoryUtil.discoverFactory(
- classLoader, BulkReaderFormatFactory.class, formatIdentifier);
- this.writerFactory =
- FactoryUtil.discoverFactory(
- classLoader, BulkWriterFormatFactory.class, formatIdentifier);
- this.formatOptions = formatOptions;
- }
-
- protected BulkDecodingFormat<RowData> getDecodingFormat() {
- return readerFactory.createDecodingFormat(null, formatOptions); // context is useless
- }
-
- @Override
- protected EncodingFormat<BulkWriter.Factory<RowData>> getEncodingFormat() {
- return writerFactory.createEncodingFormat(null, formatOptions); // context is useless
- }
-}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java
index f77c697b..3e3c4f35 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java
@@ -24,10 +24,8 @@ import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.connector.file.src.reader.BulkFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.store.file.format.FileFormat;
import org.apache.flink.table.store.file.schema.SchemaManager;
-import org.apache.flink.table.store.file.stats.FieldStatsCollector;
-import org.apache.flink.table.store.file.stats.FileStatsExtractor;
+import org.apache.flink.table.store.file.stats.FieldStatsArraySerializer;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.apache.flink.table.store.file.utils.FileUtils;
import org.apache.flink.table.store.file.utils.VersionedObjectSerializer;
@@ -36,6 +34,9 @@ import org.apache.flink.table.store.file.writer.FileWriter;
import org.apache.flink.table.store.file.writer.Metric;
import org.apache.flink.table.store.file.writer.MetricFileWriter;
import org.apache.flink.table.store.file.writer.RollingFileWriter;
+import org.apache.flink.table.store.format.FieldStatsCollector;
+import org.apache.flink.table.store.format.FileFormat;
+import org.apache.flink.table.store.format.FileStatsExtractor;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.shaded.guava30.com.google.common.collect.Iterables;
@@ -153,6 +154,7 @@ public class ManifestFile {
private class ManifestEntryWriter extends BaseFileWriter<ManifestEntry, ManifestFileMeta> {
private final FieldStatsCollector partitionStatsCollector;
+ private final FieldStatsArraySerializer partitionStatsSerializer;
private long numAddedFiles = 0;
private long numDeletedFiles = 0;
@@ -161,6 +163,7 @@ public class ManifestFile {
super(writerFactory, path);
this.partitionStatsCollector = new FieldStatsCollector(partitionType);
+ this.partitionStatsSerializer = new FieldStatsArraySerializer(partitionType);
}
@Override
@@ -190,7 +193,7 @@ public class ManifestFile {
path.getFileSystem().getFileStatus(path).getLen(),
numAddedFiles,
numDeletedFiles,
- partitionStatsCollector.extract(),
+ partitionStatsSerializer.toBinary(partitionStatsCollector.extract()),
schemaId);
}
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestList.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestList.java
index 8d3cc827..41a25b98 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestList.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestList.java
@@ -25,10 +25,10 @@ import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.store.file.format.FileFormat;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.apache.flink.table.store.file.utils.FileUtils;
import org.apache.flink.table.store.file.utils.VersionedObjectSerializer;
+import org.apache.flink.table.store.format.FileFormat;
import org.apache.flink.table.types.logical.RowType;
import java.io.IOException;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java
index 3fbb2653..dbec3d80 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java
@@ -31,8 +31,8 @@ import org.apache.flink.table.store.file.predicate.PredicateBuilder;
import org.apache.flink.table.store.file.stats.FieldStatsArraySerializer;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.apache.flink.table.store.file.utils.FileUtils;
-import org.apache.flink.table.store.file.utils.RowDataToObjectArrayConverter;
import org.apache.flink.table.store.file.utils.SnapshotManager;
+import org.apache.flink.table.store.utils.RowDataToObjectArrayConverter;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreRead.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreRead.java
index 346cac87..20b2805e 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreRead.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreRead.java
@@ -25,11 +25,11 @@ import org.apache.flink.table.data.RowData;
import org.apache.flink.table.store.file.data.AppendOnlyReader;
import org.apache.flink.table.store.file.data.DataFileMeta;
import org.apache.flink.table.store.file.data.DataFilePathFactory;
-import org.apache.flink.table.store.file.format.FileFormat;
import org.apache.flink.table.store.file.mergetree.compact.ConcatRecordReader;
import org.apache.flink.table.store.file.schema.SchemaManager;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.apache.flink.table.store.file.utils.RecordReader;
+import org.apache.flink.table.store.format.FileFormat;
import org.apache.flink.table.store.table.source.Split;
import org.apache.flink.table.types.logical.RowType;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreWrite.java
index 29cf3177..7d8cc440 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreWrite.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreWrite.java
@@ -23,11 +23,11 @@ import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.store.file.data.AppendOnlyWriter;
import org.apache.flink.table.store.file.data.DataFileMeta;
import org.apache.flink.table.store.file.data.DataFilePathFactory;
-import org.apache.flink.table.store.file.format.FileFormat;
import org.apache.flink.table.store.file.mergetree.compact.CompactResult;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.apache.flink.table.store.file.utils.SnapshotManager;
import org.apache.flink.table.store.file.writer.RecordWriter;
+import org.apache.flink.table.store.format.FileFormat;
import org.apache.flink.table.types.logical.RowType;
import javax.annotation.Nullable;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
index f6a1404e..58b5739f 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
@@ -34,8 +34,8 @@ import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.store.file.predicate.PredicateConverter;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.apache.flink.table.store.file.utils.MetaFileWriter;
-import org.apache.flink.table.store.file.utils.RowDataToObjectArrayConverter;
import org.apache.flink.table.store.file.utils.SnapshotManager;
+import org.apache.flink.table.store.utils.RowDataToObjectArrayConverter;
import org.apache.flink.table.types.logical.RowType;
import org.slf4j.Logger;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreRead.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreRead.java
index f9eb3b2a..8deb78f3 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreRead.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreRead.java
@@ -22,7 +22,6 @@ import org.apache.flink.table.data.RowData;
import org.apache.flink.table.store.file.KeyValue;
import org.apache.flink.table.store.file.data.DataFileMeta;
import org.apache.flink.table.store.file.data.DataFileReader;
-import org.apache.flink.table.store.file.format.FileFormat;
import org.apache.flink.table.store.file.mergetree.MergeTreeReader;
import org.apache.flink.table.store.file.mergetree.compact.ConcatRecordReader;
import org.apache.flink.table.store.file.mergetree.compact.IntervalPartition;
@@ -31,6 +30,7 @@ import org.apache.flink.table.store.file.schema.SchemaManager;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.apache.flink.table.store.file.utils.ProjectKeyRecordReader;
import org.apache.flink.table.store.file.utils.RecordReader;
+import org.apache.flink.table.store.format.FileFormat;
import org.apache.flink.table.store.table.source.Split;
import org.apache.flink.table.types.logical.RowType;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
index c546a5d7..0063c752 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
@@ -24,7 +24,6 @@ import org.apache.flink.table.store.file.KeyValue;
import org.apache.flink.table.store.file.data.DataFileMeta;
import org.apache.flink.table.store.file.data.DataFileReader;
import org.apache.flink.table.store.file.data.DataFileWriter;
-import org.apache.flink.table.store.file.format.FileFormat;
import org.apache.flink.table.store.file.mergetree.Levels;
import org.apache.flink.table.store.file.mergetree.MergeTreeOptions;
import org.apache.flink.table.store.file.mergetree.MergeTreeReader;
@@ -42,6 +41,7 @@ import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.apache.flink.table.store.file.utils.RecordReaderIterator;
import org.apache.flink.table.store.file.utils.SnapshotManager;
import org.apache.flink.table.store.file.writer.RecordWriter;
+import org.apache.flink.table.store.format.FileFormat;
import org.apache.flink.table.types.logical.RowType;
import javax.annotation.Nullable;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/And.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/And.java
index 313ffe77..dc256767 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/And.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/And.java
@@ -18,7 +18,7 @@
package org.apache.flink.table.store.file.predicate;
-import org.apache.flink.table.store.file.stats.FieldStats;
+import org.apache.flink.table.store.format.FieldStats;
import java.util.ArrayList;
import java.util.List;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/CompoundPredicate.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/CompoundPredicate.java
index 4be05b2a..d823da0d 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/CompoundPredicate.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/CompoundPredicate.java
@@ -18,7 +18,7 @@
package org.apache.flink.table.store.file.predicate;
-import org.apache.flink.table.store.file.stats.FieldStats;
+import org.apache.flink.table.store.format.FieldStats;
import java.io.Serializable;
import java.util.List;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/Equal.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/Equal.java
index cf9a0f99..a59736ab 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/Equal.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/Equal.java
@@ -18,7 +18,7 @@
package org.apache.flink.table.store.file.predicate;
-import org.apache.flink.table.store.file.stats.FieldStats;
+import org.apache.flink.table.store.format.FieldStats;
import org.apache.flink.table.types.logical.LogicalType;
import java.util.Optional;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/GreaterOrEqual.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/GreaterOrEqual.java
index e93857e1..4a59066d 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/GreaterOrEqual.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/GreaterOrEqual.java
@@ -18,7 +18,7 @@
package org.apache.flink.table.store.file.predicate;
-import org.apache.flink.table.store.file.stats.FieldStats;
+import org.apache.flink.table.store.format.FieldStats;
import org.apache.flink.table.types.logical.LogicalType;
import java.util.Optional;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/GreaterThan.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/GreaterThan.java
index 3e771ae0..7d517460 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/GreaterThan.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/GreaterThan.java
@@ -18,7 +18,7 @@
package org.apache.flink.table.store.file.predicate;
-import org.apache.flink.table.store.file.stats.FieldStats;
+import org.apache.flink.table.store.format.FieldStats;
import org.apache.flink.table.types.logical.LogicalType;
import java.util.Optional;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/IsNotNull.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/IsNotNull.java
index 093ad663..1095112a 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/IsNotNull.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/IsNotNull.java
@@ -18,7 +18,7 @@
package org.apache.flink.table.store.file.predicate;
-import org.apache.flink.table.store.file.stats.FieldStats;
+import org.apache.flink.table.store.format.FieldStats;
import org.apache.flink.table.types.logical.LogicalType;
import java.util.Optional;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/IsNull.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/IsNull.java
index 5aa48423..fde5197d 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/IsNull.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/IsNull.java
@@ -18,7 +18,7 @@
package org.apache.flink.table.store.file.predicate;
-import org.apache.flink.table.store.file.stats.FieldStats;
+import org.apache.flink.table.store.format.FieldStats;
import org.apache.flink.table.types.logical.LogicalType;
import java.util.Optional;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LeafBinaryFunction.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LeafBinaryFunction.java
index 0b91cb70..3f2b6c73 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LeafBinaryFunction.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LeafBinaryFunction.java
@@ -18,7 +18,7 @@
package org.apache.flink.table.store.file.predicate;
-import org.apache.flink.table.store.file.stats.FieldStats;
+import org.apache.flink.table.store.format.FieldStats;
import org.apache.flink.table.types.logical.LogicalType;
import java.util.List;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LeafFunction.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LeafFunction.java
index 97bfcebd..00c852cc 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LeafFunction.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LeafFunction.java
@@ -18,7 +18,7 @@
package org.apache.flink.table.store.file.predicate;
-import org.apache.flink.table.store.file.stats.FieldStats;
+import org.apache.flink.table.store.format.FieldStats;
import org.apache.flink.table.types.logical.LogicalType;
import java.io.Serializable;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LeafPredicate.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LeafPredicate.java
index 2e8d6c81..17a52f76 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LeafPredicate.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LeafPredicate.java
@@ -23,7 +23,7 @@ import org.apache.flink.api.java.typeutils.runtime.NullableSerializer;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.table.runtime.typeutils.InternalSerializers;
-import org.apache.flink.table.store.file.stats.FieldStats;
+import org.apache.flink.table.store.format.FieldStats;
import org.apache.flink.table.types.logical.LogicalType;
import javax.annotation.Nullable;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LeafUnaryFunction.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LeafUnaryFunction.java
index aa414c68..72dcfbb1 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LeafUnaryFunction.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LeafUnaryFunction.java
@@ -18,7 +18,7 @@
package org.apache.flink.table.store.file.predicate;
-import org.apache.flink.table.store.file.stats.FieldStats;
+import org.apache.flink.table.store.format.FieldStats;
import org.apache.flink.table.types.logical.LogicalType;
import java.util.List;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LessOrEqual.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LessOrEqual.java
index 784fef38..72382a53 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LessOrEqual.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LessOrEqual.java
@@ -18,7 +18,7 @@
package org.apache.flink.table.store.file.predicate;
-import org.apache.flink.table.store.file.stats.FieldStats;
+import org.apache.flink.table.store.format.FieldStats;
import org.apache.flink.table.types.logical.LogicalType;
import java.util.Optional;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LessThan.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LessThan.java
index 5ef21753..46c02a84 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LessThan.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LessThan.java
@@ -18,7 +18,7 @@
package org.apache.flink.table.store.file.predicate;
-import org.apache.flink.table.store.file.stats.FieldStats;
+import org.apache.flink.table.store.format.FieldStats;
import org.apache.flink.table.types.logical.LogicalType;
import java.util.Optional;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/NotEqual.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/NotEqual.java
index 71379516..2d3002ca 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/NotEqual.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/NotEqual.java
@@ -18,7 +18,7 @@
package org.apache.flink.table.store.file.predicate;
-import org.apache.flink.table.store.file.stats.FieldStats;
+import org.apache.flink.table.store.format.FieldStats;
import org.apache.flink.table.types.logical.LogicalType;
import java.util.Optional;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/Or.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/Or.java
index 5a74bd6a..9ba91286 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/Or.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/Or.java
@@ -18,7 +18,7 @@
package org.apache.flink.table.store.file.predicate;
-import org.apache.flink.table.store.file.stats.FieldStats;
+import org.apache.flink.table.store.format.FieldStats;
import java.util.ArrayList;
import java.util.List;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/Predicate.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/Predicate.java
index 7e7c1faa..3f6748d8 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/Predicate.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/Predicate.java
@@ -18,7 +18,7 @@
package org.apache.flink.table.store.file.predicate;
-import org.apache.flink.table.store.file.stats.FieldStats;
+import org.apache.flink.table.store.format.FieldStats;
import java.io.Serializable;
import java.util.Optional;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/StartsWith.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/StartsWith.java
index cf1b3db2..10b5faf6 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/StartsWith.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/StartsWith.java
@@ -19,7 +19,7 @@
package org.apache.flink.table.store.file.predicate;
import org.apache.flink.table.data.binary.BinaryStringData;
-import org.apache.flink.table.store.file.stats.FieldStats;
+import org.apache.flink.table.store.format.FieldStats;
import org.apache.flink.table.types.logical.LogicalType;
import java.util.Optional;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/BinaryTableStats.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/BinaryTableStats.java
index 5549358c..9651bcb9 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/BinaryTableStats.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/BinaryTableStats.java
@@ -23,6 +23,7 @@ import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.store.file.predicate.Predicate;
+import org.apache.flink.table.store.format.FieldStats;
import javax.annotation.Nullable;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FieldStatsArraySerializer.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FieldStatsArraySerializer.java
index 5a5a48a0..4ae0ce3c 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FieldStatsArraySerializer.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FieldStatsArraySerializer.java
@@ -20,7 +20,8 @@ package org.apache.flink.table.store.file.stats;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
-import org.apache.flink.table.store.file.utils.RowDataToObjectArrayConverter;
+import org.apache.flink.table.store.format.FieldStats;
+import org.apache.flink.table.store.utils.RowDataToObjectArrayConverter;
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.LogicalType;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/Metric.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/Metric.java
index f80381d0..b41545c3 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/Metric.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/Metric.java
@@ -19,7 +19,7 @@
package org.apache.flink.table.store.file.writer;
-import org.apache.flink.table.store.file.stats.FieldStats;
+import org.apache.flink.table.store.format.FieldStats;
/** Metric information to describe the column's max-min values, record count etc. */
public class Metric {
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/MetricFileWriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/MetricFileWriter.java
index 676e096a..26e26953 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/MetricFileWriter.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/MetricFileWriter.java
@@ -24,10 +24,10 @@ import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.store.file.stats.FieldStats;
-import org.apache.flink.table.store.file.stats.FieldStatsCollector;
-import org.apache.flink.table.store.file.stats.FileStatsExtractor;
import org.apache.flink.table.store.file.utils.FileUtils;
+import org.apache.flink.table.store.format.FieldStats;
+import org.apache.flink.table.store.format.FieldStatsCollector;
+import org.apache.flink.table.store.format.FileStatsExtractor;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
@@ -127,7 +127,7 @@ public class MetricFileWriter<T> implements FileWriter<T, Metric> {
if (fileStatsExtractor != null) {
stats = fileStatsExtractor.extract(path);
} else {
- stats = fieldStatsCollector.extractFieldStats();
+ stats = fieldStatsCollector.extract();
}
return new Metric(stats, recordCount, length);
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/FileFormatTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/FileFormatTest.java
index ce08143b..e46fd09f 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/FileFormatTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/FileFormatTest.java
@@ -28,12 +28,10 @@ import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.store.file.format.FileFormat;
-import org.apache.flink.table.store.file.format.FileFormatImpl;
+import org.apache.flink.table.store.format.FileFormat;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.RowType;
-import org.apache.avro.AvroRuntimeException;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
@@ -44,7 +42,7 @@ import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;
-/** Test for {@link FileFormatImpl}. */
+/** Test for {@link FileFormat}. */
public class FileFormatTest {
@Test
@@ -89,7 +87,7 @@ public class FileFormatTest {
createFileFormat("_unsupported").createWriterFactory(RowType.of(new IntType()));
Path path = new Path(tempDir.toUri().toString(), "1.avro");
Assertions.assertThrows(
- AvroRuntimeException.class,
+ RuntimeException.class,
() ->
writerFactory.create(
path.getFileSystem()
@@ -101,7 +99,6 @@ public class FileFormatTest {
Configuration tableOptions = new Configuration();
tableOptions.set(FileStoreOptions.FILE_FORMAT, "avro");
tableOptions.setString("avro.codec", codec);
- return FileFormat.fromTableOptions(
- this.getClass().getClassLoader(), tableOptions, FileStoreOptions.FILE_FORMAT);
+ return FileFormat.fromTableOptions(tableOptions, FileStoreOptions.FILE_FORMAT);
}
}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/AppendOnlyWriterTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/AppendOnlyWriterTest.java
index 774676ad..5cfb2f73 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/AppendOnlyWriterTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/AppendOnlyWriterTest.java
@@ -26,11 +26,11 @@ import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.binary.BinaryRowDataUtil;
import org.apache.flink.table.store.file.FileStoreOptions;
-import org.apache.flink.table.store.file.format.FileFormat;
import org.apache.flink.table.store.file.mergetree.Increment;
-import org.apache.flink.table.store.file.stats.FieldStats;
import org.apache.flink.table.store.file.stats.FieldStatsArraySerializer;
import org.apache.flink.table.store.file.writer.RecordWriter;
+import org.apache.flink.table.store.format.FieldStats;
+import org.apache.flink.table.store.format.FileFormat;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
@@ -220,9 +220,7 @@ public class AppendOnlyWriterTest {
private RecordWriter<RowData> createWriter(
long targetFileSize, RowType writeSchema, long maxSeqNum) {
- FileFormat fileFormat =
- FileFormat.fromIdentifier(
- Thread.currentThread().getContextClassLoader(), AVRO, new Configuration());
+ FileFormat fileFormat = FileFormat.fromIdentifier(AVRO, new Configuration());
return new AppendOnlyWriter(
0, fileFormat, targetFileSize, writeSchema, maxSeqNum, pathFactory);
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/DataFileTestDataGenerator.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/DataFileTestDataGenerator.java
index 86345713..b5369dca 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/DataFileTestDataGenerator.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/DataFileTestDataGenerator.java
@@ -21,7 +21,8 @@ package org.apache.flink.table.store.file.data;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.store.file.KeyValue;
import org.apache.flink.table.store.file.TestKeyValueGenerator;
-import org.apache.flink.table.store.file.stats.FieldStatsCollector;
+import org.apache.flink.table.store.file.stats.FieldStatsArraySerializer;
+import org.apache.flink.table.store.format.FieldStatsCollector;
import java.util.ArrayList;
import java.util.HashMap;
@@ -103,6 +104,11 @@ public class DataFileTestDataGenerator {
new FieldStatsCollector(TestKeyValueGenerator.KEY_TYPE);
FieldStatsCollector valueStatsCollector =
new FieldStatsCollector(TestKeyValueGenerator.DEFAULT_ROW_TYPE);
+ FieldStatsArraySerializer keyStatsSerializer =
+ new FieldStatsArraySerializer(TestKeyValueGenerator.KEY_TYPE);
+ FieldStatsArraySerializer valueStatsSerializer =
+ new FieldStatsArraySerializer(TestKeyValueGenerator.DEFAULT_ROW_TYPE);
+
long totalSize = 0;
BinaryRowData minKey = null;
BinaryRowData maxKey = null;
@@ -133,8 +139,8 @@ public class DataFileTestDataGenerator {
kvs.size(),
minKey,
maxKey,
- keyStatsCollector.extract(),
- valueStatsCollector.extract(),
+ keyStatsSerializer.toBinary(keyStatsCollector.extract()),
+ valueStatsSerializer.toBinary(valueStatsCollector.extract()),
minSequenceNumber,
maxSequenceNumber,
0,
diff --git a/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/FileFormatSuffixTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FileFormatSuffixTest.java
similarity index 81%
rename from flink-table-store-format/src/test/java/org/apache/flink/table/store/format/FileFormatSuffixTest.java
rename to flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FileFormatSuffixTest.java
index 75ba824c..2288c931 100644
--- a/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/FileFormatSuffixTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FileFormatSuffixTest.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.flink.table.store.format;
+package org.apache.flink.table.store.file.format;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
@@ -27,16 +27,15 @@ import org.apache.flink.table.store.file.data.DataFileMeta;
import org.apache.flink.table.store.file.data.DataFilePathFactory;
import org.apache.flink.table.store.file.data.DataFileTest;
import org.apache.flink.table.store.file.data.DataFileWriter;
-import org.apache.flink.table.store.file.format.FileFormat;
+import org.apache.flink.table.store.format.FileFormat;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
import java.util.List;
@@ -48,21 +47,16 @@ public class FileFormatSuffixTest extends DataFileTest {
new LogicalType[] {new IntType(), new VarCharType(), new VarCharType()},
new String[] {"id", "name", "dt"});
- @ParameterizedTest
- @ValueSource(strings = {"avro", "parquet", "orc"})
- public void testFileSuffix(String format, @TempDir java.nio.file.Path tempDir)
- throws Exception {
+ @Test
+ public void testFileSuffix(@TempDir java.nio.file.Path tempDir) throws Exception {
+ String format = "avro";
DataFileWriter dataFileWriter = createDataFileWriter(tempDir.toString(), format);
Path path = dataFileWriter.pathFactory().newPath();
Assertions.assertTrue(path.getPath().endsWith(format));
DataFilePathFactory dataFilePathFactory =
new DataFilePathFactory(new Path(tempDir.toString()), "dt=1", 1, format);
- FileFormat fileFormat =
- FileFormat.fromIdentifier(
- Thread.currentThread().getContextClassLoader(),
- format,
- new Configuration());
+ FileFormat fileFormat = FileFormat.fromIdentifier(format, new Configuration());
AppendOnlyWriter appendOnlyWriter =
new AppendOnlyWriter(0, fileFormat, 10, SCHEMA, 10, dataFilePathFactory);
appendOnlyWriter.write(
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FileStatsExtractingAvroFormat.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FileStatsExtractingAvroFormat.java
index 0bbec2e3..29ebd0c3 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FileStatsExtractingAvroFormat.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FileStatsExtractingAvroFormat.java
@@ -18,18 +18,39 @@
package org.apache.flink.table.store.file.format;
+import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.store.file.stats.FileStatsExtractor;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.store.file.stats.TestFileStatsExtractor;
+import org.apache.flink.table.store.format.FileFormat;
+import org.apache.flink.table.store.format.FileStatsExtractor;
import org.apache.flink.table.types.logical.RowType;
+import java.util.List;
import java.util.Optional;
/** An avro {@link FileFormat} for test. It provides a {@link FileStatsExtractor}. */
-public class FileStatsExtractingAvroFormat extends FileFormatImpl {
+public class FileStatsExtractingAvroFormat extends FileFormat {
+
+ private final FileFormat avro;
public FileStatsExtractingAvroFormat() {
- super(FileStatsExtractingAvroFormat.class.getClassLoader(), "avro", new Configuration());
+ super("avro");
+ avro = FileFormat.fromIdentifier("avro", new Configuration());
+ }
+
+ @Override
+ public BulkFormat<RowData, FileSourceSplit> createReaderFactory(
+ RowType type, int[][] projection, List<ResolvedExpression> filters) {
+ return avro.createReaderFactory(type, projection, filters);
+ }
+
+ @Override
+ public BulkWriter.Factory<RowData> createWriterFactory(RowType type) {
+ return avro.createWriterFactory(type);
}
@Override
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FileStatsExtractingAvroFormatFactory.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FileStatsExtractingAvroFormatFactory.java
index 9c841766..ee89402a 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FileStatsExtractingAvroFormatFactory.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FileStatsExtractingAvroFormatFactory.java
@@ -19,6 +19,8 @@
package org.apache.flink.table.store.file.format;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.store.format.FileFormat;
+import org.apache.flink.table.store.format.FileFormatFactory;
/** Factory to create {@link FileStatsExtractingAvroFormat}. */
public class FileStatsExtractingAvroFormatFactory implements FileFormatFactory {
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FlushingFileFormat.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FlushingFileFormat.java
index 874bec22..23569b21 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FlushingFileFormat.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FlushingFileFormat.java
@@ -20,12 +20,15 @@ package org.apache.flink.table.store.file.format;
import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.connector.file.table.format.BulkDecodingFormat;
-import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.store.format.FileFormat;
import org.apache.flink.table.types.logical.RowType;
import java.io.IOException;
+import java.util.List;
/** A special {@link FileFormat} which flushes for every added element. */
public class FlushingFileFormat extends FileFormat {
@@ -34,26 +37,20 @@ public class FlushingFileFormat extends FileFormat {
public FlushingFileFormat(String identifier) {
super(identifier);
- this.format =
- FileFormat.fromIdentifier(
- FlushingFileFormat.class.getClassLoader(), identifier, new Configuration());
+ this.format = FileFormat.fromIdentifier(identifier, new Configuration());
}
@Override
- protected BulkDecodingFormat<RowData> getDecodingFormat() {
- return format.getDecodingFormat();
- }
-
- @Override
- protected EncodingFormat<BulkWriter.Factory<RowData>> getEncodingFormat() {
- return format.getEncodingFormat();
+ public BulkFormat<RowData, FileSourceSplit> createReaderFactory(
+ RowType type, int[][] projection, List<ResolvedExpression> filters) {
+ return format.createReaderFactory(type, projection, filters);
}
@Override
public BulkWriter.Factory<RowData> createWriterFactory(RowType type) {
return fsDataOutputStream -> {
BulkWriter<RowData> wrapped =
- super.createWriterFactory(type).create(fsDataOutputStream);
+ format.createWriterFactory(type).create(fsDataOutputStream);
return new BulkWriter<RowData>() {
@Override
public void addElement(RowData rowData) throws IOException {
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaTest.java
index cf89de9b..43811b7a 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaTest.java
@@ -25,11 +25,11 @@ import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.data.writer.BinaryRowWriter;
import org.apache.flink.table.store.file.FileStoreOptions;
import org.apache.flink.table.store.file.data.DataFileMeta;
-import org.apache.flink.table.store.file.format.FileFormat;
import org.apache.flink.table.store.file.schema.SchemaManager;
import org.apache.flink.table.store.file.stats.StatsTestUtils;
import org.apache.flink.table.store.file.utils.FailingAtomicRenameFileSystem;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.table.store.format.FileFormat;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.RowType;
@@ -61,9 +61,7 @@ public class ManifestFileMetaTest {
private ManifestFile manifestFile;
public ManifestFileMetaTest() {
- this.avro =
- FileFormat.fromIdentifier(
- ManifestFileMetaTest.class.getClassLoader(), "avro", new Configuration());
+ this.avro = FileFormat.fromIdentifier("avro", new Configuration());
}
@BeforeEach
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileTest.java
index 1f6cf287..3ad75724 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileTest.java
@@ -22,11 +22,11 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.store.file.FileStoreOptions;
-import org.apache.flink.table.store.file.format.FileFormat;
import org.apache.flink.table.store.file.schema.SchemaManager;
import org.apache.flink.table.store.file.stats.StatsTestUtils;
import org.apache.flink.table.store.file.utils.FailingAtomicRenameFileSystem;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.table.store.format.FileFormat;
import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.io.TempDir;
@@ -44,9 +44,7 @@ import static org.assertj.core.api.Assertions.assertThat;
public class ManifestFileTest {
private final ManifestTestDataGenerator gen = ManifestTestDataGenerator.builder().build();
- private final FileFormat avro =
- FileFormat.fromIdentifier(
- ManifestFileTest.class.getClassLoader(), "avro", new Configuration());
+ private final FileFormat avro = FileFormat.fromIdentifier("avro", new Configuration());
@TempDir java.nio.file.Path tempDir;
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestListTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestListTest.java
index 38a4d68d..6064c1cf 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestListTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestListTest.java
@@ -23,9 +23,9 @@ import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.store.file.FileStoreOptions;
import org.apache.flink.table.store.file.TestKeyValueGenerator;
-import org.apache.flink.table.store.file.format.FileFormat;
import org.apache.flink.table.store.file.utils.FailingAtomicRenameFileSystem;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.table.store.format.FileFormat;
import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.io.TempDir;
@@ -41,9 +41,7 @@ import static org.assertj.core.api.Assertions.assertThat;
public class ManifestListTest {
private final ManifestTestDataGenerator gen = ManifestTestDataGenerator.builder().build();
- private final FileFormat avro =
- FileFormat.fromIdentifier(
- ManifestListTest.class.getClassLoader(), "avro", new Configuration());
+ private final FileFormat avro = FileFormat.fromIdentifier("avro", new Configuration());
@TempDir java.nio.file.Path tempDir;
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestTestDataGenerator.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestTestDataGenerator.java
index 2eff6284..a2cfa068 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestTestDataGenerator.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestTestDataGenerator.java
@@ -22,7 +22,8 @@ import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.store.file.KeyValue;
import org.apache.flink.table.store.file.TestKeyValueGenerator;
import org.apache.flink.table.store.file.data.DataFileTestDataGenerator;
-import org.apache.flink.table.store.file.stats.FieldStatsCollector;
+import org.apache.flink.table.store.file.stats.FieldStatsArraySerializer;
+import org.apache.flink.table.store.format.FieldStatsCollector;
import org.apache.flink.util.Preconditions;
import java.util.ArrayList;
@@ -83,6 +84,8 @@ public class ManifestTestDataGenerator {
FieldStatsCollector collector =
new FieldStatsCollector(TestKeyValueGenerator.DEFAULT_PART_TYPE);
+ FieldStatsArraySerializer serializer =
+ new FieldStatsArraySerializer(TestKeyValueGenerator.DEFAULT_PART_TYPE);
long numAddedFiles = 0;
long numDeletedFiles = 0;
@@ -100,7 +103,7 @@ public class ManifestTestDataGenerator {
entries.size() * 100L,
numAddedFiles,
numDeletedFiles,
- collector.extract(),
+ serializer.toBinary(collector.extract()),
0);
}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
index 69ca3a6f..96f4554e 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
@@ -29,7 +29,6 @@ import org.apache.flink.table.store.file.KeyValue;
import org.apache.flink.table.store.file.data.DataFileMeta;
import org.apache.flink.table.store.file.data.DataFileReader;
import org.apache.flink.table.store.file.data.DataFileWriter;
-import org.apache.flink.table.store.file.format.FileFormat;
import org.apache.flink.table.store.file.format.FlushingFileFormat;
import org.apache.flink.table.store.file.memory.HeapMemorySegmentPool;
import org.apache.flink.table.store.file.mergetree.compact.CompactManager;
@@ -43,6 +42,7 @@ import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.apache.flink.table.store.file.utils.RecordReader;
import org.apache.flink.table.store.file.utils.RecordReaderIterator;
import org.apache.flink.table.store.file.writer.RecordWriter;
+import org.apache.flink.table.store.format.FileFormat;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/PredicateTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/PredicateTest.java
index 3d396ee5..45617789 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/PredicateTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/PredicateTest.java
@@ -25,7 +25,7 @@ import org.apache.flink.table.expressions.FieldReferenceExpression;
import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
import org.apache.flink.table.functions.FunctionDefinition;
-import org.apache.flink.table.store.file.stats.FieldStats;
+import org.apache.flink.table.store.format.FieldStats;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.RowType;
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/FieldStatsCollectorTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/FieldStatsCollectorTest.java
index c3d54cf8..62bf433d 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/FieldStatsCollectorTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/FieldStatsCollectorTest.java
@@ -21,6 +21,8 @@ package org.apache.flink.table.store.file.stats;
import org.apache.flink.table.data.GenericArrayData;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.store.format.FieldStats;
+import org.apache.flink.table.store.format.FieldStatsCollector;
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.RowType;
@@ -44,7 +46,7 @@ public class FieldStatsCollectorTest {
1,
StringData.fromString("Flink"),
new GenericArrayData(new int[] {1, 10})));
- assertThat(collector.extractFieldStats())
+ assertThat(collector.extract())
.isEqualTo(
new FieldStats[] {
new FieldStats(1, 1, 0),
@@ -56,7 +58,7 @@ public class FieldStatsCollectorTest {
});
collector.collect(GenericRowData.of(3, null, new GenericArrayData(new int[] {3, 30})));
- assertThat(collector.extractFieldStats())
+ assertThat(collector.extract())
.isEqualTo(
new FieldStats[] {
new FieldStats(1, 3, 0),
@@ -72,7 +74,7 @@ public class FieldStatsCollectorTest {
null,
StringData.fromString("Apache"),
new GenericArrayData(new int[] {2, 20})));
- assertThat(collector.extractFieldStats())
+ assertThat(collector.extract())
.isEqualTo(
new FieldStats[] {
new FieldStats(1, 3, 1),
@@ -84,7 +86,7 @@ public class FieldStatsCollectorTest {
});
collector.collect(GenericRowData.of(2, StringData.fromString("Batch"), null));
- assertThat(collector.extractFieldStats())
+ assertThat(collector.extract())
.isEqualTo(
new FieldStats[] {
new FieldStats(1, 3, 1),
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/StatsTestUtils.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/StatsTestUtils.java
index 1bcc955e..e099e9ec 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/StatsTestUtils.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/StatsTestUtils.java
@@ -18,6 +18,7 @@
package org.apache.flink.table.store.file.stats;
+import org.apache.flink.table.store.format.FieldStats;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.RowType;
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/TestFileStatsExtractor.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/TestFileStatsExtractor.java
index 564855d9..4d854aef 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/TestFileStatsExtractor.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/TestFileStatsExtractor.java
@@ -22,9 +22,12 @@ import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.connector.file.src.reader.BulkFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.store.file.format.FileFormat;
import org.apache.flink.table.store.file.utils.FileUtils;
import org.apache.flink.table.store.file.utils.ObjectSerializer;
+import org.apache.flink.table.store.format.FieldStats;
+import org.apache.flink.table.store.format.FieldStatsCollector;
+import org.apache.flink.table.store.format.FileFormat;
+import org.apache.flink.table.store.format.FileStatsExtractor;
import org.apache.flink.table.types.logical.RowType;
import java.io.IOException;
@@ -53,7 +56,7 @@ public class TestFileStatsExtractor implements FileStatsExtractor {
for (RowData record : records) {
statsCollector.collect(record);
}
- return statsCollector.extractFieldStats();
+ return statsCollector.extract();
}
private static class IdentityObjectSerializer extends ObjectSerializer<RowData> {
diff --git a/flink-table-store-core/src/test/resources/META-INF/services/org.apache.flink.table.store.file.format.FileFormatFactory b/flink-table-store-core/src/test/resources/META-INF/services/org.apache.flink.table.store.format.FileFormatFactory
similarity index 100%
rename from flink-table-store-core/src/test/resources/META-INF/services/org.apache.flink.table.store.file.format.FileFormatFactory
rename to flink-table-store-core/src/test/resources/META-INF/services/org.apache.flink.table.store.format.FileFormatFactory
diff --git a/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/FileStoreFlinkFormatE2eTest.java b/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/FileStoreFlinkFormatE2eTest.java
deleted file mode 100644
index 0ceee859..00000000
--- a/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/FileStoreFlinkFormatE2eTest.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.tests;
-
-import org.junit.jupiter.api.Test;
-
-import java.util.UUID;
-
-/**
- * Test that file store supports format not included in flink-table-store-format but provided by
- * Flink itself.
- */
-public class FileStoreFlinkFormatE2eTest extends E2eTestBase {
-
- @Test
- public void testCsv() throws Exception {
- String tableStoreDdl =
- "CREATE TABLE IF NOT EXISTS table_store (\n"
- + " a INT,\n"
- + " b VARCHAR\n"
- + ") WITH (\n"
- + " 'bucket' = '3',\n"
- + " 'root-path' = '%s',\n"
- + " 'file.format' = 'csv'\n"
- + ");";
- tableStoreDdl =
- String.format(
- tableStoreDdl,
- TEST_DATA_DIR + "/" + UUID.randomUUID().toString() + ".store");
-
- runSql("INSERT INTO table_store VALUES (1, 'Hi'), (2, 'Hello');", tableStoreDdl);
- runSql(
- "INSERT INTO result1 SELECT * FROM table_store;",
- tableStoreDdl,
- createResultSink("result1", "a INT, b VARCHAR"));
- checkResult("1, Hi", "2, Hello");
- }
-
- private void runSql(String sql, String... ddls) throws Exception {
- runSql(
- "SET 'execution.runtime-mode' = 'batch';\n"
- + "SET 'table.dml-sync' = 'true';\n"
- + String.join("\n", ddls)
- + "\n"
- + sql);
- }
-}
diff --git a/flink-table-store-format/pom.xml b/flink-table-store-format/pom.xml
index 1b582ec9..80fb0611 100644
--- a/flink-table-store-format/pom.xml
+++ b/flink-table-store-format/pom.xml
@@ -34,7 +34,7 @@ under the License.
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-table-store-core</artifactId>
+ <artifactId>flink-table-store-common</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
@@ -43,23 +43,30 @@ under the License.
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-connector-files</artifactId>
+ <artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-table-common</artifactId>
+ <artifactId>flink-table-runtime</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
+ <!-- format dependencies -->
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-files</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro</artifactId>
<version>${flink.version}</version>
- <scope>provided</scope>
</dependency>
<dependency>
@@ -80,7 +87,6 @@ under the License.
<groupId>org.apache.flink</groupId>
<artifactId>flink-orc</artifactId>
<version>${flink.version}</version>
- <scope>provided</scope>
</dependency>
<dependency>
@@ -101,7 +107,6 @@ under the License.
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
- <scope>provided</scope>
<exclusions>
<exclusion>
<groupId>org.apache.avro</groupId>
@@ -122,16 +127,9 @@ under the License.
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-table-store-core</artifactId>
+ <artifactId>flink-table-store-common</artifactId>
<version>${project.version}</version>
- <scope>test</scope>
<type>test-jar</type>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-table-runtime</artifactId>
- <version>${flink.version}</version>
<scope>test</scope>
</dependency>
@@ -238,6 +236,10 @@ under the License.
</filter>
</filters>
<relocations>
+ <relocation>
+ <pattern>org.apache.avro</pattern>
+ <shadedPattern>org.apache.flink.avro.shaded.org.apache.avro</shadedPattern>
+ </relocation>
<relocation>
<pattern>org.apache.flink.formats.avro</pattern>
<shadedPattern>org.apache.flink.table.store.shaded.org.apache.flink.formats.avro</shadedPattern>
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/AvroFileFormat.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/AvroFileFormat.java
index 9afe772e..4a778a63 100644
--- a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/AvroFileFormat.java
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/AvroFileFormat.java
@@ -19,31 +19,161 @@
package org.apache.flink.table.store.format.avro;
import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.ReadableConfig;
-import org.apache.flink.connector.file.table.format.BulkDecodingFormat;
-import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.formats.avro.AbstractAvroBulkFormat;
+import org.apache.flink.formats.avro.AvroBuilder;
+import org.apache.flink.formats.avro.AvroToRowDataConverters;
+import org.apache.flink.formats.avro.AvroWriterFactory;
+import org.apache.flink.formats.avro.RowDataToAvroConverters;
+import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
+import org.apache.flink.table.connector.Projection;
+import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.store.file.format.FileFormat;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.store.format.FileFormat;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
-/** Avro {@link FileFormat}. */
+import org.apache.avro.Schema;
+import org.apache.avro.file.CodecFactory;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumWriter;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.List;
+import java.util.function.Function;
+
+import static org.apache.flink.formats.avro.AvroFormatOptions.AVRO_OUTPUT_CODEC;
+
+/** Avro {@link FileFormat}. The main code is copied from Flink {@code AvroFileFormatFactory}. */
public class AvroFileFormat extends FileFormat {
- private final org.apache.flink.formats.avro.AvroFileFormatFactory factory;
private final ReadableConfig formatOptions;
public AvroFileFormat(ReadableConfig formatOptions) {
super(org.apache.flink.formats.avro.AvroFileFormatFactory.IDENTIFIER);
- this.factory = new org.apache.flink.formats.avro.AvroFileFormatFactory();
this.formatOptions = formatOptions;
}
@Override
- protected BulkDecodingFormat<RowData> getDecodingFormat() {
- return factory.createDecodingFormat(null, formatOptions); // context is useless
+ public BulkFormat<RowData, FileSourceSplit> createReaderFactory(
+ RowType type, int[][] projection, List<ResolvedExpression> filters) {
+ // Avro keeps the writer schema in each file header. If the schema given to the
+ // reader differs from the schema in the header, the reader automatically maps the
+ // fields and gives back records with our desired schema, so building the reader
+ // from the projected row type is enough to apply the projection.
+ //
+ // For details, see the comments in https://github.com/apache/flink/pull/18657
+ LogicalType producedType = Projection.of(projection).project(type);
+ return new AvroGenericRecordBulkFormat(
+ (RowType) producedType.copy(false), InternalTypeInfo.of(producedType));
}
@Override
- protected EncodingFormat<BulkWriter.Factory<RowData>> getEncodingFormat() {
- return factory.createEncodingFormat(null, formatOptions); // context is useless
+ public BulkWriter.Factory<RowData> createWriterFactory(RowType type) {
+ return new RowDataAvroWriterFactory(type, formatOptions.get(AVRO_OUTPUT_CODEC));
+ }
+
+ private static class AvroGenericRecordBulkFormat
+ extends AbstractAvroBulkFormat<GenericRecord, RowData, FileSourceSplit> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final RowType producedRowType;
+ private final TypeInformation<RowData> producedTypeInfo;
+
+ public AvroGenericRecordBulkFormat(
+ RowType producedRowType, TypeInformation<RowData> producedTypeInfo) {
+ super(AvroSchemaConverter.convertToSchema(producedRowType));
+ this.producedRowType = producedRowType;
+ this.producedTypeInfo = producedTypeInfo;
+ }
+
+ @Override
+ protected GenericRecord createReusedAvroRecord() {
+ return new GenericData.Record(readerSchema);
+ }
+
+ @Override
+ protected Function<GenericRecord, RowData> createConverter() {
+ AvroToRowDataConverters.AvroToRowDataConverter converter =
+ AvroToRowDataConverters.createRowConverter(producedRowType);
+ return record -> record == null ? null : (GenericRowData) converter.convert(record);
+ }
+
+ @Override
+ public TypeInformation<RowData> getProducedType() {
+ return producedTypeInfo;
+ }
+ }
+
+ /**
+ * A {@link BulkWriter.Factory} that converts each {@link RowData} to a {@link GenericRecord}
+ * and delegates the actual writing to a wrapped {@link AvroWriterFactory}.
+ */
+ private static class RowDataAvroWriterFactory implements BulkWriter.Factory<RowData> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final AvroWriterFactory<GenericRecord> factory;
+ private final RowType rowType;
+
+ private RowDataAvroWriterFactory(RowType rowType, String codec) {
+ this.rowType = rowType;
+ this.factory =
+ new AvroWriterFactory<>(
+ new AvroBuilder<GenericRecord>() {
+ @Override
+ public DataFileWriter<GenericRecord> createWriter(OutputStream out)
+ throws IOException {
+ Schema schema = AvroSchemaConverter.convertToSchema(rowType);
+ DatumWriter<GenericRecord> datumWriter =
+ new GenericDatumWriter<>(schema);
+ DataFileWriter<GenericRecord> dataFileWriter =
+ new DataFileWriter<>(datumWriter);
+
+ if (codec != null) {
+ dataFileWriter.setCodec(CodecFactory.fromString(codec));
+ }
+ dataFileWriter.create(schema, out);
+ return dataFileWriter;
+ }
+ });
+ }
+
+ @Override
+ public BulkWriter<RowData> create(FSDataOutputStream out) throws IOException {
+ BulkWriter<GenericRecord> writer = factory.create(out);
+ RowDataToAvroConverters.RowDataToAvroConverter converter =
+ RowDataToAvroConverters.createConverter(rowType);
+ Schema schema = AvroSchemaConverter.convertToSchema(rowType);
+ return new BulkWriter<RowData>() {
+
+ @Override
+ public void addElement(RowData element) throws IOException {
+ GenericRecord record = (GenericRecord) converter.convert(schema, element);
+ writer.addElement(record);
+ }
+
+ @Override
+ public void flush() throws IOException {
+ writer.flush();
+ }
+
+ @Override
+ public void finish() throws IOException {
+ writer.finish();
+ }
+ };
+ }
}
}
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/AvroFileFormatFactory.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/AvroFileFormatFactory.java
index 26c5ad56..c541eaa7 100644
--- a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/AvroFileFormatFactory.java
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/AvroFileFormatFactory.java
@@ -19,8 +19,8 @@
package org.apache.flink.table.store.format.avro;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.store.file.format.FileFormat;
-import org.apache.flink.table.store.file.format.FileFormatFactory;
+import org.apache.flink.table.store.format.FileFormat;
+import org.apache.flink.table.store.format.FileFormatFactory;
/** Factory to create {@link AvroFileFormat}. */
public class AvroFileFormatFactory implements FileFormatFactory {
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileFormat.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileFormat.java
index c9066946..b5fd5c5e 100644
--- a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileFormat.java
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileFormat.java
@@ -21,24 +21,43 @@ package org.apache.flink.table.store.format.orc;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.connector.file.table.format.BulkDecodingFormat;
-import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.orc.OrcColumnarRowInputFormat;
+import org.apache.flink.orc.OrcFilters;
+import org.apache.flink.orc.OrcSplitReaderUtil;
+import org.apache.flink.orc.shim.OrcShim;
+import org.apache.flink.orc.vector.RowDataVectorizer;
+import org.apache.flink.orc.writer.OrcBulkWriterFactory;
+import org.apache.flink.table.connector.Projection;
import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.store.file.format.FileFormat;
-import org.apache.flink.table.store.file.stats.FileStatsExtractor;
+import org.apache.flink.table.data.columnar.vector.VectorizedColumnBatch;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.store.format.FileFormat;
+import org.apache.flink.table.store.format.FileStatsExtractor;
+import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
+import org.apache.orc.TypeDescription;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
import java.util.Optional;
+import java.util.Properties;
+
+import static org.apache.flink.table.store.format.orc.OrcFileFormatFactory.IDENTIFIER;
-/** Orc {@link FileFormat}. */
+/** Orc {@link FileFormat}. The main code is copied from Flink {@code OrcFileFormatFactory}. */
public class OrcFileFormat extends FileFormat {
- private final org.apache.flink.orc.OrcFileFormatFactory factory;
private final Configuration formatOptions;
public OrcFileFormat(Configuration formatOptions) {
super(org.apache.flink.orc.OrcFileFormatFactory.IDENTIFIER);
- this.factory = new org.apache.flink.orc.OrcFileFormatFactory();
this.formatOptions = formatOptions;
}
@@ -48,17 +67,57 @@ public class OrcFileFormat extends FileFormat {
}
@Override
- protected BulkDecodingFormat<RowData> getDecodingFormat() {
- return factory.createDecodingFormat(null, formatOptions); // context is useless
+ public Optional<FileStatsExtractor> createStatsExtractor(RowType type) {
+ return Optional.of(new OrcFileStatsExtractor(type));
}
@Override
- protected EncodingFormat<BulkWriter.Factory<RowData>> getEncodingFormat() {
- return factory.createEncodingFormat(null, formatOptions); // context is useless
+ public BulkFormat<RowData, FileSourceSplit> createReaderFactory(
+ RowType type, int[][] projection, List<ResolvedExpression> filters) {
+ List<OrcFilters.Predicate> orcPredicates = new ArrayList<>();
+
+ if (filters != null) {
+ for (Expression pred : filters) {
+ OrcFilters.Predicate orcPred = OrcFilters.toOrcPredicate(pred);
+ if (orcPred != null) {
+ orcPredicates.add(orcPred);
+ }
+ }
+ }
+
+ Properties properties = getOrcProperties(formatOptions);
+ org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
+ properties.forEach((k, v) -> conf.set(k.toString(), v.toString()));
+
+ return OrcColumnarRowInputFormat.createPartitionedFormat(
+ OrcShim.defaultShim(),
+ conf,
+ type,
+ Collections.emptyList(),
+ null,
+ Projection.of(projection).toTopLevelIndexes(),
+ orcPredicates,
+ VectorizedColumnBatch.DEFAULT_SIZE,
+ InternalTypeInfo::of);
}
@Override
- public Optional<FileStatsExtractor> createStatsExtractor(RowType type) {
- return Optional.of(new OrcFileStatsExtractor(type));
+ public BulkWriter.Factory<RowData> createWriterFactory(RowType type) {
+ LogicalType[] orcTypes = type.getChildren().toArray(new LogicalType[0]);
+
+ TypeDescription typeDescription = OrcSplitReaderUtil.logicalTypeToOrcType(type);
+
+ return new OrcBulkWriterFactory<>(
+ new RowDataVectorizer(typeDescription.toString(), orcTypes),
+ getOrcProperties(formatOptions),
+ new org.apache.hadoop.conf.Configuration());
+ }
+
+ private static Properties getOrcProperties(ReadableConfig options) {
+ Properties orcProperties = new Properties();
+ Properties properties = new Properties();
+ ((org.apache.flink.configuration.Configuration) options).addAllToProperties(properties);
+ properties.forEach((k, v) -> orcProperties.put(IDENTIFIER + "." + k, v));
+ return orcProperties;
}
}
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileFormatFactory.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileFormatFactory.java
index 9e544e49..d580ce61 100644
--- a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileFormatFactory.java
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileFormatFactory.java
@@ -19,16 +19,18 @@
package org.apache.flink.table.store.format.orc;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.store.file.format.FileFormatFactory;
+import org.apache.flink.table.store.format.FileFormatFactory;
import java.util.Properties;
/** Factory to create {@link OrcFileFormat}. */
public class OrcFileFormatFactory implements FileFormatFactory {
+ public static final String IDENTIFIER = "orc";
+
@Override
public String identifier() {
- return "orc";
+ return IDENTIFIER;
}
@Override
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileStatsExtractor.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileStatsExtractor.java
index 1acd68d6..044a26e1 100644
--- a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileStatsExtractor.java
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileStatsExtractor.java
@@ -22,8 +22,8 @@ import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
-import org.apache.flink.table.store.file.stats.FieldStats;
-import org.apache.flink.table.store.file.stats.FileStatsExtractor;
+import org.apache.flink.table.store.format.FieldStats;
+import org.apache.flink.table.store.format.FileStatsExtractor;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.utils.DateTimeUtils;
diff --git a/flink-table-store-format/src/main/resources/META-INF/services/org.apache.flink.table.store.file.format.FileFormatFactory b/flink-table-store-format/src/main/resources/META-INF/services/org.apache.flink.table.store.format.FileFormatFactory
similarity index 100%
rename from flink-table-store-format/src/main/resources/META-INF/services/org.apache.flink.table.store.file.format.FileFormatFactory
rename to flink-table-store-format/src/main/resources/META-INF/services/org.apache.flink.table.store.format.FileFormatFactory
diff --git a/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/BulkFileFormatTest.java b/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/BulkFileFormatTest.java
index 91e70b81..eefcec76 100644
--- a/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/BulkFileFormatTest.java
+++ b/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/BulkFileFormatTest.java
@@ -28,8 +28,6 @@ import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.store.file.FileStoreOptions;
-import org.apache.flink.table.store.file.format.FileFormat;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.RowType;
@@ -55,22 +53,10 @@ public class BulkFileFormatTest {
testFormatWriteRead(tempDir, "orc", "snappy");
}
- @Test
- public void testParquet(@TempDir java.nio.file.Path tempDir) throws IOException {
- testFormatWriteRead(tempDir, "parquet", "snappy");
- }
-
- @Test
- public void testCsv(@TempDir java.nio.file.Path tempDir) throws IOException {
- testFormatWriteRead(tempDir, "csv", "snappy");
- }
-
public FileFormat createFileFormat(String format, String codec) {
Configuration tableOptions = new Configuration();
- tableOptions.set(FileStoreOptions.FILE_FORMAT, format);
tableOptions.setString(format + ".codec", codec);
- return FileFormat.fromTableOptions(
- this.getClass().getClassLoader(), tableOptions, FileStoreOptions.FILE_FORMAT);
+ return FileFormat.fromIdentifier(format, tableOptions);
}
public void testFormatWriteRead(
diff --git a/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcFileStatsExtractorTest.java b/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcFileStatsExtractorTest.java
index 1ee12ccc..eaac4b29 100644
--- a/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcFileStatsExtractorTest.java
+++ b/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcFileStatsExtractorTest.java
@@ -19,8 +19,8 @@
package org.apache.flink.table.store.format.orc;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.store.file.format.FileFormat;
-import org.apache.flink.table.store.file.stats.FileStatsExtractorTestBase;
+import org.apache.flink.table.store.format.FileFormat;
+import org.apache.flink.table.store.format.FileStatsExtractorTestBase;
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.BooleanType;
@@ -41,8 +41,7 @@ public class OrcFileStatsExtractorTest extends FileStatsExtractorTestBase {
@Override
protected FileFormat createFormat() {
- return FileFormat.fromIdentifier(
- OrcFileStatsExtractorTest.class.getClassLoader(), "orc", new Configuration());
+ return FileFormat.fromIdentifier("orc", new Configuration());
}
@Override