Posted to commits@flink.apache.org by lz...@apache.org on 2022/09/05 13:10:00 UTC

[flink-table-store] branch master updated: [FLINK-27207] Support built-in parquet format

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 70d93d29 [FLINK-27207] Support built-in parquet format
70d93d29 is described below

commit 70d93d29876837d99dd80f422bb39e54fb8b8094
Author: Yubin Li <li...@163.com>
AuthorDate: Mon Sep 5 21:09:54 2022 +0800

    [FLINK-27207] Support built-in parquet format
    
    This closes #134
---
 .../flink/table/store/utils/DateTimeUtils.java     |   5 +
 .../store/tests/FileStoreBuiltInFormatE2eTest.java | 120 ++++++++++++
 flink-table-store-format/pom.xml                   |  31 +++-
 .../store/format/parquet/ParquetFileFormat.java    |  83 +++++++++
 .../format/parquet/ParquetFileFormatFactory.java   |  56 ++++++
 .../format/parquet/ParquetFileStatsExtractor.java  | 204 +++++++++++++++++++++
 .../format/parquet/ParquetInputFormatFactory.java  | 103 +++++++++++
 .../table/store/format/parquet/ParquetUtil.java    |  95 ++++++++++
 ...ache.flink.table.store.format.FileFormatFactory |   3 +-
 .../format/parquet/ParquetFileFormatTest.java      |  74 ++++++++
 .../parquet/ParquetFileStatsExtractorTest.java     |  61 ++++++
 pom.xml                                            |   2 +
 12 files changed, 835 insertions(+), 2 deletions(-)
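
Note on how the new format is picked up: the commit registers ParquetFileFormatFactory
through the FileFormatFactory SPI (see the META-INF/services change below), so the
format is discovered by its "parquet" identifier. A minimal usage sketch, mirroring
the extractor test at the end of this diff:

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.table.store.format.FileFormat;

    public class ParquetFormatSketch {
        public static void main(String[] args) {
            // Loads ParquetFileFormatFactory via the service loader entry added
            // in this commit; option keys are format-scoped (no "parquet." prefix)
            // and defaults such as snappy compression are filled in.
            FileFormat parquet = FileFormat.fromIdentifier("parquet", new Configuration());
            System.out.println(parquet.getClass().getSimpleName());
        }
    }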

diff --git a/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/DateTimeUtils.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/DateTimeUtils.java
index 10905468..f1814613 100644
--- a/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/DateTimeUtils.java
+++ b/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/DateTimeUtils.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.store.utils;
 import org.apache.flink.table.data.TimestampData;
 
 import java.time.DateTimeException;
+import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.format.DateTimeFormatter;
 import java.time.format.DateTimeFormatterBuilder;
@@ -92,6 +93,10 @@ public class DateTimeUtils {
         return (int) (ts / MILLIS_PER_DAY);
     }
 
+    public static int toInternal(LocalDate date) {
+        return ymdToUnixDate(date.getYear(), date.getMonthValue(), date.getDayOfMonth());
+    }
+
     public static Integer parseDate(String s) {
         // allow timestamp str to date, e.g. 2017-12-12 09:30:00.0
         int ws1 = s.indexOf(" ");
diff --git a/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/FileStoreBuiltInFormatE2eTest.java b/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/FileStoreBuiltInFormatE2eTest.java
new file mode 100644
index 00000000..cfeab630
--- /dev/null
+++ b/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/FileStoreBuiltInFormatE2eTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.tests;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.UUID;
+
+/** Test that the file store supports the formats included in flink-table-store-format. */
+public class FileStoreBuiltInFormatE2eTest extends E2eTestBase {
+
+    @Test
+    public void testParquet() throws Exception {
+        String schema =
+                "id INT,\n"
+                        + "isMan BOOLEAN,\n"
+                        + "houseNum TINYINT,\n"
+                        + "debugNum SMALLINT,\n"
+                        + "age INT,\n"
+                        + "cash BIGINT,\n"
+                        + "money FLOAT,\n"
+                        + "f1 DOUBLE,\n"
+                        + "f2 DECIMAL(5, 3),\n"
+                        + "f3 DECIMAL(26, 8),\n"
+                        + "f4 CHAR(10),\n"
+                        + "f5 VARCHAR(10),\n"
+                        + "f6 STRING,\n"
+                        + "f7 DATE\n";
+        String tableStoreDdl =
+                "CREATE TABLE IF NOT EXISTS table_store (\n"
+                        + schema
+                        + ") WITH (\n"
+                        + "    'bucket' = '3',\n"
+                        + "    'root-path' = '%s',\n"
+                        + "    'file.format' = 'parquet'\n"
+                        + ");";
+        tableStoreDdl =
+                String.format(
+                        tableStoreDdl,
+                        TEST_DATA_DIR + "/" + UUID.randomUUID().toString() + ".store");
+        String insertDml =
+                "INSERT INTO table_store VALUES ("
+                        + "1,"
+                        + "true,"
+                        + "cast(1 as tinyint),"
+                        + "cast(10 as smallint),"
+                        + "cast(100 as int),"
+                        + "cast(999999 as bigint),"
+                        + "cast(1.1 as float),"
+                        + "1.11,"
+                        + "12.456,"
+                        + "cast('123456789123456789.12345678' as decimal(26, 8)),"
+                        + "cast('hi' as char(10)),"
+                        + "'Parquet',"
+                        + "'这是一个parquet format',"
+                        + "DATE '2022-05-21'"
+                        + "),("
+                        + "2,"
+                        + "false,"
+                        + "cast(2 as tinyint),"
+                        + "cast(29 as smallint),"
+                        + "cast(200 as int),"
+                        + "cast(9999999 as bigint),"
+                        + "cast(2.2 as float),"
+                        + "2.22,"
+                        + "22.557,"
+                        + "cast('222222789123456789.12345678' as decimal(26, 8)),"
+                        + "cast('hello' as char(10)),"
+                        + "'Hi Yu bin',"
+                        + "'这是一个 built in parquet format',"
+                        + "DATE '2022-05-23'"
+                        + ")";
+        String resultDdl = createResultSink("result1", schema);
+        runSql(insertDml, tableStoreDdl);
+        runSql(
+                "INSERT INTO result1 SELECT * FROM table_store where id > 1;",
+                tableStoreDdl,
+                resultDdl);
+        checkResult(
+                "2, "
+                        + "false, "
+                        + "2, "
+                        + "29, "
+                        + "200, "
+                        + "9999999, "
+                        + "2.2, "
+                        + "2.22, "
+                        + "22.557, "
+                        + "222222789123456789.12345678, "
+                        + "hello     , "
+                        + "Hi Yu bin, "
+                        + "这是一个 built in parquet format, "
+                        + "2022-05-23");
+    }
+
+    private void runSql(String sql, String... ddls) throws Exception {
+        runSql(
+                "SET 'execution.runtime-mode' = 'batch';\n"
+                        + "SET 'table.dml-sync' = 'true';\n"
+                        + String.join("\n", ddls)
+                        + "\n"
+                        + sql);
+    }
+}
diff --git a/flink-table-store-format/pom.xml b/flink-table-store-format/pom.xml
index 51d6a07f..6b47de37 100644
--- a/flink-table-store-format/pom.xml
+++ b/flink-table-store-format/pom.xml
@@ -153,11 +153,25 @@ under the License.
             <scope>test</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>${flink.parquet}</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>${flink.sql.parquet}</artifactId>
             <version>${flink.version}</version>
-            <scope>test</scope>
+            <scope>runtime</scope>
+            <optional>true</optional>
+            <exclusions>
+                <exclusion>
+                    <groupId>*</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
 
         <dependency>
@@ -207,6 +221,7 @@ under the License.
                                 <includes combine.children="append">
                                     <include>org.apache.flink:flink-sql-avro</include>
                                     <include>org.apache.flink:${flink.sql.orc}</include>
+                                    <include>org.apache.flink:${flink.sql.parquet}</include>
                                 </includes>
                             </artifactSet>
                             <filters>
@@ -227,6 +242,12 @@ under the License.
                                         <exclude>META-INF/services/**</exclude>
                                     </excludes>
                                 </filter>
+                                <filter>
+                                    <artifact>org.apache.flink:${flink.sql.parquet}</artifact>
+                                    <excludes>
+                                        <exclude>META-INF/services/**</exclude>
+                                    </excludes>
+                                </filter>
                                 <!-- Another copy of the Apache license, which we don't need. -->
                                 <filter>
                                     <artifact>*</artifact>
@@ -244,6 +265,14 @@ under the License.
                                     <pattern>org.apache.flink.formats.avro</pattern>
                                     <shadedPattern>org.apache.flink.table.store.shaded.org.apache.flink.formats.avro</shadedPattern>
                                 </relocation>
+                                <relocation>
+                                    <pattern>org.apache.parquet</pattern>
+                                    <shadedPattern>org.apache.flink.table.store.shaded.org.apache.parquet</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>org.apache.flink.formats.parquet</pattern>
+                                    <shadedPattern>org.apache.flink.table.store.shaded.org.apache.flink.formats.parquet</shadedPattern>
+                                </relocation>
                                 <relocation>
                                     <pattern>org.apache.flink.orc</pattern>
                                     <shadedPattern>org.apache.flink.table.store.shaded.org.apache.flink.orc</shadedPattern>
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/parquet/ParquetFileFormat.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/parquet/ParquetFileFormat.java
new file mode 100644
index 00000000..28b9b110
--- /dev/null
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/parquet/ParquetFileFormat.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.format.parquet;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.formats.parquet.row.ParquetRowDataBuilder;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.store.file.predicate.Predicate;
+import org.apache.flink.table.store.format.FileFormat;
+import org.apache.flink.table.store.format.FileStatsExtractor;
+import org.apache.flink.table.store.utils.Projection;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.List;
+import java.util.Optional;
+
+import static org.apache.flink.formats.parquet.ParquetFileFormatFactory.UTC_TIMEZONE;
+import static org.apache.flink.table.store.format.parquet.ParquetFileFormatFactory.IDENTIFIER;
+
+/** Parquet {@link FileFormat}. */
+public class ParquetFileFormat extends FileFormat {
+
+    private final Configuration formatOptions;
+
+    public ParquetFileFormat(Configuration formatOptions) {
+        super(IDENTIFIER);
+        this.formatOptions = formatOptions;
+    }
+
+    @VisibleForTesting
+    Configuration formatOptions() {
+        return formatOptions;
+    }
+
+    @Override
+    public BulkFormat<RowData, FileSourceSplit> createReaderFactory(
+            RowType type, int[][] projection, List<Predicate> filters) {
+        return ParquetInputFormatFactory.create(
+                getParquetConfiguration(formatOptions),
+                (RowType) Projection.of(projection).project(type),
+                InternalTypeInfo.of(type),
+                formatOptions.get(UTC_TIMEZONE));
+    }
+
+    @Override
+    public BulkWriter.Factory<RowData> createWriterFactory(RowType type) {
+        return ParquetRowDataBuilder.createWriterFactory(
+                type, getParquetConfiguration(formatOptions), formatOptions.get(UTC_TIMEZONE));
+    }
+
+    @Override
+    public Optional<FileStatsExtractor> createStatsExtractor(RowType type) {
+        return Optional.of(new ParquetFileStatsExtractor(type));
+    }
+
+    public static org.apache.hadoop.conf.Configuration getParquetConfiguration(
+            Configuration options) {
+        org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
+        options.toMap().forEach((key, value) -> conf.set(IDENTIFIER + "." + key, value));
+        return conf;
+    }
+}
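
A short sketch of what getParquetConfiguration does with option keys (the GZIP value
is only illustrative): every format-scoped key is re-prefixed with the "parquet"
identifier before being handed to parquet-hadoop:

    public class ParquetConfSketch {
        public static void main(String[] args) {
            org.apache.flink.configuration.Configuration options =
                    new org.apache.flink.configuration.Configuration();
            options.setString("compression", "GZIP"); // hypothetical user option
            org.apache.hadoop.conf.Configuration conf =
                    org.apache.flink.table.store.format.parquet.ParquetFileFormat
                            .getParquetConfiguration(options);
            // The key now carries the "parquet." prefix expected by parquet-hadoop.
            System.out.println(conf.get("parquet.compression")); // prints GZIP
        }
    }
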
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/parquet/ParquetFileFormatFactory.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/parquet/ParquetFileFormatFactory.java
new file mode 100644
index 00000000..7b7c60f5
--- /dev/null
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/parquet/ParquetFileFormatFactory.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.format.parquet;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.store.format.FileFormatFactory;
+
+import org.apache.parquet.hadoop.ParquetOutputFormat;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+
+import java.util.Properties;
+
+/** Factory to create {@link ParquetFileFormat}. */
+public class ParquetFileFormatFactory implements FileFormatFactory {
+    public static final String IDENTIFIER = "parquet";
+
+    @Override
+    public String identifier() {
+        return IDENTIFIER;
+    }
+
+    @Override
+    public ParquetFileFormat create(Configuration formatOptions) {
+        return new ParquetFileFormat(supplyDefaultOptions(formatOptions));
+    }
+
+    private Configuration supplyDefaultOptions(Configuration options) {
+        String compression =
+                ParquetOutputFormat.COMPRESSION.replaceFirst(String.format("^%s.", IDENTIFIER), "");
+        if (!options.containsKey(compression)) {
+            Properties properties = new Properties();
+            options.addAllToProperties(properties);
+            properties.setProperty(compression, CompressionCodecName.SNAPPY.name());
+            Configuration newOptions = new Configuration();
+            properties.forEach((k, v) -> newOptions.setString(k.toString(), v.toString()));
+            return newOptions;
+        }
+        return options;
+    }
+}
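
To make supplyDefaultOptions concrete: ParquetOutputFormat.COMPRESSION is the Hadoop
key "parquet.compression", so stripping the leading "parquet." leaves the
format-scoped key "compression", which is filled with SNAPPY when the user did not
set it. A sketch of the observable effect (formatOptions() is package-private, so
this assumes the same package, as the unit tests further down do):

    package org.apache.flink.table.store.format.parquet;

    public class DefaultCompressionSketch {
        public static void main(String[] args) {
            ParquetFileFormat parquet =
                    new ParquetFileFormatFactory()
                            .create(new org.apache.flink.configuration.Configuration());
            // With no user options, the factory injects snappy compression.
            String codec =
                    ParquetFileFormat.getParquetConfiguration(parquet.formatOptions())
                            .get(org.apache.parquet.hadoop.ParquetOutputFormat.COMPRESSION);
            System.out.println(codec); // prints SNAPPY
        }
    }
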
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/parquet/ParquetFileStatsExtractor.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/parquet/ParquetFileStatsExtractor.java
new file mode 100644
index 00000000..09f836d8
--- /dev/null
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/parquet/ParquetFileStatsExtractor.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.format.parquet;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.store.format.FieldStats;
+import org.apache.flink.table.store.format.FileStatsExtractor;
+import org.apache.flink.table.store.utils.DateTimeUtils;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.parquet.column.statistics.BinaryStatistics;
+import org.apache.parquet.column.statistics.BooleanStatistics;
+import org.apache.parquet.column.statistics.DoubleStatistics;
+import org.apache.parquet.column.statistics.FloatStatistics;
+import org.apache.parquet.column.statistics.IntStatistics;
+import org.apache.parquet.column.statistics.LongStatistics;
+import org.apache.parquet.column.statistics.Statistics;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
+import org.apache.parquet.schema.PrimitiveType;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.util.Map;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.table.store.format.parquet.ParquetUtil.assertStatsClass;
+
+/** {@link FileStatsExtractor} for parquet files. */
+public class ParquetFileStatsExtractor implements FileStatsExtractor {
+
+    private final RowType rowType;
+    private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
+    private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate();
+
+    public ParquetFileStatsExtractor(RowType rowType) {
+        this.rowType = rowType;
+    }
+
+    @Override
+    public FieldStats[] extract(Path path) throws IOException {
+        org.apache.hadoop.fs.Path hadoopPath = new org.apache.hadoop.fs.Path(path.toUri());
+        Map<String, Statistics> stats = ParquetUtil.extractColumnStats(hadoopPath);
+
+        return IntStream.range(0, rowType.getFieldCount())
+                .mapToObj(
+                        i -> {
+                            RowType.RowField field = rowType.getFields().get(i);
+                            return toFieldStats(field, stats.get(field.getName()));
+                        })
+                .toArray(FieldStats[]::new);
+    }
+
+    private FieldStats toFieldStats(RowType.RowField field, Statistics stats) {
+        LogicalTypeRoot flinkType = field.getType().getTypeRoot();
+        if (stats == null
+                || flinkType == LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE
+                || flinkType == LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE) {
+            throw new UnsupportedOperationException(
+                    "type "
+                            + field.getType().getTypeRoot()
+                            + " not supported for extracting statistics in parquet format");
+        }
+        long nullCount = stats.getNumNulls();
+        if (!stats.hasNonNullValue()) {
+            return new FieldStats(null, null, nullCount);
+        }
+
+        switch (flinkType) {
+            case CHAR:
+            case VARCHAR:
+                assertStatsClass(field, stats, BinaryStatistics.class);
+                BinaryStatistics binaryStats = (BinaryStatistics) stats;
+                return new FieldStats(
+                        StringData.fromString(binaryStats.minAsString()),
+                        StringData.fromString(binaryStats.maxAsString()),
+                        nullCount);
+            case BOOLEAN:
+                assertStatsClass(field, stats, BooleanStatistics.class);
+                BooleanStatistics boolStats = (BooleanStatistics) stats;
+                return new FieldStats(boolStats.getMin(), boolStats.getMax(), nullCount);
+            case DECIMAL:
+                PrimitiveType primitive = stats.type();
+                DecimalType decimalType = (DecimalType) (field.getType());
+                int precision = decimalType.getPrecision();
+                int scale = decimalType.getScale();
+                if (primitive.getOriginalType() != null
+                        && primitive.getLogicalTypeAnnotation()
+                                instanceof LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) {
+                    return convertStatsToDecimalFieldStats(
+                            primitive, field, stats, precision, scale, nullCount);
+                } else {
+                    return new FieldStats(null, null, nullCount);
+                }
+            case TINYINT:
+                assertStatsClass(field, stats, IntStatistics.class);
+                IntStatistics byteStats = (IntStatistics) stats;
+                return new FieldStats(
+                        (byte) byteStats.getMin(), (byte) byteStats.getMax(), nullCount);
+            case SMALLINT:
+                assertStatsClass(field, stats, IntStatistics.class);
+                IntStatistics shortStats = (IntStatistics) stats;
+                return new FieldStats(
+                        (short) shortStats.getMin(), (short) shortStats.getMax(), nullCount);
+            case INTEGER:
+                assertStatsClass(field, stats, IntStatistics.class);
+                IntStatistics intStats = (IntStatistics) stats;
+                return new FieldStats(
+                        Long.valueOf(intStats.getMin()).intValue(),
+                        Long.valueOf(intStats.getMax()).intValue(),
+                        nullCount);
+            case BIGINT:
+                assertStatsClass(field, stats, LongStatistics.class);
+                LongStatistics longStats = (LongStatistics) stats;
+                return new FieldStats(longStats.getMin(), longStats.getMax(), nullCount);
+            case FLOAT:
+                assertStatsClass(field, stats, FloatStatistics.class);
+                FloatStatistics floatStats = (FloatStatistics) stats;
+                return new FieldStats(floatStats.getMin(), floatStats.getMax(), nullCount);
+            case DOUBLE:
+                assertStatsClass(field, stats, DoubleStatistics.class);
+                DoubleStatistics doubleStats = (DoubleStatistics) stats;
+                return new FieldStats(doubleStats.getMin(), doubleStats.getMax(), nullCount);
+            case DATE:
+                assertStatsClass(field, stats, IntStatistics.class);
+                IntStatistics dateStats = (IntStatistics) stats;
+                return new FieldStats(
+                        DateTimeUtils.toInternal(EPOCH_DAY.plusDays(dateStats.getMin())),
+                        DateTimeUtils.toInternal(EPOCH_DAY.plusDays(dateStats.getMax())),
+                        nullCount);
+            default:
+                return new FieldStats(null, null, nullCount);
+        }
+    }
+
+    /**
+     * Parquet cannot provide statistics for decimal fields directly, but we can extract them from
+     * primitive statistics.
+     */
+    private FieldStats convertStatsToDecimalFieldStats(
+            PrimitiveType primitive,
+            RowType.RowField field,
+            Statistics stats,
+            int precision,
+            int scale,
+            long nullCount) {
+        switch (primitive.getPrimitiveTypeName()) {
+            case BINARY:
+            case FIXED_LEN_BYTE_ARRAY:
+                assertStatsClass(field, stats, BinaryStatistics.class);
+                BinaryStatistics decimalStats = (BinaryStatistics) stats;
+                return new FieldStats(
+                        DecimalData.fromBigDecimal(
+                                new BigDecimal(new BigInteger(decimalStats.getMinBytes()), scale),
+                                precision,
+                                scale),
+                        DecimalData.fromBigDecimal(
+                                new BigDecimal(new BigInteger(decimalStats.getMaxBytes()), scale),
+                                precision,
+                                scale),
+                        nullCount);
+            case INT64:
+                assertStatsClass(field, stats, LongStatistics.class);
+                LongStatistics longStats = (LongStatistics) stats;
+                return new FieldStats(
+                        DecimalData.fromUnscaledLong(longStats.getMin(), precision, scale),
+                        DecimalData.fromUnscaledLong(longStats.getMax(), precision, scale),
+                        nullCount);
+            case INT32:
+                assertStatsClass(field, stats, IntStatistics.class);
+                IntStatistics intStats = (IntStatistics) stats;
+                return new FieldStats(
+                        DecimalData.fromUnscaledLong(intStats.getMin(), precision, scale),
+                        DecimalData.fromUnscaledLong(intStats.getMax(), precision, scale),
+                        nullCount);
+            default:
+                return new FieldStats(null, null, nullCount);
+        }
+    }
+}
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/parquet/ParquetInputFormatFactory.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/parquet/ParquetInputFormatFactory.java
new file mode 100644
index 00000000..f1fe2980
--- /dev/null
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/parquet/ParquetInputFormatFactory.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.format.parquet;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.utils.ReflectionUtils;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.Collections;
+
+/** Factory to create parquet input format for different Flink versions. */
+public class ParquetInputFormatFactory {
+
+    public static BulkFormat<RowData, FileSourceSplit> create(
+            Configuration conf,
+            RowType producedRowType,
+            TypeInformation<RowData> producedTypeInfo,
+            boolean isUtcTimestamp) {
+        Class<?> formatClass = null;
+        try {
+            formatClass =
+                    Class.forName("org.apache.flink.formats.parquet.ParquetColumnarRowInputFormat");
+            return createFromLatestFlinkVersion(
+                    formatClass, conf, producedRowType, producedTypeInfo, isUtcTimestamp);
+        } catch (ClassNotFoundException e) {
+            throw new RuntimeException(e);
+        } catch (NoSuchMethodException e) {
+            try {
+                return createFrom114(formatClass, conf, producedRowType, isUtcTimestamp);
+            } catch (NoSuchMethodException ex) {
+                throw new RuntimeException(ex);
+            }
+        }
+    }
+
+    private static BulkFormat<RowData, FileSourceSplit> createFromLatestFlinkVersion(
+            Class<?> formatClass,
+            Configuration conf,
+            RowType producedRowType,
+            TypeInformation<RowData> producedTypeInfo,
+            boolean isUtcTimestamp)
+            throws NoSuchMethodException {
+        try {
+            return ReflectionUtils.invokeStaticMethod(
+                    formatClass,
+                    "createPartitionedFormat",
+                    conf,
+                    producedRowType,
+                    producedTypeInfo,
+                    Collections.emptyList(),
+                    null,
+                    2048,
+                    isUtcTimestamp,
+                    true);
+        } catch (InvocationTargetException | IllegalAccessException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static BulkFormat<RowData, FileSourceSplit> createFrom114(
+            Class<?> formatClass,
+            Configuration conf,
+            RowType producedRowType,
+            boolean isUtcTimestamp)
+            throws NoSuchMethodException {
+        try {
+            return ReflectionUtils.invokeStaticMethod(
+                    formatClass,
+                    "createPartitionedFormat",
+                    conf,
+                    producedRowType,
+                    Collections.emptyList(),
+                    null,
+                    2048,
+                    isUtcTimestamp,
+                    true);
+        } catch (InvocationTargetException | IllegalAccessException e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
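
The reflection above exists because Flink 1.15 added a producedTypeInfo parameter to
ParquetColumnarRowInputFormat.createPartitionedFormat that the 1.14 signature lacks;
the factory therefore tries the newer eight-argument call first and falls back on
NoSuchMethodException. A hedged equivalent of that probe using plain
java.lang.reflect (the class itself goes through the table store's ReflectionUtils
helper instead):

    import java.util.Arrays;

    public class FlinkVersionProbeSketch {
        public static void main(String[] args) throws ClassNotFoundException {
            Class<?> clazz =
                    Class.forName(
                            "org.apache.flink.formats.parquet.ParquetColumnarRowInputFormat");
            // The 1.15+ createPartitionedFormat takes 8 parameters (including
            // producedTypeInfo); the 1.14 variant takes 7, matching the two
            // helper methods above.
            boolean hasNewSignature =
                    Arrays.stream(clazz.getMethods())
                            .filter(m -> m.getName().equals("createPartitionedFormat"))
                            .anyMatch(m -> m.getParameterCount() == 8);
            System.out.println(hasNewSignature);
        }
    }
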
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/parquet/ParquetUtil.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/parquet/ParquetUtil.java
new file mode 100644
index 00000000..009af9d5
--- /dev/null
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/parquet/ParquetUtil.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.format.parquet;
+
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.column.statistics.Statistics;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Parquet utilities for extracting metadata, asserting expected stats, etc. */
+public class ParquetUtil {
+
+    /**
+     * Extract stats from the Parquet file at the specified path.
+     *
+     * @param path the path of the parquet file to be read
+     * @return a map from column name to its statistics (for example, null count, minimum value,
+     *     maximum value), merged across all row groups
+     * @throws IOException if the file footer cannot be read
+     */
+    public static Map<String, Statistics> extractColumnStats(Path path) throws IOException {
+        ParquetMetadata parquetMetadata = getParquetReader(path).getFooter();
+        List<BlockMetaData> blockMetaDataList = parquetMetadata.getBlocks();
+        Map<String, Statistics> resultStats = new HashMap<>();
+        for (BlockMetaData blockMetaData : blockMetaDataList) {
+            List<ColumnChunkMetaData> columnChunkMetaDataList = blockMetaData.getColumns();
+            for (ColumnChunkMetaData columnChunkMetaData : columnChunkMetaDataList) {
+                Statistics stats = columnChunkMetaData.getStatistics();
+                String columnName = columnChunkMetaData.getPath().toDotString();
+                Statistics midStats;
+                if (!resultStats.containsKey(columnName)) {
+                    midStats = stats;
+                } else {
+                    midStats = resultStats.get(columnName);
+                    midStats.mergeStatistics(stats);
+                }
+                resultStats.put(columnName, midStats);
+            }
+        }
+        return resultStats;
+    }
+
+    /**
+     * Create a {@link ParquetFileReader} instance for the Parquet file at the given path.
+     *
+     * @param path the path of the parquet file to be read
+     * @return parquet reader, used for reading the footer, statistics, etc.
+     * @throws IOException if the reader cannot be opened
+     */
+    public static ParquetFileReader getParquetReader(Path path) throws IOException {
+        HadoopInputFile hadoopInputFile = HadoopInputFile.fromPath(path, new Configuration());
+        return ParquetFileReader.open(hadoopInputFile, ParquetReadOptions.builder().build());
+    }
+
+    static void assertStatsClass(
+            RowType.RowField field, Statistics stats, Class<? extends Statistics> expectedClass) {
+        if (!expectedClass.isInstance(stats)) {
+            throw new IllegalArgumentException(
+                    "Expecting "
+                            + expectedClass.getName()
+                            + " for field "
+                            + field.asSummaryString()
+                            + " but found "
+                            + stats.getClass().getName());
+        }
+    }
+}
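
A hypothetical usage sketch for extractColumnStats (the file path and column name
are made up): read a parquet file's footer and inspect the merged statistics of a
single column:

    import java.io.IOException;
    import java.util.Map;

    import org.apache.flink.table.store.format.parquet.ParquetUtil;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.column.statistics.Statistics;

    public class ColumnStatsSketch {
        public static void main(String[] args) throws IOException {
            Map<String, Statistics> stats =
                    ParquetUtil.extractColumnStats(new Path("/tmp/data.parquet"));
            Statistics idStats = stats.get("id"); // hypothetical column name
            // Null count and min/max are merged across all row groups of the file.
            System.out.println(idStats.getNumNulls());
            System.out.println(idStats.minAsString() + " .. " + idStats.maxAsString());
        }
    }
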
diff --git a/flink-table-store-format/src/main/resources/META-INF/services/org.apache.flink.table.store.format.FileFormatFactory b/flink-table-store-format/src/main/resources/META-INF/services/org.apache.flink.table.store.format.FileFormatFactory
index e0d54577..b0551bd8 100644
--- a/flink-table-store-format/src/main/resources/META-INF/services/org.apache.flink.table.store.format.FileFormatFactory
+++ b/flink-table-store-format/src/main/resources/META-INF/services/org.apache.flink.table.store.format.FileFormatFactory
@@ -14,4 +14,5 @@
 # limitations under the License.
 
 org.apache.flink.table.store.format.avro.AvroFileFormatFactory
-org.apache.flink.table.store.format.orc.OrcFileFormatFactory
\ No newline at end of file
+org.apache.flink.table.store.format.orc.OrcFileFormatFactory
+org.apache.flink.table.store.format.parquet.ParquetFileFormatFactory
\ No newline at end of file
diff --git a/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/parquet/ParquetFileFormatTest.java b/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/parquet/ParquetFileFormatTest.java
new file mode 100644
index 00000000..f930c992
--- /dev/null
+++ b/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/parquet/ParquetFileFormatTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.format.parquet;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DelegatingConfiguration;
+
+import org.apache.parquet.format.CompressionCodec;
+import org.apache.parquet.hadoop.ParquetOutputFormat;
+import org.junit.jupiter.api.Test;
+
+import static org.apache.flink.table.store.format.parquet.ParquetFileFormat.getParquetConfiguration;
+import static org.apache.flink.table.store.format.parquet.ParquetFileFormatFactory.IDENTIFIER;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link ParquetFileFormat} and {@link ParquetFileFormatFactory}. */
+public class ParquetFileFormatTest {
+    private static final ConfigOption<String> KEY1 =
+            ConfigOptions.key("k1").stringType().defaultValue("absent");
+
+    @Test
+    public void testAbsent() {
+        Configuration options = new Configuration();
+        ParquetFileFormat parquet = new ParquetFileFormatFactory().create(options);
+        assertThat(parquet.formatOptions().getString(KEY1)).isEqualTo("absent");
+    }
+
+    @Test
+    public void testPresent() {
+        Configuration options = new Configuration();
+        options.setString(KEY1.key(), "v1");
+        ParquetFileFormat parquet = new ParquetFileFormatFactory().create(options);
+        assertThat(parquet.formatOptions().getString(KEY1)).isEqualTo("v1");
+    }
+
+    @Test
+    public void testDefaultCompressionCodecName() {
+        Configuration conf = new Configuration();
+        assertThat(getCompressionCodec(conf)).isEqualTo(CompressionCodec.SNAPPY.name());
+    }
+
+    @Test
+    public void testSpecifiedCompressionCodecName() {
+        String lz4 = CompressionCodec.LZ4.name();
+        Configuration conf = new Configuration();
+        conf.setString(ParquetOutputFormat.COMPRESSION, lz4);
+        assertThat(getCompressionCodec(conf)).isEqualTo(lz4);
+    }
+
+    private String getCompressionCodec(Configuration conf) {
+        DelegatingConfiguration formatOptions = new DelegatingConfiguration(conf, IDENTIFIER + ".");
+        ParquetFileFormat parquet = new ParquetFileFormatFactory().create(formatOptions);
+        return getParquetConfiguration(parquet.formatOptions())
+                .get(ParquetOutputFormat.COMPRESSION);
+    }
+}
diff --git a/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/parquet/ParquetFileStatsExtractorTest.java b/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/parquet/ParquetFileStatsExtractorTest.java
new file mode 100644
index 00000000..f48c4a64
--- /dev/null
+++ b/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/parquet/ParquetFileStatsExtractorTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.format.parquet;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.store.format.FileFormat;
+import org.apache.flink.table.store.format.FileStatsExtractorTestBase;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+/** Tests for {@link ParquetFileStatsExtractor}. */
+public class ParquetFileStatsExtractorTest extends FileStatsExtractorTestBase {
+
+    @Override
+    protected FileFormat createFormat() {
+        return FileFormat.fromIdentifier("parquet", new Configuration());
+    }
+
+    @Override
+    protected RowType rowType() {
+        return RowType.of(
+                new CharType(8),
+                new VarCharType(8),
+                new BooleanType(),
+                new TinyIntType(),
+                new SmallIntType(),
+                new IntType(),
+                new BigIntType(),
+                new FloatType(),
+                new DoubleType(),
+                new DecimalType(5, 2),
+                new DecimalType(38, 18),
+                new DateType());
+    }
+}
diff --git a/pom.xml b/pom.xml
index db98356f..05b83b91 100644
--- a/pom.xml
+++ b/pom.xml
@@ -119,6 +119,7 @@ under the License.
         <flink.streaming.java>flink-streaming-java</flink.streaming.java>
         <flink.sql.orc>flink-sql-orc</flink.sql.orc>
         <flink.orc>flink-orc</flink.orc>
+        <flink.parquet>flink-parquet</flink.parquet>
         <flink.connector.kafka>flink-connector-kafka</flink.connector.kafka>
         <flink.sql.connector.kafka>flink-sql-connector-kafka</flink.sql.connector.kafka>
         <flink.test.utils>flink-test-utils</flink.test.utils>
@@ -296,6 +297,7 @@ under the License.
                 <flink.streaming.java>flink-streaming-java_${scala.binary.version}</flink.streaming.java>
                 <flink.sql.orc>flink-sql-orc_${scala.binary.version}</flink.sql.orc>
                 <flink.orc>flink-orc_${scala.binary.version}</flink.orc>
+                <flink.parquet>flink-parquet_${scala.binary.version}</flink.parquet>
                 <flink.connector.kafka>flink-connector-kafka_${scala.binary.version}</flink.connector.kafka>
                 <flink.sql.connector.kafka>flink-sql-connector-kafka_${scala.binary.version}</flink.sql.connector.kafka>
                 <flink.test.utils>flink-test-utils_${scala.binary.version}</flink.test.utils>