Posted to commits@flink.apache.org by lz...@apache.org on 2022/09/05 13:10:00 UTC
[flink-table-store] branch master updated: [FLINK-27207] Support built-in parquet format
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new 70d93d29 [FLINK-27207] Support built-in parquet format
70d93d29 is described below
commit 70d93d29876837d99dd80f422bb39e54fb8b8094
Author: Yubin Li <li...@163.com>
AuthorDate: Mon Sep 5 21:09:54 2022 +0800
[FLINK-27207] Support built-in parquet format
This closes #134
---
.../flink/table/store/utils/DateTimeUtils.java | 5 +
.../store/tests/FileStoreBuiltInFormatE2eTest.java | 120 ++++++++++++
flink-table-store-format/pom.xml | 31 +++-
.../store/format/parquet/ParquetFileFormat.java | 83 +++++++++
.../format/parquet/ParquetFileFormatFactory.java | 56 ++++++
.../format/parquet/ParquetFileStatsExtractor.java | 204 +++++++++++++++++++++
.../format/parquet/ParquetInputFormatFactory.java | 103 +++++++++++
.../table/store/format/parquet/ParquetUtil.java | 95 ++++++++++
...ache.flink.table.store.format.FileFormatFactory | 3 +-
.../format/parquet/ParquetFileFormatTest.java | 74 ++++++++
.../parquet/ParquetFileStatsExtractorTest.java | 61 ++++++
pom.xml | 2 +
12 files changed, 835 insertions(+), 2 deletions(-)
diff --git a/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/DateTimeUtils.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/DateTimeUtils.java
index 10905468..f1814613 100644
--- a/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/DateTimeUtils.java
+++ b/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/DateTimeUtils.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.store.utils;
import org.apache.flink.table.data.TimestampData;
import java.time.DateTimeException;
+import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
@@ -92,6 +93,10 @@ public class DateTimeUtils {
return (int) (ts / MILLIS_PER_DAY);
}
+ public static int toInternal(LocalDate date) {
+ return ymdToUnixDate(date.getYear(), date.getMonthValue(), date.getDayOfMonth());
+ }
+
public static Integer parseDate(String s) {
// allow timestamp str to date, e.g. 2017-12-12 09:30:00.0
int ws1 = s.indexOf(" ");
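The new DateTimeUtils.toInternal(LocalDate) produces Flink's internal DATE representation: the number of days since the Unix epoch (1970-01-01). A minimal sketch of the same conversion using only java.time, assuming ymdToUnixDate implements the standard epoch-day arithmetic:

import java.time.LocalDate;

public class EpochDayDemo {
    public static void main(String[] args) {
        LocalDate date = LocalDate.of(2022, 5, 23);
        // Equivalent to DateTimeUtils.toInternal(date): days since 1970-01-01.
        int internal = (int) date.toEpochDay();
        System.out.println(internal); // 19135
        System.out.println(LocalDate.ofEpochDay(internal)); // 2022-05-23 (round-trips)
    }
}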
diff --git a/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/FileStoreBuiltInFormatE2eTest.java b/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/FileStoreBuiltInFormatE2eTest.java
new file mode 100644
index 00000000..cfeab630
--- /dev/null
+++ b/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/FileStoreBuiltInFormatE2eTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.tests;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.UUID;
+
+/** Test that the file store supports formats included in flink-table-store-format. */
+public class FileStoreBuiltInFormatE2eTest extends E2eTestBase {
+
+ @Test
+ public void testParquet() throws Exception {
+ String schema =
+ "id INT,\n"
+ + "isMan BOOLEAN,\n"
+ + "houseNum TINYINT,\n"
+ + "debugNum SMALLINT,\n"
+ + "age INT,\n"
+ + "cash BIGINT,\n"
+ + "money FLOAT,\n"
+ + "f1 DOUBLE,\n"
+ + "f2 DECIMAL(5, 3),\n"
+ + "f3 DECIMAL(26, 8),\n"
+ + "f4 CHAR(10),\n"
+ + "f5 VARCHAR(10),\n"
+ + "f6 STRING,\n"
+ + "f7 DATE\n";
+ String tableStoreDdl =
+ "CREATE TABLE IF NOT EXISTS table_store (\n"
+ + schema
+ + ") WITH (\n"
+ + " 'bucket' = '3',\n"
+ + " 'root-path' = '%s',\n"
+ + " 'file.format' = 'parquet'\n"
+ + ");";
+ tableStoreDdl =
+ String.format(
+ tableStoreDdl,
+ TEST_DATA_DIR + "/" + UUID.randomUUID().toString() + ".store");
+ String insertDml =
+ "INSERT INTO table_store VALUES ("
+ + "1,"
+ + "true,"
+ + "cast(1 as tinyint),"
+ + "cast(10 as smallint),"
+ + "cast(100 as int),"
+ + "cast(999999 as bigint),"
+ + "cast(1.1 as float),"
+ + "1.11,"
+ + "12.456,"
+ + "cast('123456789123456789.12345678' as decimal(26, 8)),"
+ + "cast('hi' as char(10)),"
+ + "'Parquet',"
+ + "'这是一个parquet format',"
+ + "DATE '2022-05-21'"
+ + "),("
+ + "2,"
+ + "false,"
+ + "cast(2 as tinyint),"
+ + "cast(29 as smallint),"
+ + "cast(200 as int),"
+ + "cast(9999999 as bigint),"
+ + "cast(2.2 as float),"
+ + "2.22,"
+ + "22.557,"
+ + "cast('222222789123456789.12345678' as decimal(26, 8)),"
+ + "cast('hello' as char(10)),"
+ + "'Hi Yu bin',"
+ + "'这是一个 built in parquet format',"
+ + "DATE '2022-05-23'"
+ + ")";
+ String resultDdl = createResultSink("result1", schema);
+ runSql(insertDml, tableStoreDdl);
+ runSql(
+ "INSERT INTO result1 SELECT * FROM table_store where id > 1;",
+ tableStoreDdl,
+ resultDdl);
+ checkResult(
+ "2, "
+ + "false, "
+ + "2, "
+ + "29, "
+ + "200, "
+ + "9999999, "
+ + "2.2, "
+ + "2.22, "
+ + "22.557, "
+ + "222222789123456789.12345678, "
+ + "hello , "
+ + "Hi Yu bin, "
+ + "这是一个 built in parquet format, "
+ + "2022-05-23");
+ }
+
+ private void runSql(String sql, String... ddls) throws Exception {
+ runSql(
+ "SET 'execution.runtime-mode' = 'batch';\n"
+ + "SET 'table.dml-sync' = 'true';\n"
+ + String.join("\n", ddls)
+ + "\n"
+ + sql);
+ }
+}
diff --git a/flink-table-store-format/pom.xml b/flink-table-store-format/pom.xml
index 51d6a07f..6b47de37 100644
--- a/flink-table-store-format/pom.xml
+++ b/flink-table-store-format/pom.xml
@@ -153,11 +153,25 @@ under the License.
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>${flink.parquet}</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>${flink.sql.parquet}</artifactId>
<version>${flink.version}</version>
- <scope>test</scope>
+ <scope>runtime</scope>
+ <optional>true</optional>
+ <exclusions>
+ <exclusion>
+ <groupId>*</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
@@ -207,6 +221,7 @@ under the License.
<includes combine.children="append">
<include>org.apache.flink:flink-sql-avro</include>
<include>org.apache.flink:${flink.sql.orc}</include>
+ <include>org.apache.flink:${flink.sql.parquet}</include>
</includes>
</artifactSet>
<filters>
@@ -227,6 +242,12 @@ under the License.
<exclude>META-INF/services/**</exclude>
</excludes>
</filter>
+ <filter>
+ <artifact>org.apache.flink:${flink.sql.parquet}</artifact>
+ <excludes>
+ <exclude>META-INF/services/**</exclude>
+ </excludes>
+ </filter>
<!-- Another copy of the Apache license, which we don't need. -->
<filter>
<artifact>*</artifact>
@@ -244,6 +265,14 @@ under the License.
<pattern>org.apache.flink.formats.avro</pattern>
<shadedPattern>org.apache.flink.table.store.shaded.org.apache.flink.formats.avro</shadedPattern>
</relocation>
+ <relocation>
+ <pattern>org.apache.parquet</pattern>
+ <shadedPattern>org.apache.flink.table.store.shaded.org.apache.parquet</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.apache.flink.formats.parquet</pattern>
+ <shadedPattern>org.apache.flink.table.store.shaded.org.apache.flink.formats.parquet</shadedPattern>
+ </relocation>
<relocation>
<pattern>org.apache.flink.orc</pattern>
<shadedPattern>org.apache.flink.table.store.shaded.org.apache.flink.orc</shadedPattern>
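With the shade rules above, the parquet classes bundled from ${flink.sql.parquet} end up under the relocated package, while the provided flink-parquet dependency keeps compile-time references on the original names. A hedged sanity check, assuming the shaded flink-table-store-format jar is on the classpath:

public class ShadingCheck {
    public static void main(String[] args) throws ClassNotFoundException {
        // Name per the org.apache.parquet <relocation> rule above; throws
        // ClassNotFoundException if the shaded jar is absent (illustrative only).
        Class.forName(
                "org.apache.flink.table.store.shaded.org.apache.parquet.hadoop.ParquetFileReader");
        System.out.println("relocated parquet classes present");
    }
}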
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/parquet/ParquetFileFormat.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/parquet/ParquetFileFormat.java
new file mode 100644
index 00000000..28b9b110
--- /dev/null
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/parquet/ParquetFileFormat.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.format.parquet;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.formats.parquet.row.ParquetRowDataBuilder;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.store.file.predicate.Predicate;
+import org.apache.flink.table.store.format.FileFormat;
+import org.apache.flink.table.store.format.FileStatsExtractor;
+import org.apache.flink.table.store.utils.Projection;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.List;
+import java.util.Optional;
+
+import static org.apache.flink.formats.parquet.ParquetFileFormatFactory.UTC_TIMEZONE;
+import static org.apache.flink.table.store.format.parquet.ParquetFileFormatFactory.IDENTIFIER;
+
+/** Parquet {@link FileFormat}. */
+public class ParquetFileFormat extends FileFormat {
+
+ private final Configuration formatOptions;
+
+ public ParquetFileFormat(Configuration formatOptions) {
+ super(IDENTIFIER);
+ this.formatOptions = formatOptions;
+ }
+
+ @VisibleForTesting
+ Configuration formatOptions() {
+ return formatOptions;
+ }
+
+ @Override
+ public BulkFormat<RowData, FileSourceSplit> createReaderFactory(
+ RowType type, int[][] projection, List<Predicate> filters) {
+ return ParquetInputFormatFactory.create(
+ getParquetConfiguration(formatOptions),
+ (RowType) Projection.of(projection).project(type),
+ InternalTypeInfo.of(type),
+ formatOptions.get(UTC_TIMEZONE));
+ }
+
+ @Override
+ public BulkWriter.Factory<RowData> createWriterFactory(RowType type) {
+ return ParquetRowDataBuilder.createWriterFactory(
+ type, getParquetConfiguration(formatOptions), formatOptions.get(UTC_TIMEZONE));
+ }
+
+ @Override
+ public Optional<FileStatsExtractor> createStatsExtractor(RowType type) {
+ return Optional.of(new ParquetFileStatsExtractor(type));
+ }
+
+ public static org.apache.hadoop.conf.Configuration getParquetConfiguration(
+ Configuration options) {
+ org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
+ options.toMap().forEach((key, value) -> conf.set(IDENTIFIER + "." + key, value));
+ return conf;
+ }
+}
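A short sketch of what getParquetConfiguration does with the format options: every key is copied into a Hadoop Configuration under the "parquet." identifier prefix, which is the key space the Parquet writer reads (the option key and value here are illustrative):

import org.apache.flink.configuration.Configuration;

public class ParquetConfDemo {
    public static void main(String[] args) {
        Configuration formatOptions = new Configuration();
        formatOptions.setString("compression", "LZ4");

        org.apache.hadoop.conf.Configuration conf =
                ParquetFileFormat.getParquetConfiguration(formatOptions);
        // Keys are re-qualified under the "parquet." prefix:
        System.out.println(conf.get("parquet.compression")); // LZ4
    }
}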
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/parquet/ParquetFileFormatFactory.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/parquet/ParquetFileFormatFactory.java
new file mode 100644
index 00000000..7b7c60f5
--- /dev/null
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/parquet/ParquetFileFormatFactory.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.format.parquet;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.store.format.FileFormatFactory;
+
+import org.apache.parquet.hadoop.ParquetOutputFormat;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+
+import java.util.Properties;
+
+/** Factory to create {@link ParquetFileFormat}. */
+public class ParquetFileFormatFactory implements FileFormatFactory {
+ public static final String IDENTIFIER = "parquet";
+
+ @Override
+ public String identifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public ParquetFileFormat create(Configuration formatOptions) {
+ return new ParquetFileFormat(supplyDefaultOptions(formatOptions));
+ }
+
+ private Configuration supplyDefaultOptions(Configuration options) {
+ String compression =
+ ParquetOutputFormat.COMPRESSION.replaceFirst(String.format("^%s.", IDENTIFIER), "");
+ if (!options.containsKey(compression)) {
+ Properties properties = new Properties();
+ options.addAllToProperties(properties);
+ properties.setProperty(compression, CompressionCodecName.SNAPPY.name());
+ Configuration newOptions = new Configuration();
+ properties.forEach((k, v) -> newOptions.setString(k.toString(), v.toString()));
+ return newOptions;
+ }
+ return options;
+ }
+}
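Usage sketch for the snappy default: ParquetOutputFormat.COMPRESSION is "parquet.compression", so after stripping the identifier prefix the factory checks the bare "compression" key and fills in SNAPPY when it is absent (the ParquetFileFormatTest below verifies the same behavior):

import org.apache.flink.configuration.Configuration;

public class DefaultCompressionDemo {
    public static void main(String[] args) {
        ParquetFileFormat format = new ParquetFileFormatFactory().create(new Configuration());
        // supplyDefaultOptions injected the default under the un-prefixed key.
        System.out.println(format.formatOptions().getString("compression", null)); // SNAPPY
    }
}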
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/parquet/ParquetFileStatsExtractor.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/parquet/ParquetFileStatsExtractor.java
new file mode 100644
index 00000000..09f836d8
--- /dev/null
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/parquet/ParquetFileStatsExtractor.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.format.parquet;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.store.format.FieldStats;
+import org.apache.flink.table.store.format.FileStatsExtractor;
+import org.apache.flink.table.store.utils.DateTimeUtils;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.parquet.column.statistics.BinaryStatistics;
+import org.apache.parquet.column.statistics.BooleanStatistics;
+import org.apache.parquet.column.statistics.DoubleStatistics;
+import org.apache.parquet.column.statistics.FloatStatistics;
+import org.apache.parquet.column.statistics.IntStatistics;
+import org.apache.parquet.column.statistics.LongStatistics;
+import org.apache.parquet.column.statistics.Statistics;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
+import org.apache.parquet.schema.PrimitiveType;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.util.Map;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.table.store.format.parquet.ParquetUtil.assertStatsClass;
+
+/** {@link FileStatsExtractor} for parquet files. */
+public class ParquetFileStatsExtractor implements FileStatsExtractor {
+
+ private final RowType rowType;
+ private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
+ private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate();
+
+ public ParquetFileStatsExtractor(RowType rowType) {
+ this.rowType = rowType;
+ }
+
+ @Override
+ public FieldStats[] extract(Path path) throws IOException {
+ org.apache.hadoop.fs.Path hadoopPath = new org.apache.hadoop.fs.Path(path.toUri());
+ Map<String, Statistics> stats = ParquetUtil.extractColumnStats(hadoopPath);
+
+ return IntStream.range(0, rowType.getFieldCount())
+ .mapToObj(
+ i -> {
+ RowType.RowField field = rowType.getFields().get(i);
+ return toFieldStats(field, stats.get(field.getName()));
+ })
+ .toArray(FieldStats[]::new);
+ }
+
+ private FieldStats toFieldStats(RowType.RowField field, Statistics stats) {
+ LogicalTypeRoot flinkType = field.getType().getTypeRoot();
+ if (stats == null
+ || flinkType == LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE
+ || flinkType == LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE) {
+ throw new UnsupportedOperationException(
+ "type "
+ + field.getType().getTypeRoot()
+ + " not supported for extracting statistics in parquet format");
+ }
+ long nullCount = stats.getNumNulls();
+ if (!stats.hasNonNullValue()) {
+ return new FieldStats(null, null, nullCount);
+ }
+
+ switch (flinkType) {
+ case CHAR:
+ case VARCHAR:
+ assertStatsClass(field, stats, BinaryStatistics.class);
+ BinaryStatistics binaryStats = (BinaryStatistics) stats;
+ return new FieldStats(
+ StringData.fromString(binaryStats.minAsString()),
+ StringData.fromString(binaryStats.maxAsString()),
+ nullCount);
+ case BOOLEAN:
+ assertStatsClass(field, stats, BooleanStatistics.class);
+ BooleanStatistics boolStats = (BooleanStatistics) stats;
+ return new FieldStats(boolStats.getMin(), boolStats.getMax(), nullCount);
+ case DECIMAL:
+ PrimitiveType primitive = stats.type();
+ DecimalType decimalType = (DecimalType) (field.getType());
+ int precision = decimalType.getPrecision();
+ int scale = decimalType.getScale();
+ if (primitive.getOriginalType() != null
+ && primitive.getLogicalTypeAnnotation()
+ instanceof LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) {
+ return convertStatsToDecimalFieldStats(
+ primitive, field, stats, precision, scale, nullCount);
+ } else {
+ return new FieldStats(null, null, nullCount);
+ }
+ case TINYINT:
+ assertStatsClass(field, stats, IntStatistics.class);
+ IntStatistics byteStats = (IntStatistics) stats;
+ return new FieldStats(
+ (byte) byteStats.getMin(), (byte) byteStats.getMax(), nullCount);
+ case SMALLINT:
+ assertStatsClass(field, stats, IntStatistics.class);
+ IntStatistics shortStats = (IntStatistics) stats;
+ return new FieldStats(
+ (short) shortStats.getMin(), (short) shortStats.getMax(), nullCount);
+ case INTEGER:
+ assertStatsClass(field, stats, IntStatistics.class);
+ IntStatistics intStats = (IntStatistics) stats;
+ return new FieldStats(
+ Long.valueOf(intStats.getMin()).intValue(),
+ Long.valueOf(intStats.getMax()).intValue(),
+ nullCount);
+ case BIGINT:
+ assertStatsClass(field, stats, LongStatistics.class);
+ LongStatistics longStats = (LongStatistics) stats;
+ return new FieldStats(longStats.getMin(), longStats.getMax(), nullCount);
+ case FLOAT:
+ assertStatsClass(field, stats, FloatStatistics.class);
+ FloatStatistics floatStats = (FloatStatistics) stats;
+ return new FieldStats(floatStats.getMin(), floatStats.getMax(), nullCount);
+ case DOUBLE:
+ assertStatsClass(field, stats, DoubleStatistics.class);
+ DoubleStatistics doubleStats = (DoubleStatistics) stats;
+ return new FieldStats(doubleStats.getMin(), doubleStats.getMax(), nullCount);
+ case DATE:
+ assertStatsClass(field, stats, IntStatistics.class);
+ IntStatistics dateStats = (IntStatistics) stats;
+ return new FieldStats(
+ DateTimeUtils.toInternal(EPOCH_DAY.plusDays(dateStats.getMin())),
+ DateTimeUtils.toInternal(EPOCH_DAY.plusDays(dateStats.getMax())),
+ nullCount);
+ default:
+ return new FieldStats(null, null, nullCount);
+ }
+ }
+
+ /**
+ * Parquet cannot provide statistics for decimal fields directly, but we can extract them from
+ * primitive statistics.
+ */
+ private FieldStats convertStatsToDecimalFieldStats(
+ PrimitiveType primitive,
+ RowType.RowField field,
+ Statistics stats,
+ int precision,
+ int scale,
+ long nullCount) {
+ switch (primitive.getPrimitiveTypeName()) {
+ case BINARY:
+ case FIXED_LEN_BYTE_ARRAY:
+ assertStatsClass(field, stats, BinaryStatistics.class);
+ BinaryStatistics decimalStats = (BinaryStatistics) stats;
+ return new FieldStats(
+ DecimalData.fromBigDecimal(
+ new BigDecimal(new BigInteger(decimalStats.getMinBytes()), scale),
+ precision,
+ scale),
+ DecimalData.fromBigDecimal(
+ new BigDecimal(new BigInteger(decimalStats.getMaxBytes()), scale),
+ precision,
+ scale),
+ nullCount);
+ case INT64:
+ assertStatsClass(field, stats, LongStatistics.class);
+ LongStatistics longStats = (LongStatistics) stats;
+ return new FieldStats(
+ DecimalData.fromUnscaledLong(longStats.getMin(), precision, scale),
+ DecimalData.fromUnscaledLong(longStats.getMax(), precision, scale),
+ nullCount);
+ case INT32:
+ assertStatsClass(field, stats, IntStatistics.class);
+ IntStatistics intStats = (IntStatistics) stats;
+ return new FieldStats(
+ DecimalData.fromUnscaledLong(intStats.getMin(), precision, scale),
+ DecimalData.fromUnscaledLong(intStats.getMax(), precision, scale),
+ nullCount);
+ default:
+ return new FieldStats(null, null, nullCount);
+ }
+ }
+}
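A hedged usage sketch of the extractor; the schema must match the file being read, and the path here is a placeholder:

import org.apache.flink.core.fs.Path;
import org.apache.flink.table.store.format.FieldStats;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;

public class StatsExtractorDemo {
    public static void main(String[] args) throws Exception {
        RowType rowType = RowType.of(new IntType(), new VarCharType(10));
        ParquetFileStatsExtractor extractor = new ParquetFileStatsExtractor(rowType);
        FieldStats[] stats = extractor.extract(new Path("/tmp/part-0.parquet"));
        for (FieldStats fieldStats : stats) {
            System.out.println(fieldStats); // min, max and null count per column
        }
    }
}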
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/parquet/ParquetInputFormatFactory.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/parquet/ParquetInputFormatFactory.java
new file mode 100644
index 00000000..f1fe2980
--- /dev/null
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/parquet/ParquetInputFormatFactory.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.format.parquet;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.utils.ReflectionUtils;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.Collections;
+
+/** Factory to create parquet input format for different Flink versions. */
+public class ParquetInputFormatFactory {
+
+ public static BulkFormat<RowData, FileSourceSplit> create(
+ Configuration conf,
+ RowType producedRowType,
+ TypeInformation<RowData> producedTypeInfo,
+ boolean isUtcTimestamp) {
+ Class<?> formatClass = null;
+ try {
+ formatClass =
+ Class.forName("org.apache.flink.formats.parquet.ParquetColumnarRowInputFormat");
+ return createFromLatestFlinkVersion(
+ formatClass, conf, producedRowType, producedTypeInfo, isUtcTimestamp);
+ } catch (ClassNotFoundException e) {
+ throw new RuntimeException(e);
+ } catch (NoSuchMethodException e) {
+ try {
+ return createFrom114(formatClass, conf, producedRowType, isUtcTimestamp);
+ } catch (NoSuchMethodException ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+ }
+
+ private static BulkFormat<RowData, FileSourceSplit> createFromLatestFlinkVersion(
+ Class<?> formatClass,
+ Configuration conf,
+ RowType producedRowType,
+ TypeInformation<RowData> producedTypeInfo,
+ boolean isUtcTimestamp)
+ throws NoSuchMethodException {
+ try {
+ return ReflectionUtils.invokeStaticMethod(
+ formatClass,
+ "createPartitionedFormat",
+ conf,
+ producedRowType,
+ producedTypeInfo,
+ Collections.emptyList(),
+ null,
+ 2048,
+ isUtcTimestamp,
+ true);
+ } catch (InvocationTargetException | IllegalAccessException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private static BulkFormat<RowData, FileSourceSplit> createFrom114(
+ Class<?> formatClass,
+ Configuration conf,
+ RowType producedRowType,
+ boolean isUtcTimestamp)
+ throws NoSuchMethodException {
+ try {
+ return ReflectionUtils.invokeStaticMethod(
+ formatClass,
+ "createPartitionedFormat",
+ conf,
+ producedRowType,
+ Collections.emptyList(),
+ null,
+ 2048,
+ isUtcTimestamp,
+ true);
+ } catch (InvocationTargetException | IllegalAccessException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
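On Flink 1.15+ the reflective call above resolves to the variant that takes the produced TypeInformation; written directly it looks like the sketch below (signature assumed from the flink-parquet 1.15 API, which is exactly why the factory must go through reflection to stay compatible with 1.14):

import java.util.Collections;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.connector.file.src.reader.BulkFormat;
import org.apache.flink.formats.parquet.ParquetColumnarRowInputFormat;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.conf.Configuration;

public class DirectCreateDemo {
    static BulkFormat<RowData, FileSourceSplit> create(
            Configuration conf,
            RowType producedRowType,
            TypeInformation<RowData> producedTypeInfo,
            boolean isUtcTimestamp) {
        return ParquetColumnarRowInputFormat.createPartitionedFormat(
                conf,
                producedRowType,
                producedTypeInfo, // parameter added in Flink 1.15
                Collections.emptyList(), // no partition keys
                null, // no partition field extractor
                2048, // read batch size
                isUtcTimestamp,
                true); // case sensitive
    }
}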
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/parquet/ParquetUtil.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/parquet/ParquetUtil.java
new file mode 100644
index 00000000..009af9d5
--- /dev/null
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/parquet/ParquetUtil.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.format.parquet;
+
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.column.statistics.Statistics;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Parquet utilities for extracting metadata, asserting expected stats, etc. */
+public class ParquetUtil {
+
+ /**
+ * Extract stats from the Parquet file at the specified path.
+ *
+ * @param path the path of the parquet file to be read
+ * @return a map from column name to its statistics (for example, null count,
+ * minimum value, maximum value)
+ * @throws IOException if reading the parquet footer fails
+ */
+ public static Map<String, Statistics> extractColumnStats(Path path) throws IOException {
+ ParquetMetadata parquetMetadata = getParquetReader(path).getFooter();
+ List<BlockMetaData> blockMetaDataList = parquetMetadata.getBlocks();
+ Map<String, Statistics> resultStats = new HashMap<>();
+ for (BlockMetaData blockMetaData : blockMetaDataList) {
+ List<ColumnChunkMetaData> columnChunkMetaDataList = blockMetaData.getColumns();
+ for (ColumnChunkMetaData columnChunkMetaData : columnChunkMetaDataList) {
+ Statistics stats = columnChunkMetaData.getStatistics();
+ String columnName = columnChunkMetaData.getPath().toDotString();
+ Statistics midStats;
+ if (!resultStats.containsKey(columnName)) {
+ midStats = stats;
+ } else {
+ midStats = resultStats.get(columnName);
+ midStats.mergeStatistics(stats);
+ }
+ resultStats.put(columnName, midStats);
+ }
+ }
+ return resultStats;
+ }
+
+ /**
+ * Generate a {@link ParquetFileReader} instance to read the Parquet file at the given path.
+ *
+ * @param path the path of the parquet file to be read
+ * @return a parquet reader, used for reading the footer, statistics, etc.
+ * @throws IOException if opening the file fails
+ */
+ public static ParquetFileReader getParquetReader(Path path) throws IOException {
+ HadoopInputFile hadoopInputFile = HadoopInputFile.fromPath(path, new Configuration());
+ return ParquetFileReader.open(hadoopInputFile, ParquetReadOptions.builder().build());
+ }
+
+ static void assertStatsClass(
+ RowType.RowField field, Statistics stats, Class<? extends Statistics> expectedClass) {
+ if (!expectedClass.isInstance(stats)) {
+ throw new IllegalArgumentException(
+ "Expecting "
+ + expectedClass.getName()
+ + " for field "
+ + field.asSummaryString()
+ + " but found "
+ + stats.getClass().getName());
+ }
+ }
+}
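Usage sketch for extractColumnStats: statistics are merged across all row groups of the file and keyed by the dot-separated column path (the path and column name are placeholders):

import java.util.Map;

import org.apache.hadoop.fs.Path;
import org.apache.parquet.column.statistics.Statistics;

public class ColumnStatsDemo {
    public static void main(String[] args) throws Exception {
        Map<String, Statistics> stats =
                ParquetUtil.extractColumnStats(new Path("/tmp/part-0.parquet"));
        Statistics idStats = stats.get("id");
        System.out.println(idStats.getNumNulls());
        System.out.println(idStats.minAsString() + " .. " + idStats.maxAsString());
    }
}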
diff --git a/flink-table-store-format/src/main/resources/META-INF/services/org.apache.flink.table.store.format.FileFormatFactory b/flink-table-store-format/src/main/resources/META-INF/services/org.apache.flink.table.store.format.FileFormatFactory
index e0d54577..b0551bd8 100644
--- a/flink-table-store-format/src/main/resources/META-INF/services/org.apache.flink.table.store.format.FileFormatFactory
+++ b/flink-table-store-format/src/main/resources/META-INF/services/org.apache.flink.table.store.format.FileFormatFactory
@@ -14,4 +14,5 @@
# limitations under the License.
org.apache.flink.table.store.format.avro.AvroFileFormatFactory
-org.apache.flink.table.store.format.orc.OrcFileFormatFactory
\ No newline at end of file
+org.apache.flink.table.store.format.orc.OrcFileFormatFactory
+org.apache.flink.table.store.format.parquet.ParquetFileFormatFactory
\ No newline at end of file
diff --git a/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/parquet/ParquetFileFormatTest.java b/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/parquet/ParquetFileFormatTest.java
new file mode 100644
index 00000000..f930c992
--- /dev/null
+++ b/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/parquet/ParquetFileFormatTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.format.parquet;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DelegatingConfiguration;
+
+import org.apache.parquet.format.CompressionCodec;
+import org.apache.parquet.hadoop.ParquetOutputFormat;
+import org.junit.jupiter.api.Test;
+
+import static org.apache.flink.table.store.format.parquet.ParquetFileFormat.getParquetConfiguration;
+import static org.apache.flink.table.store.format.parquet.ParquetFileFormatFactory.IDENTIFIER;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link ParquetFileFormatFactory}. */
+public class ParquetFileFormatTest {
+ private static final ConfigOption<String> KEY1 =
+ ConfigOptions.key("k1").stringType().defaultValue("absent");
+
+ @Test
+ public void testAbsent() {
+ Configuration options = new Configuration();
+ ParquetFileFormat parquet = new ParquetFileFormatFactory().create(options);
+ assertThat(parquet.formatOptions().getString(KEY1)).isEqualTo("absent");
+ }
+
+ @Test
+ public void testPresent() {
+ Configuration options = new Configuration();
+ options.setString(KEY1.key(), "v1");
+ ParquetFileFormat parquet = new ParquetFileFormatFactory().create(options);
+ assertThat(parquet.formatOptions().getString(KEY1)).isEqualTo("v1");
+ }
+
+ @Test
+ public void testDefaultCompressionCodecName() {
+ Configuration conf = new Configuration();
+ assertThat(getCompressionCodec(conf)).isEqualTo(CompressionCodec.SNAPPY.name());
+ }
+
+ @Test
+ public void testSpecifiedCompressionCodecName() {
+ String lz4 = CompressionCodec.LZ4.name();
+ Configuration conf = new Configuration();
+ conf.setString(ParquetOutputFormat.COMPRESSION, lz4);
+ assertThat(getCompressionCodec(conf)).isEqualTo(lz4);
+ }
+
+ private String getCompressionCodec(Configuration conf) {
+ DelegatingConfiguration formatOptions = new DelegatingConfiguration(conf, IDENTIFIER + ".");
+ ParquetFileFormat parquet = new ParquetFileFormatFactory().create(formatOptions);
+ return getParquetConfiguration(parquet.formatOptions())
+ .get(ParquetOutputFormat.COMPRESSION);
+ }
+}
diff --git a/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/parquet/ParquetFileStatsExtractorTest.java b/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/parquet/ParquetFileStatsExtractorTest.java
new file mode 100644
index 00000000..f48c4a64
--- /dev/null
+++ b/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/parquet/ParquetFileStatsExtractorTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.format.parquet;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.store.format.FileFormat;
+import org.apache.flink.table.store.format.FileStatsExtractorTestBase;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+/** Tests for {@link ParquetFileStatsExtractor}. */
+public class ParquetFileStatsExtractorTest extends FileStatsExtractorTestBase {
+
+ @Override
+ protected FileFormat createFormat() {
+ return FileFormat.fromIdentifier("parquet", new Configuration());
+ }
+
+ @Override
+ protected RowType rowType() {
+ return RowType.of(
+ new CharType(8),
+ new VarCharType(8),
+ new BooleanType(),
+ new TinyIntType(),
+ new SmallIntType(),
+ new IntType(),
+ new BigIntType(),
+ new FloatType(),
+ new DoubleType(),
+ new DecimalType(5, 2),
+ new DecimalType(38, 18),
+ new DateType());
+ }
+}
diff --git a/pom.xml b/pom.xml
index db98356f..05b83b91 100644
--- a/pom.xml
+++ b/pom.xml
@@ -119,6 +119,7 @@ under the License.
<flink.streaming.java>flink-streaming-java</flink.streaming.java>
<flink.sql.orc>flink-sql-orc</flink.sql.orc>
<flink.orc>flink-orc</flink.orc>
+ <flink.parquet>flink-parquet</flink.parquet>
<flink.connector.kafka>flink-connector-kafka</flink.connector.kafka>
<flink.sql.connector.kafka>flink-sql-connector-kafka</flink.sql.connector.kafka>
<flink.test.utils>flink-test-utils</flink.test.utils>
@@ -296,6 +297,7 @@ under the License.
<flink.streaming.java>flink-streaming-java_${scala.binary.version}</flink.streaming.java>
<flink.sql.orc>flink-sql-orc_${scala.binary.version}</flink.sql.orc>
<flink.orc>flink-orc_${scala.binary.version}</flink.orc>
+ <flink.parquet>flink-parquet_${scala.binary.version}</flink.parquet>
<flink.connector.kafka>flink-connector-kafka_${scala.binary.version}</flink.connector.kafka>
<flink.sql.connector.kafka>flink-sql-connector-kafka_${scala.binary.version}</flink.sql.connector.kafka>
<flink.test.utils>flink-test-utils_${scala.binary.version}</flink.test.utils>