Posted to commits@flink.apache.org by go...@apache.org on 2022/07/15 06:06:43 UTC

[flink] branch master updated: [FLINK-27991][table-planner] ORC format supports reporting statistics

This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new a521e82a473 [FLINK-27991][table-planner] ORC format supports reporting statistics
a521e82a473 is described below

commit a521e82a47384ad88e2424f9f6734f0c6d1f9b14
Author: zhengyunhong.zyh <33...@qq.com>
AuthorDate: Wed Jul 13 14:59:20 2022 +0800

    [FLINK-27991][table-planner] ORC format supports reporting statistics
    
    This closes #20009
---
 flink-formats/flink-orc/pom.xml                    |   8 +
 .../org/apache/flink/orc/OrcFileFormatFactory.java | 202 +++++++++++++++++-
 .../orc/OrcFileSystemStatisticsReportTest.java     |  51 +++++
 .../flink/orc/OrcFormatStatisticsReportTest.java   | 236 +++++++++++++++++++++
 .../planner/utils/StatisticsReportTestBase.java    |   4 +-
 5 files changed, 497 insertions(+), 4 deletions(-)
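
In short, the ORC BulkDecodingFormat now also implements FileBasedStatisticsReportableInputFormat,
so the planner can derive a TableStats (row count plus per-column min/max/null counts) from the
ORC file footers without scanning any data. Below is a minimal sketch of calling the new hook
directly, in the style of the tests added by this commit; the file path and the dataType variable
(the physical row DataType of the table) are assumptions for illustration only:

    // Minimal sketch: formatOptions carries the 'orc.*' options; the file path is hypothetical.
    // Classes used: org.apache.flink.configuration.Configuration, org.apache.flink.core.fs.Path.
    Configuration formatOptions = new Configuration();
    OrcFileFormatFactory.OrcBulkDecodingFormat format =
            new OrcFileFormatFactory.OrcBulkDecodingFormat(formatOptions);
    TableStats stats =
            format.reportStatistics(
                    Collections.singletonList(new Path("file:///tmp/warehouse/part-0.orc")),
                    dataType);
    // Any exception while reading the footers makes the method return TableStats.UNKNOWN.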

diff --git a/flink-formats/flink-orc/pom.xml b/flink-formats/flink-orc/pom.xml
index 050ce79cab4..6aa5cceb4f7 100644
--- a/flink-formats/flink-orc/pom.xml
+++ b/flink-formats/flink-orc/pom.xml
@@ -109,6 +109,14 @@ under the License.
 
 		<!-- Tests -->
 
+		<!-- Guava is needed by OrcFileSystemStatisticsReportTest, which verifies plans via Calcite. See FLINK-27991 -->
+		<dependency>
+			<groupId>com.google.guava</groupId>
+			<artifactId>guava</artifactId>
+			<version>${guava.version}</version>
+			<scope>test</scope>
+		</dependency>
+
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-test-utils</artifactId>
diff --git a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcFileFormatFactory.java b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcFileFormatFactory.java
index 1e0bf4249d5..02057fd66be 100644
--- a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcFileFormatFactory.java
+++ b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcFileFormatFactory.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.orc;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.serialization.BulkWriter;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ReadableConfig;
@@ -26,12 +27,14 @@ import org.apache.flink.connector.file.src.reader.BulkFormat;
 import org.apache.flink.connector.file.table.factories.BulkReaderFormatFactory;
 import org.apache.flink.connector.file.table.factories.BulkWriterFormatFactory;
 import org.apache.flink.connector.file.table.format.BulkDecodingFormat;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.orc.shim.OrcShim;
 import org.apache.flink.orc.vector.RowDataVectorizer;
 import org.apache.flink.orc.writer.OrcBulkWriterFactory;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.Projection;
 import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.format.FileBasedStatisticsReportableInputFormat;
 import org.apache.flink.table.connector.format.ProjectableDecodingFormat;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.connector.source.DynamicTableSource;
@@ -40,17 +43,34 @@ import org.apache.flink.table.data.columnar.vector.VectorizedColumnBatch;
 import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.expressions.ResolvedExpression;
 import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.plan.stats.ColumnStats;
+import org.apache.flink.table.plan.stats.TableStats;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.orc.ColumnStatistics;
+import org.apache.orc.DateColumnStatistics;
+import org.apache.orc.DecimalColumnStatistics;
+import org.apache.orc.DoubleColumnStatistics;
+import org.apache.orc.IntegerColumnStatistics;
+import org.apache.orc.OrcConf;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.StringColumnStatistics;
+import org.apache.orc.TimestampColumnStatistics;
 import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.ColumnStatisticsImpl;
 
+import java.io.IOException;
+import java.sql.Date;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 
@@ -115,9 +135,12 @@ public class OrcFileFormatFactory implements BulkReaderFormatFactory, BulkWriter
         };
     }
 
-    private static class OrcBulkDecodingFormat
+    /** An ORC {@link BulkDecodingFormat} that also implements {@link FileBasedStatisticsReportableInputFormat}. */
+    @VisibleForTesting
+    public static class OrcBulkDecodingFormat
             implements BulkDecodingFormat<RowData>,
-                    ProjectableDecodingFormat<BulkFormat<RowData, FileSourceSplit>> {
+                    ProjectableDecodingFormat<BulkFormat<RowData, FileSourceSplit>>,
+                    FileBasedStatisticsReportableInputFormat {
 
         private final ReadableConfig formatOptions;
         private List<ResolvedExpression> filters;
@@ -167,5 +190,180 @@ public class OrcFileFormatFactory implements BulkReaderFormatFactory, BulkWriter
         public void applyFilters(List<ResolvedExpression> filters) {
             this.filters = filters;
         }
+
+        @Override
+        public TableStats reportStatistics(List<Path> files, DataType producedDataType) {
+            try {
+                Properties properties = getOrcProperties(formatOptions);
+                Configuration hadoopConfig = new Configuration();
+                properties.forEach((k, v) -> hadoopConfig.set(k.toString(), v.toString()));
+
+                long rowCount = 0;
+                Map<String, ColumnStatistics> columnStatisticsMap = new HashMap<>();
+                RowType producedRowType = (RowType) producedDataType.getLogicalType();
+                for (Path file : files) {
+                    rowCount +=
+                            updateStatistics(
+                                    hadoopConfig, file, columnStatisticsMap, producedRowType);
+                }
+
+                Map<String, ColumnStats> columnStatsMap =
+                        convertToColumnStats(rowCount, columnStatisticsMap, producedRowType);
+
+                return new TableStats(rowCount, columnStatsMap);
+            } catch (Exception e) {
+                return TableStats.UNKNOWN;
+            }
+        }
+
+        private long updateStatistics(
+                Configuration hadoopConf,
+                Path file,
+                Map<String, ColumnStatistics> columnStatisticsMap,
+                RowType producedRowType)
+                throws IOException {
+            org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path(file.toUri());
+            Reader reader =
+                    OrcFile.createReader(
+                            path,
+                            OrcFile.readerOptions(hadoopConf)
+                                    .maxLength(OrcConf.MAX_FILE_LENGTH.getLong(hadoopConf)));
+            ColumnStatistics[] statistics = reader.getStatistics();
+            TypeDescription schema = reader.getSchema();
+            List<String> fieldNames = schema.getFieldNames();
+            List<TypeDescription> columnTypes = schema.getChildren();
+            for (String column : producedRowType.getFieldNames()) {
+                int fieldIdx = fieldNames.indexOf(column);
+                if (fieldIdx >= 0) {
+                    int colId = columnTypes.get(fieldIdx).getId();
+                    ColumnStatistics statistic = statistics[colId];
+                    updateStatistics(statistic, column, columnStatisticsMap);
+                }
+            }
+
+            return reader.getNumberOfRows();
+        }
+
+        private void updateStatistics(
+                ColumnStatistics statistic,
+                String column,
+                Map<String, ColumnStatistics> columnStatisticsMap) {
+            ColumnStatistics previousStatistics = columnStatisticsMap.get(column);
+            if (previousStatistics == null) {
+                columnStatisticsMap.put(column, statistic);
+            } else {
+                if (previousStatistics instanceof ColumnStatisticsImpl) {
+                    ((ColumnStatisticsImpl) previousStatistics)
+                            .merge((ColumnStatisticsImpl) statistic);
+                }
+            }
+        }
+
+        private Map<String, ColumnStats> convertToColumnStats(
+                long totalRowCount,
+                Map<String, ColumnStatistics> columnStatisticsMap,
+                RowType logicalType) {
+            Map<String, ColumnStats> columnStatsMap = new HashMap<>();
+            for (String column : logicalType.getFieldNames()) {
+                ColumnStatistics columnStatistics = columnStatisticsMap.get(column);
+                if (columnStatistics == null) {
+                    continue;
+                }
+                ColumnStats columnStats =
+                        convertToColumnStats(
+                                totalRowCount,
+                                logicalType.getTypeAt(logicalType.getFieldIndex(column)),
+                                columnStatistics);
+                columnStatsMap.put(column, columnStats);
+            }
+
+            return columnStatsMap;
+        }
+
+        private ColumnStats convertToColumnStats(
+                long totalRowCount, LogicalType logicalType, ColumnStatistics columnStatistics) {
+            ColumnStats.Builder builder =
+                    new ColumnStats.Builder().setNdv(null).setAvgLen(null).setMaxLen(null);
+            if (!columnStatistics.hasNull()) {
+                builder.setNullCount(0L);
+            } else {
+                builder.setNullCount(totalRowCount - columnStatistics.getNumberOfValues());
+            }
+
+            // For complex types (ROW, ARRAY, MAP) the reported null count is incorrect, so no
+            // statistics are returned for these types.
+            switch (logicalType.getTypeRoot()) {
+                case BOOLEAN:
+                    break;
+                case TINYINT:
+                case SMALLINT:
+                case INTEGER:
+                case BIGINT:
+                    if (columnStatistics instanceof IntegerColumnStatistics) {
+                        builder.setMax(((IntegerColumnStatistics) columnStatistics).getMaximum())
+                                .setMin(((IntegerColumnStatistics) columnStatistics).getMinimum());
+                        break;
+                    } else {
+                        return null;
+                    }
+                case FLOAT:
+                case DOUBLE:
+                    if (columnStatistics instanceof DoubleColumnStatistics) {
+                        builder.setMax(((DoubleColumnStatistics) columnStatistics).getMaximum())
+                                .setMin(((DoubleColumnStatistics) columnStatistics).getMinimum());
+                        break;
+                    } else {
+                        return null;
+                    }
+                case CHAR:
+                case VARCHAR:
+                    if (columnStatistics instanceof StringColumnStatistics) {
+                        builder.setMax(((StringColumnStatistics) columnStatistics).getMaximum())
+                                .setMin(((StringColumnStatistics) columnStatistics).getMinimum());
+                        break;
+                    } else {
+                        return null;
+                    }
+                case DATE:
+                    if (columnStatistics instanceof DateColumnStatistics) {
+                        Date maximum =
+                                (Date) ((DateColumnStatistics) columnStatistics).getMaximum();
+                        Date minimum =
+                                (Date) ((DateColumnStatistics) columnStatistics).getMinimum();
+                        builder.setMax(maximum).setMin(minimum);
+                        break;
+                    } else {
+                        return null;
+                    }
+                case TIMESTAMP_WITHOUT_TIME_ZONE:
+                case TIMESTAMP_WITH_TIME_ZONE:
+                    if (columnStatistics instanceof TimestampColumnStatistics) {
+                        builder.setMax(((TimestampColumnStatistics) columnStatistics).getMaximum())
+                                .setMin(
+                                        ((TimestampColumnStatistics) columnStatistics)
+                                                .getMinimum());
+                        break;
+                    } else {
+                        return null;
+                    }
+                case DECIMAL:
+                    if (columnStatistics instanceof DecimalColumnStatistics) {
+                        builder.setMax(
+                                        ((DecimalColumnStatistics) columnStatistics)
+                                                .getMaximum()
+                                                .bigDecimalValue())
+                                .setMin(
+                                        ((DecimalColumnStatistics) columnStatistics)
+                                                .getMinimum()
+                                                .bigDecimalValue());
+                        break;
+                    } else {
+                        return null;
+                    }
+                default:
+                    return null;
+            }
+            return builder.build();
+        }
     }
 }
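
For reference, the footer metadata that reportStatistics() aggregates comes straight from the ORC
reader API used in updateStatistics() above. A condensed, hedged sketch of reading a single file's
footer follows; the helper name and the IOException handling are illustrative only:

    // Condensed sketch of the ORC reader calls used above.
    static long readFooter(org.apache.hadoop.conf.Configuration hadoopConf,
                           org.apache.hadoop.fs.Path orcFile) throws java.io.IOException {
        org.apache.orc.Reader reader =
                org.apache.orc.OrcFile.createReader(
                        orcFile, org.apache.orc.OrcFile.readerOptions(hadoopConf));
        // Per-column statistics are indexed by column id; the ids of top-level fields come from
        // reader.getSchema().getChildren().get(fieldIdx).getId().
        org.apache.orc.ColumnStatistics[] columnStats = reader.getStatistics();
        return reader.getNumberOfRows(); // file-level row count recorded in the footer
    }
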
diff --git a/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcFileSystemStatisticsReportTest.java b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcFileSystemStatisticsReportTest.java
new file mode 100644
index 00000000000..57b723beb13
--- /dev/null
+++ b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcFileSystemStatisticsReportTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc;
+
+import org.apache.flink.table.planner.plan.stats.FlinkStatistic;
+import org.apache.flink.table.types.DataType;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * Tests statistics reporting in {@link OrcFileFormatFactory} when the format is used through a
+ * file system source.
+ */
+public class OrcFileSystemStatisticsReportTest extends OrcFormatStatisticsReportTest {
+
+    @BeforeEach
+    public void setup(@TempDir File file) throws Exception {
+        super.setup(file);
+    }
+
+    @Test
+    public void testOrcFileSystemStatisticsReportWithSingleFile()
+            throws ExecutionException, InterruptedException {
+        // Insert data, then obtain the statistics from the optimized plan.
+        DataType dataType = tEnv.from("sourceTable").getResolvedSchema().toPhysicalRowDataType();
+        tEnv.fromValues(dataType, getData()).executeInsert("sourceTable").await();
+        FlinkStatistic statistic = getStatisticsFromOptimizedPlan("select * from sourceTable");
+        assertOrcFormatTableStatsEquals(statistic.getTableStats(), 3, 1L);
+    }
+}
diff --git a/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcFormatStatisticsReportTest.java b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcFormatStatisticsReportTest.java
new file mode 100644
index 00000000000..6262f22e714
--- /dev/null
+++ b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcFormatStatisticsReportTest.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.plan.stats.ColumnStats;
+import org.apache.flink.table.plan.stats.TableStats;
+import org.apache.flink.table.planner.utils.StatisticsReportTestBase;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.utils.DateTimeUtils;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests statistics reporting in {@link OrcFileFormatFactory} for tables stored in the ORC format.
+ */
+public class OrcFormatStatisticsReportTest extends StatisticsReportTestBase {
+
+    private static OrcFileFormatFactory.OrcBulkDecodingFormat orcBulkDecodingFormat;
+
+    @BeforeEach
+    public void setup(@TempDir File file) throws Exception {
+        super.setup(file);
+        createFileSystemSource();
+        Configuration configuration = new Configuration();
+        orcBulkDecodingFormat = new OrcFileFormatFactory.OrcBulkDecodingFormat(configuration);
+    }
+
+    @Override
+    protected String[] properties() {
+        List<String> ret = new ArrayList<>();
+        ret.add("'format'='orc'");
+        ret.add("'orc.compress'='snappy'");
+        return ret.toArray(new String[0]);
+    }
+
+    @Test
+    public void testOrcFormatStatsReportWithSingleFile() throws Exception {
+        // insert data and get statistics.
+        DataType dataType = tEnv.from("sourceTable").getResolvedSchema().toPhysicalRowDataType();
+        tEnv.fromValues(dataType, getData()).executeInsert("sourceTable").await();
+        assertThat(folder.listFiles()).isNotNull().hasSize(1);
+        File[] files = folder.listFiles();
+        assert files != null;
+        TableStats tableStats =
+                orcBulkDecodingFormat.reportStatistics(
+                        Collections.singletonList(new Path(files[0].toURI().toString())), dataType);
+        assertOrcFormatTableStatsEquals(tableStats, 3, 1L);
+    }
+
+    @Test
+    public void testOrcFormatStatsReportWithMultiFile() throws Exception {
+        // insert data and get statistics.
+        DataType dataType = tEnv.from("sourceTable").getResolvedSchema().toPhysicalRowDataType();
+        tEnv.fromValues(dataType, getData()).executeInsert("sourceTable").await();
+        tEnv.fromValues(dataType, getData()).executeInsert("sourceTable").await();
+        assertThat(folder.listFiles()).isNotNull().hasSize(2);
+        File[] files = folder.listFiles();
+        List<Path> paths = new ArrayList<>();
+        assert files != null;
+        paths.add(new Path(files[0].toURI().toString()));
+        paths.add(new Path(files[1].toURI().toString()));
+        TableStats tableStats = orcBulkDecodingFormat.reportStatistics(paths, dataType);
+        assertOrcFormatTableStatsEquals(tableStats, 6, 2L);
+    }
+
+    @Test
+    public void testOrcFormatStatsReportWithEmptyFile() {
+        TableStats tableStats = orcBulkDecodingFormat.reportStatistics(null, null);
+        assertThat(tableStats).isEqualTo(TableStats.UNKNOWN);
+    }
+
+    @Override
+    protected Map<String, String> ddlTypesMap() {
+        // The ORC format does not yet support the TIME, BINARY, VARBINARY and
+        // TIMESTAMP_WITH_LOCAL_TIME_ZONE types, so these types are removed here.
+        Map<String, String> ddlTypes = super.ddlTypesMap();
+        ddlTypes.remove("timestamp with local time zone");
+        ddlTypes.remove("binary(1)");
+        ddlTypes.remove("varbinary(1)");
+        ddlTypes.remove("time");
+        return ddlTypes;
+    }
+
+    @Override
+    protected Map<String, List<Object>> getDataMap() {
+        // The ORC format does not yet support the TIME, BINARY, VARBINARY and
+        // TIMESTAMP_WITH_LOCAL_TIME_ZONE types, so the data for these types is removed here.
+        Map<String, List<Object>> dataMap = super.getDataMap();
+        dataMap.remove("timestamp with local time zone");
+        dataMap.remove("binary(1)");
+        dataMap.remove("varbinary(1)");
+        dataMap.remove("time");
+
+        return dataMap;
+    }
+
+    protected static void assertOrcFormatTableStatsEquals(
+            TableStats tableStats, int expectedRowCount, long nullCount) {
+        Map<String, ColumnStats> expectedColumnStatsMap = new HashMap<>();
+        expectedColumnStatsMap.put(
+                "f_boolean", new ColumnStats.Builder().setNullCount(nullCount).build());
+        expectedColumnStatsMap.put(
+                "f_tinyint",
+                new ColumnStats.Builder().setMax(3L).setMin(1L).setNullCount(0L).build());
+        expectedColumnStatsMap.put(
+                "f_smallint",
+                new ColumnStats.Builder().setMax(128L).setMin(100L).setNullCount(0L).build());
+        expectedColumnStatsMap.put(
+                "f_int",
+                new ColumnStats.Builder()
+                        .setMax(45536L)
+                        .setMin(31000L)
+                        .setNullCount(nullCount)
+                        .build());
+        expectedColumnStatsMap.put(
+                "f_bigint",
+                new ColumnStats.Builder()
+                        .setMax(1238123899121L)
+                        .setMin(1238123899000L)
+                        .setNullCount(0L)
+                        .build());
+        expectedColumnStatsMap.put(
+                "f_float",
+                new ColumnStats.Builder()
+                        .setMax(33.33300018310547D)
+                        .setMin(33.31100082397461D)
+                        .setNullCount(nullCount)
+                        .build());
+        expectedColumnStatsMap.put(
+                "f_double",
+                new ColumnStats.Builder().setMax(10.1D).setMin(1.1D).setNullCount(0L).build());
+        expectedColumnStatsMap.put(
+                "f_string",
+                new ColumnStats.Builder().setMax("def").setMin("abcd").setNullCount(0L).build());
+        expectedColumnStatsMap.put(
+                "f_decimal5",
+                new ColumnStats.Builder()
+                        .setMax(new BigDecimal("223.45"))
+                        .setMin(new BigDecimal("123.45"))
+                        .setNullCount(0L)
+                        .build());
+        expectedColumnStatsMap.put(
+                "f_decimal14",
+                new ColumnStats.Builder()
+                        .setMax(new BigDecimal("123333333355.33"))
+                        .setMin(new BigDecimal("123333333333.33"))
+                        .setNullCount(0L)
+                        .build());
+        expectedColumnStatsMap.put(
+                "f_decimal38",
+                new ColumnStats.Builder()
+                        .setMax(new BigDecimal("123433343334333433343334333433343334.34"))
+                        .setMin(new BigDecimal("123433343334333433343334333433343334.33"))
+                        .setNullCount(nullCount)
+                        .build());
+        expectedColumnStatsMap.put(
+                "f_date",
+                new ColumnStats.Builder()
+                        .setMax(Date.valueOf("1990-10-16"))
+                        .setMin(Date.valueOf("1990-10-14"))
+                        .setNullCount(0L)
+                        .build());
+        expectedColumnStatsMap.put(
+                "f_timestamp3",
+                new ColumnStats.Builder()
+                        .setMax(
+                                DateTimeUtils.parseTimestampData("1990-10-16 12:12:43.123", 3)
+                                        .toTimestamp())
+                        .setMin(
+                                DateTimeUtils.parseTimestampData("1990-10-14 12:12:43.123", 3)
+                                        .toTimestamp())
+                        .setNullCount(0L)
+                        .build());
+        expectedColumnStatsMap.put(
+                "f_timestamp9",
+                new ColumnStats.Builder()
+                        .setMax(
+                                DateTimeUtils.parseTimestampData("1990-10-16 12:12:43.123", 3)
+                                        .toTimestamp())
+                        .setMin(
+                                DateTimeUtils.parseTimestampData("1990-10-14 12:12:43.123", 3)
+                                        .toTimestamp())
+                        .setNullCount(0L)
+                        .build());
+        expectedColumnStatsMap.put(
+                "f_timestamp_wtz",
+                new ColumnStats.Builder()
+                        .setMax(
+                                DateTimeUtils.parseTimestampData("1990-10-16 12:12:43.123", 3)
+                                        .toTimestamp())
+                        .setMin(
+                                DateTimeUtils.parseTimestampData("1990-10-14 12:12:43.123", 3)
+                                        .toTimestamp())
+                        .setNullCount(0L)
+                        .build());
+
+        // For complex types (ROW, ARRAY, MAP) the reported null count is incorrect, so no
+        // statistics are expected for these columns.
+        expectedColumnStatsMap.put("f_row", null);
+        expectedColumnStatsMap.put("f_array", null);
+        expectedColumnStatsMap.put("f_map", null);
+
+        assertThat(tableStats).isEqualTo(new TableStats(expectedRowCount, expectedColumnStatsMap));
+    }
+}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/StatisticsReportTestBase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/StatisticsReportTestBase.java
index f0ceaa16038..764d9689242 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/StatisticsReportTestBase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/StatisticsReportTestBase.java
@@ -85,7 +85,7 @@ public abstract class StatisticsReportTestBase extends TestLogger {
 
     protected abstract String[] properties();
 
-    private Map<String, String> ddlTypesMap() {
+    protected Map<String, String> ddlTypesMap() {
         Map<String, String> ddlTypesMap = new LinkedHashMap<>();
         ddlTypesMap.put("boolean", "f_boolean");
         ddlTypesMap.put("tinyint", "f_tinyint");
@@ -113,7 +113,7 @@ public abstract class StatisticsReportTestBase extends TestLogger {
         return ddlTypesMap;
     }
 
-    private Map<String, List<Object>> getDataMap() {
+    protected Map<String, List<Object>> getDataMap() {
         Map<String, List<Object>> dataMap = new LinkedHashMap<>();
         dataMap.put("boolean", Stream.of(null, true, false).collect(toList()));
         dataMap.put("tinyint", Stream.of((byte) 1, (byte) 2, (byte) 3).collect(toList()));