You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by go...@apache.org on 2022/07/15 06:06:43 UTC
[flink] branch master updated: [FLINK-27991][table-planner] ORC format supports reporting statistics
This is an automated email from the ASF dual-hosted git repository.
godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new a521e82a473 [FLINK-27991][table-planner] ORC format supports reporting statistics
a521e82a473 is described below
commit a521e82a47384ad88e2424f9f6734f0c6d1f9b14
Author: zhengyunhong.zyh <33...@qq.com>
AuthorDate: Wed Jul 13 14:59:20 2022 +0800
[FLINK-27991][table-planner] ORC format supports reporting statistics
This closes #20009
---
flink-formats/flink-orc/pom.xml | 8 +
.../org/apache/flink/orc/OrcFileFormatFactory.java | 202 +++++++++++++++++-
.../orc/OrcFileSystemStatisticsReportTest.java | 51 +++++
.../flink/orc/OrcFormatStatisticsReportTest.java | 236 +++++++++++++++++++++
.../planner/utils/StatisticsReportTestBase.java | 4 +-
5 files changed, 497 insertions(+), 4 deletions(-)
diff --git a/flink-formats/flink-orc/pom.xml b/flink-formats/flink-orc/pom.xml
index 050ce79cab4..6aa5cceb4f7 100644
--- a/flink-formats/flink-orc/pom.xml
+++ b/flink-formats/flink-orc/pom.xml
@@ -109,6 +109,14 @@ under the License.
<!-- Tests -->
+ <!-- Needed so that OrcFileSystemStatisticsReportTest can use the Calcite plan verification methods, see FLINK-27991 -->
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>${guava.version}</version>
+ <scope>test</scope>
+ </dependency>
+
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils</artifactId>
diff --git a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcFileFormatFactory.java b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcFileFormatFactory.java
index 1e0bf4249d5..02057fd66be 100644
--- a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcFileFormatFactory.java
+++ b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcFileFormatFactory.java
@@ -18,6 +18,7 @@
package org.apache.flink.orc;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ReadableConfig;
@@ -26,12 +27,14 @@ import org.apache.flink.connector.file.src.reader.BulkFormat;
import org.apache.flink.connector.file.table.factories.BulkReaderFormatFactory;
import org.apache.flink.connector.file.table.factories.BulkWriterFormatFactory;
import org.apache.flink.connector.file.table.format.BulkDecodingFormat;
+import org.apache.flink.core.fs.Path;
import org.apache.flink.orc.shim.OrcShim;
import org.apache.flink.orc.vector.RowDataVectorizer;
import org.apache.flink.orc.writer.OrcBulkWriterFactory;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.Projection;
import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.format.FileBasedStatisticsReportableInputFormat;
import org.apache.flink.table.connector.format.ProjectableDecodingFormat;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
@@ -40,17 +43,34 @@ import org.apache.flink.table.data.columnar.vector.VectorizedColumnBatch;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.plan.stats.ColumnStats;
+import org.apache.flink.table.plan.stats.TableStats;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.conf.Configuration;
+import org.apache.orc.ColumnStatistics;
+import org.apache.orc.DateColumnStatistics;
+import org.apache.orc.DecimalColumnStatistics;
+import org.apache.orc.DoubleColumnStatistics;
+import org.apache.orc.IntegerColumnStatistics;
+import org.apache.orc.OrcConf;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.StringColumnStatistics;
+import org.apache.orc.TimestampColumnStatistics;
import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.ColumnStatisticsImpl;
+import java.io.IOException;
+import java.sql.Date;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
import java.util.Set;
@@ -115,9 +135,12 @@ public class OrcFileFormatFactory implements BulkReaderFormatFactory, BulkWriter
};
}
- private static class OrcBulkDecodingFormat
+ /** OrcBulkDecodingFormat which implements {@link FileBasedStatisticsReportableInputFormat}. */
+ @VisibleForTesting
+ public static class OrcBulkDecodingFormat
implements BulkDecodingFormat<RowData>,
- ProjectableDecodingFormat<BulkFormat<RowData, FileSourceSplit>> {
+ ProjectableDecodingFormat<BulkFormat<RowData, FileSourceSplit>>,
+ FileBasedStatisticsReportableInputFormat {
private final ReadableConfig formatOptions;
private List<ResolvedExpression> filters;
@@ -167,5 +190,180 @@ public class OrcFileFormatFactory implements BulkReaderFormatFactory, BulkWriter
public void applyFilters(List<ResolvedExpression> filters) {
this.filters = filters;
}
+
+ @Override
+ public TableStats reportStatistics(List<Path> files, DataType producedDataType) {
+ try {
+ Properties properties = getOrcProperties(formatOptions);
+ Configuration hadoopConfig = new Configuration();
+ properties.forEach((k, v) -> hadoopConfig.set(k.toString(), v.toString()));
+
+ long rowCount = 0;
+ Map<String, ColumnStatistics> columnStatisticsMap = new HashMap<>();
+ RowType producedRowType = (RowType) producedDataType.getLogicalType();
+ for (Path file : files) {
+ rowCount +=
+ updateStatistics(
+ hadoopConfig, file, columnStatisticsMap, producedRowType);
+ }
+
+ Map<String, ColumnStats> columnStatsMap =
+ convertToColumnStats(rowCount, columnStatisticsMap, producedRowType);
+
+ return new TableStats(rowCount, columnStatsMap);
+ } catch (Exception e) {
+ return TableStats.UNKNOWN;
+ }
+ }
+
+ private long updateStatistics(
+ Configuration hadoopConf,
+ Path file,
+ Map<String, ColumnStatistics> columnStatisticsMap,
+ RowType producedRowType)
+ throws IOException {
+ org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path(file.toUri());
+ Reader reader =
+ OrcFile.createReader(
+ path,
+ OrcFile.readerOptions(hadoopConf)
+ .maxLength(OrcConf.MAX_FILE_LENGTH.getLong(hadoopConf)));
+ ColumnStatistics[] statistics = reader.getStatistics();
+ TypeDescription schema = reader.getSchema();
+ List<String> fieldNames = schema.getFieldNames();
+ List<TypeDescription> columnTypes = schema.getChildren();
+ for (String column : producedRowType.getFieldNames()) {
+ int fieldIdx = fieldNames.indexOf(column);
+ if (fieldIdx >= 0) {
+ int colId = columnTypes.get(fieldIdx).getId();
+ ColumnStatistics statistic = statistics[colId];
+ updateStatistics(statistic, column, columnStatisticsMap);
+ }
+ }
+
+ return reader.getNumberOfRows();
+ }
+
+ private void updateStatistics(
+ ColumnStatistics statistic,
+ String column,
+ Map<String, ColumnStatistics> columnStatisticsMap) {
+ ColumnStatistics previousStatistics = columnStatisticsMap.get(column);
+ if (previousStatistics == null) {
+ columnStatisticsMap.put(column, statistic);
+ } else {
+ if (previousStatistics instanceof ColumnStatisticsImpl) {
+ ((ColumnStatisticsImpl) previousStatistics)
+ .merge((ColumnStatisticsImpl) statistic);
+ }
+ }
+ }
+
+ private Map<String, ColumnStats> convertToColumnStats(
+ long totalRowCount,
+ Map<String, ColumnStatistics> columnStatisticsMap,
+ RowType logicalType) {
+ Map<String, ColumnStats> columnStatsMap = new HashMap<>();
+ for (String column : logicalType.getFieldNames()) {
+ ColumnStatistics columnStatistics = columnStatisticsMap.get(column);
+ if (columnStatistics == null) {
+ continue;
+ }
+ ColumnStats columnStats =
+ convertToColumnStats(
+ totalRowCount,
+ logicalType.getTypeAt(logicalType.getFieldIndex(column)),
+ columnStatistics);
+ columnStatsMap.put(column, columnStats);
+ }
+
+ return columnStatsMap;
+ }
+
+ private ColumnStats convertToColumnStats(
+ long totalRowCount, LogicalType logicalType, ColumnStatistics columnStatistics) {
+ ColumnStats.Builder builder =
+ new ColumnStats.Builder().setNdv(null).setAvgLen(null).setMaxLen(null);
+ if (!columnStatistics.hasNull()) {
+ builder.setNullCount(0L);
+ } else {
+ builder.setNullCount(totalRowCount - columnStatistics.getNumberOfValues());
+ }
+
+ // For the complex types ROW, ARRAY and MAP, the returned statistics have a wrong null
+ // count value, so for now the stats of complex types are returned as null.
+ switch (logicalType.getTypeRoot()) {
+ case BOOLEAN:
+ break;
+ case TINYINT:
+ case SMALLINT:
+ case INTEGER:
+ case BIGINT:
+ if (columnStatistics instanceof IntegerColumnStatistics) {
+ builder.setMax(((IntegerColumnStatistics) columnStatistics).getMaximum())
+ .setMin(((IntegerColumnStatistics) columnStatistics).getMinimum());
+ break;
+ } else {
+ return null;
+ }
+ case FLOAT:
+ case DOUBLE:
+ if (columnStatistics instanceof DoubleColumnStatistics) {
+ builder.setMax(((DoubleColumnStatistics) columnStatistics).getMaximum())
+ .setMin(((DoubleColumnStatistics) columnStatistics).getMinimum());
+ break;
+ } else {
+ return null;
+ }
+ case CHAR:
+ case VARCHAR:
+ if (columnStatistics instanceof StringColumnStatistics) {
+ builder.setMax(((StringColumnStatistics) columnStatistics).getMaximum())
+ .setMin(((StringColumnStatistics) columnStatistics).getMinimum());
+ break;
+ } else {
+ return null;
+ }
+ case DATE:
+ if (columnStatistics instanceof DateColumnStatistics) {
+ Date maximum =
+ (Date) ((DateColumnStatistics) columnStatistics).getMaximum();
+ Date minimum =
+ (Date) ((DateColumnStatistics) columnStatistics).getMinimum();
+ builder.setMax(maximum).setMin(minimum);
+ break;
+ } else {
+ return null;
+ }
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ case TIMESTAMP_WITH_TIME_ZONE:
+ if (columnStatistics instanceof TimestampColumnStatistics) {
+ builder.setMax(((TimestampColumnStatistics) columnStatistics).getMaximum())
+ .setMin(
+ ((TimestampColumnStatistics) columnStatistics)
+ .getMinimum());
+ break;
+ } else {
+ return null;
+ }
+ case DECIMAL:
+ if (columnStatistics instanceof DecimalColumnStatistics) {
+ builder.setMax(
+ ((DecimalColumnStatistics) columnStatistics)
+ .getMaximum()
+ .bigDecimalValue())
+ .setMin(
+ ((DecimalColumnStatistics) columnStatistics)
+ .getMinimum()
+ .bigDecimalValue());
+ break;
+ } else {
+ return null;
+ }
+ default:
+ return null;
+ }
+ return builder.build();
+ }
}
}
diff --git a/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcFileSystemStatisticsReportTest.java b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcFileSystemStatisticsReportTest.java
new file mode 100644
index 00000000000..57b723beb13
--- /dev/null
+++ b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcFileSystemStatisticsReportTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc;
+
+import org.apache.flink.table.planner.plan.stats.FlinkStatistic;
+import org.apache.flink.table.types.DataType;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * Test for statistics functionality in {@link OrcFileFormatFactory} in the case of file system
+ * source.
+ */
+public class OrcFileSystemStatisticsReportTest extends OrcFormatStatisticsReportTest {
+
+ @BeforeEach
+ public void setup(@TempDir File file) throws Exception {
+ super.setup(file);
+ }
+
+ @Test
+ public void testOrcFileSystemStatisticsReportWithSingleFile()
+ throws ExecutionException, InterruptedException {
+ // Insert data and get the statistics from the optimized plan.
+ DataType dataType = tEnv.from("sourceTable").getResolvedSchema().toPhysicalRowDataType();
+ tEnv.fromValues(dataType, getData()).executeInsert("sourceTable").await();
+ FlinkStatistic statistic = getStatisticsFromOptimizedPlan("select * from sourceTable");
+ assertOrcFormatTableStatsEquals(statistic.getTableStats(), 3, 1L);
+ }
+}
diff --git a/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcFormatStatisticsReportTest.java b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcFormatStatisticsReportTest.java
new file mode 100644
index 00000000000..6262f22e714
--- /dev/null
+++ b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcFormatStatisticsReportTest.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.plan.stats.ColumnStats;
+import org.apache.flink.table.plan.stats.TableStats;
+import org.apache.flink.table.planner.utils.StatisticsReportTestBase;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.utils.DateTimeUtils;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Test for the statistics functionality of {@link OrcFileFormatFactory} with the ORC format.
+ */
+public class OrcFormatStatisticsReportTest extends StatisticsReportTestBase {
+
+ private static OrcFileFormatFactory.OrcBulkDecodingFormat orcBulkDecodingFormat;
+
+ @BeforeEach
+ public void setup(@TempDir File file) throws Exception {
+ super.setup(file);
+ createFileSystemSource();
+ Configuration configuration = new Configuration();
+ orcBulkDecodingFormat = new OrcFileFormatFactory.OrcBulkDecodingFormat(configuration);
+ }
+
+ @Override
+ protected String[] properties() {
+ List<String> ret = new ArrayList<>();
+ ret.add("'format'='orc'");
+ ret.add("'orc.compress'='snappy'");
+ return ret.toArray(new String[0]);
+ }
+
+ @Test
+ public void testOrcFormatStatsReportWithSingleFile() throws Exception {
+ // insert data and get statistics.
+ DataType dataType = tEnv.from("sourceTable").getResolvedSchema().toPhysicalRowDataType();
+ tEnv.fromValues(dataType, getData()).executeInsert("sourceTable").await();
+ assertThat(folder.listFiles()).isNotNull().hasSize(1);
+ File[] files = folder.listFiles();
+ assert files != null;
+ TableStats tableStats =
+ orcBulkDecodingFormat.reportStatistics(
+ Collections.singletonList(new Path(files[0].toURI().toString())), dataType);
+ assertOrcFormatTableStatsEquals(tableStats, 3, 1L);
+ }
+
+ @Test
+ public void testOrcFormatStatsReportWithMultiFile() throws Exception {
+ // insert data and get statistics.
+ DataType dataType = tEnv.from("sourceTable").getResolvedSchema().toPhysicalRowDataType();
+ tEnv.fromValues(dataType, getData()).executeInsert("sourceTable").await();
+ tEnv.fromValues(dataType, getData()).executeInsert("sourceTable").await();
+ assertThat(folder.listFiles()).isNotNull().hasSize(2);
+ File[] files = folder.listFiles();
+ List<Path> paths = new ArrayList<>();
+ assert files != null;
+ paths.add(new Path(files[0].toURI().toString()));
+ paths.add(new Path(files[1].toURI().toString()));
+ TableStats tableStats = orcBulkDecodingFormat.reportStatistics(paths, dataType);
+ assertOrcFormatTableStatsEquals(tableStats, 6, 2L);
+ }
+
+ @Test
+ public void testOrcFormatStatsReportWithEmptyFile() {
+ TableStats tableStats = orcBulkDecodingFormat.reportStatistics(null, null);
+ assertThat(tableStats).isEqualTo(TableStats.UNKNOWN);
+ }
+
+ @Override
+ protected Map<String, String> ddlTypesMap() {
+ // The ORC format currently doesn't support the TIME(), BINARY(), VARBINARY() and
+ // TIMESTAMP_WITH_LOCAL_TIME_ZONE types, so we remove these types.
+ Map<String, String> ddlTypes = super.ddlTypesMap();
+ ddlTypes.remove("timestamp with local time zone");
+ ddlTypes.remove("binary(1)");
+ ddlTypes.remove("varbinary(1)");
+ ddlTypes.remove("time");
+ return ddlTypes;
+ }
+
+ @Override
+ protected Map<String, List<Object>> getDataMap() {
+ // The ORC format currently doesn't support the TIME(), BINARY(), VARBINARY() and
+ // TIMESTAMP_WITH_LOCAL_TIME_ZONE types, so we remove the data belonging to these types.
+ Map<String, List<Object>> dataMap = super.getDataMap();
+ dataMap.remove("timestamp with local time zone");
+ dataMap.remove("binary(1)");
+ dataMap.remove("varbinary(1)");
+ dataMap.remove("time");
+
+ return dataMap;
+ }
+
+ protected static void assertOrcFormatTableStatsEquals(
+ TableStats tableStats, int expectedRowCount, long nullCount) {
+ Map<String, ColumnStats> expectedColumnStatsMap = new HashMap<>();
+ expectedColumnStatsMap.put(
+ "f_boolean", new ColumnStats.Builder().setNullCount(nullCount).build());
+ expectedColumnStatsMap.put(
+ "f_tinyint",
+ new ColumnStats.Builder().setMax(3L).setMin(1L).setNullCount(0L).build());
+ expectedColumnStatsMap.put(
+ "f_smallint",
+ new ColumnStats.Builder().setMax(128L).setMin(100L).setNullCount(0L).build());
+ expectedColumnStatsMap.put(
+ "f_int",
+ new ColumnStats.Builder()
+ .setMax(45536L)
+ .setMin(31000L)
+ .setNullCount(nullCount)
+ .build());
+ expectedColumnStatsMap.put(
+ "f_bigint",
+ new ColumnStats.Builder()
+ .setMax(1238123899121L)
+ .setMin(1238123899000L)
+ .setNullCount(0L)
+ .build());
+ expectedColumnStatsMap.put(
+ "f_float",
+ new ColumnStats.Builder()
+ .setMax(33.33300018310547D)
+ .setMin(33.31100082397461D)
+ .setNullCount(nullCount)
+ .build());
+ expectedColumnStatsMap.put(
+ "f_double",
+ new ColumnStats.Builder().setMax(10.1D).setMin(1.1D).setNullCount(0L).build());
+ expectedColumnStatsMap.put(
+ "f_string",
+ new ColumnStats.Builder().setMax("def").setMin("abcd").setNullCount(0L).build());
+ expectedColumnStatsMap.put(
+ "f_decimal5",
+ new ColumnStats.Builder()
+ .setMax(new BigDecimal("223.45"))
+ .setMin(new BigDecimal("123.45"))
+ .setNullCount(0L)
+ .build());
+ expectedColumnStatsMap.put(
+ "f_decimal14",
+ new ColumnStats.Builder()
+ .setMax(new BigDecimal("123333333355.33"))
+ .setMin(new BigDecimal("123333333333.33"))
+ .setNullCount(0L)
+ .build());
+ expectedColumnStatsMap.put(
+ "f_decimal38",
+ new ColumnStats.Builder()
+ .setMax(new BigDecimal("123433343334333433343334333433343334.34"))
+ .setMin(new BigDecimal("123433343334333433343334333433343334.33"))
+ .setNullCount(nullCount)
+ .build());
+ expectedColumnStatsMap.put(
+ "f_date",
+ new ColumnStats.Builder()
+ .setMax(Date.valueOf("1990-10-16"))
+ .setMin(Date.valueOf("1990-10-14"))
+ .setNullCount(0L)
+ .build());
+ expectedColumnStatsMap.put(
+ "f_timestamp3",
+ new ColumnStats.Builder()
+ .setMax(
+ DateTimeUtils.parseTimestampData("1990-10-16 12:12:43.123", 3)
+ .toTimestamp())
+ .setMin(
+ DateTimeUtils.parseTimestampData("1990-10-14 12:12:43.123", 3)
+ .toTimestamp())
+ .setNullCount(0L)
+ .build());
+ expectedColumnStatsMap.put(
+ "f_timestamp9",
+ new ColumnStats.Builder()
+ .setMax(
+ DateTimeUtils.parseTimestampData("1990-10-16 12:12:43.123", 3)
+ .toTimestamp())
+ .setMin(
+ DateTimeUtils.parseTimestampData("1990-10-14 12:12:43.123", 3)
+ .toTimestamp())
+ .setNullCount(0L)
+ .build());
+ expectedColumnStatsMap.put(
+ "f_timestamp_wtz",
+ new ColumnStats.Builder()
+ .setMax(
+ DateTimeUtils.parseTimestampData("1990-10-16 12:12:43.123", 3)
+ .toTimestamp())
+ .setMin(
+ DateTimeUtils.parseTimestampData("1990-10-14 12:12:43.123", 3)
+ .toTimestamp())
+ .setNullCount(0L)
+ .build());
+
+ // For the complex types ROW, ARRAY and MAP, the returned statistics have a wrong null
+ // count value, so for now the stats of complex types are returned as null.
+ expectedColumnStatsMap.put("f_row", null);
+ expectedColumnStatsMap.put("f_array", null);
+ expectedColumnStatsMap.put("f_map", null);
+
+ assertThat(tableStats).isEqualTo(new TableStats(expectedRowCount, expectedColumnStatsMap));
+ }
+}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/StatisticsReportTestBase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/StatisticsReportTestBase.java
index f0ceaa16038..764d9689242 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/StatisticsReportTestBase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/StatisticsReportTestBase.java
@@ -85,7 +85,7 @@ public abstract class StatisticsReportTestBase extends TestLogger {
protected abstract String[] properties();
- private Map<String, String> ddlTypesMap() {
+ protected Map<String, String> ddlTypesMap() {
Map<String, String> ddlTypesMap = new LinkedHashMap<>();
ddlTypesMap.put("boolean", "f_boolean");
ddlTypesMap.put("tinyint", "f_tinyint");
@@ -113,7 +113,7 @@ public abstract class StatisticsReportTestBase extends TestLogger {
return ddlTypesMap;
}
- private Map<String, List<Object>> getDataMap() {
+ protected Map<String, List<Object>> getDataMap() {
Map<String, List<Object>> dataMap = new LinkedHashMap<>();
dataMap.put("boolean", Stream.of(null, true, false).collect(toList()));
dataMap.put("tinyint", Stream.of((byte) 1, (byte) 2, (byte) 3).collect(toList()));