You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by go...@apache.org on 2022/07/13 12:38:38 UTC
[flink] branch master updated: [FLINK-27989][table-planner] Csv format supports reporting statistics
This is an automated email from the ASF dual-hosted git repository.
godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new dbc7ff712a1 [FLINK-27989][table-planner] Csv format supports reporting statistics
dbc7ff712a1 is described below
commit dbc7ff712a1694262629b9a349f7a1fa46240b6b
Author: zhengyunhong.zyh <33...@qq.com>
AuthorDate: Mon Jun 13 18:14:31 2022 +0800
[FLINK-27989][table-planner] Csv format supports reporting statistics
This closes #20007
---
.../flink/formats/csv/CsvFileFormatFactory.java | 70 +++++++++++-
.../CsvFormatFilesystemStatisticsReportTest.java | 107 ++++++++++++++++++
.../formats/csv/CsvFormatStatisticsReportTest.java | 122 +++++++++++++++++++++
.../formats/testcsv/TestCsvFormatFactory.java | 42 ++++---
4 files changed, 325 insertions(+), 16 deletions(-)
diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFileFormatFactory.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFileFormatFactory.java
index 8ea2280eaf1..e59ec63794f 100644
--- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFileFormatFactory.java
+++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFileFormatFactory.java
@@ -19,6 +19,7 @@
package org.apache.flink.formats.csv;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.api.common.serialization.BulkWriter.Factory;
import org.apache.flink.configuration.ConfigOption;
@@ -29,16 +30,21 @@ import org.apache.flink.connector.file.src.reader.BulkFormat;
import org.apache.flink.connector.file.table.factories.BulkReaderFormatFactory;
import org.apache.flink.connector.file.table.factories.BulkWriterFormatFactory;
import org.apache.flink.connector.file.table.format.BulkDecodingFormat;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.common.Converter;
import org.apache.flink.formats.csv.RowDataToCsvConverters.RowDataToCsvConverter;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.Projection;
import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.format.FileBasedStatisticsReportableInputFormat;
import org.apache.flink.table.connector.format.ProjectableDecodingFormat;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource.Context;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.plan.stats.TableStats;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
@@ -49,7 +55,13 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.Csv
import org.apache.commons.lang3.StringEscapeUtils;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
import java.util.Collections;
+import java.util.List;
import java.util.Set;
import static org.apache.flink.formats.csv.CsvFormatOptions.ALLOW_COMMENTS;
@@ -93,9 +105,12 @@ public class CsvFileFormatFactory implements BulkReaderFormatFactory, BulkWriter
return new CsvBulkDecodingFormat(formatOptions);
}
- private static class CsvBulkDecodingFormat
+ /** CsvBulkDecodingFormat which implements {@link FileBasedStatisticsReportableInputFormat}. */
+ @VisibleForTesting
+ public static class CsvBulkDecodingFormat
implements BulkDecodingFormat<RowData>,
- ProjectableDecodingFormat<BulkFormat<RowData, FileSourceSplit>> {
+ ProjectableDecodingFormat<BulkFormat<RowData, FileSourceSplit>>,
+ FileBasedStatisticsReportableInputFormat {
private final ReadableConfig formatOptions;
@@ -136,6 +151,57 @@ public class CsvFileFormatFactory implements BulkReaderFormatFactory, BulkWriter
public ChangelogMode getChangelogMode() {
return ChangelogMode.insertOnly();
}
+
+ @Override
+ public TableStats reportStatistics(List<Path> files, DataType producedDataType) {
+ // For the CSV format, obtaining accurate statistics by scanning all files is a heavy
+ // operation. Instead, we estimate the statistics by sampling: we read the first 100
+ // lines, compute their total size in bytes, and scale by the total file size to get
+ // the estimated row count.
+ final int totalSampleLineCnt = 100;
+ try {
+ long totalFileSize = 0;
+ int sampledRowCnt = 0;
+ long sampledRowSize = 0;
+ for (Path file : files) {
+ FileSystem fs = FileSystem.get(file.toUri());
+ FileStatus status = fs.getFileStatus(file);
+ totalFileSize += status.getLen();
+
+ // sample the line size
+ if (sampledRowCnt < totalSampleLineCnt) {
+ try (InputStreamReader isr =
+ new InputStreamReader(
+ Files.newInputStream(
+ new File(file.toUri()).toPath()));
+ BufferedReader br = new BufferedReader(isr)) {
+ String line;
+ while (sampledRowCnt < totalSampleLineCnt
+ && (line = br.readLine()) != null) {
+ sampledRowCnt += 1;
+ sampledRowSize +=
+ (line.getBytes(StandardCharsets.UTF_8).length + 1);
+ }
+ }
+ }
+ }
+
+ // br.readLine() strips the line terminator and only one byte per line is added back, so
+ // files using "\r\n" are undercounted. If every line was sampled, use the exact size.
+ if (sampledRowCnt < totalSampleLineCnt) {
+ sampledRowSize = totalFileSize;
+ }
+ if (sampledRowSize == 0) {
+ return TableStats.UNKNOWN;
+ }
+
+ int realSampledLineCnt = Math.min(totalSampleLineCnt, sampledRowCnt);
+ long estimatedRowCount = totalFileSize * realSampledLineCnt / sampledRowSize;
+ return new TableStats(estimatedRowCount);
+ } catch (Exception e) {
+ return TableStats.UNKNOWN;
+ }
+ }
}
@Override
diff --git a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFormatFilesystemStatisticsReportTest.java b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFormatFilesystemStatisticsReportTest.java
new file mode 100644
index 00000000000..476319d6ce4
--- /dev/null
+++ b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFormatFilesystemStatisticsReportTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.csv;
+
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.plan.stats.TableStats;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.planner.plan.stats.FlinkStatistic;
+import org.apache.flink.table.planner.utils.BatchTableTestUtil;
+import org.apache.flink.table.planner.utils.TableTestBase;
+import org.apache.flink.table.planner.utils.TableTestUtil;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelVisitor;
+import org.apache.calcite.rel.core.TableScan;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Test for statistics functionality in {@link CsvFileFormatFactory} with a file system source.
+ */
+public class CsvFormatFilesystemStatisticsReportTest extends TableTestBase {
+ private BatchTableTestUtil util;
+ private TableEnvironment tEnv;
+ @TempDir private static File path;
+
+ @BeforeEach
+ public void setup() throws IOException {
+ util = batchTestUtil(TableConfig.getDefault());
+ tEnv = util.getTableEnv();
+ String pathName = writeData(path, Arrays.asList("1,1,hi", "2,1,hello", "3,2,hello world"));
+
+ String ddl =
+ String.format(
+ "CREATE TABLE sourceTable (\n"
+ + " a bigint,\n"
+ + " b int,\n"
+ + " c varchar\n"
+ + ") with (\n"
+ + " 'connector' = 'filesystem',"
+ + " 'format' = 'csv',"
+ + " 'path' = '%s')",
+ pathName);
+ tEnv.executeSql(ddl);
+ }
+
+ @Test
+ public void testCsvFileSystemStatisticsReport() {
+ FlinkStatistic statistic = getStatisticsFromOptimizedPlan("select * from sourceTable");
+ assertThat(statistic.getTableStats()).isEqualTo(new TableStats(3));
+ }
+
+ private String writeData(File path, List<String> data) throws IOException {
+ String file = path.getAbsolutePath() + "/00-00.tmp";
+ Files.write(new File(file).toPath(), String.join("\n", data).getBytes());
+ return file;
+ }
+
+ private FlinkStatistic getStatisticsFromOptimizedPlan(String sql) {
+ RelNode relNode = TableTestUtil.toRelNode(tEnv.sqlQuery(sql));
+ RelNode optimized = util.getPlanner().optimize(relNode);
+ FlinkStatisticVisitor visitor = new FlinkStatisticVisitor();
+ visitor.go(optimized);
+ return visitor.result;
+ }
+
+ private static class FlinkStatisticVisitor extends RelVisitor {
+ private FlinkStatistic result = null;
+
+ @Override
+ public void visit(RelNode node, int ordinal, RelNode parent) {
+ if (node instanceof TableScan) {
+ Preconditions.checkArgument(result == null);
+ TableSourceTable table = (TableSourceTable) node.getTable();
+ result = table.getStatistic();
+ }
+ super.visit(node, ordinal, parent);
+ }
+ }
+}
diff --git a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFormatStatisticsReportTest.java b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFormatStatisticsReportTest.java
new file mode 100644
index 00000000000..6a166e2d0b2
--- /dev/null
+++ b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFormatStatisticsReportTest.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.csv;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.plan.stats.TableStats;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for statistics functionality in {@link CsvFileFormatFactory}. */
+public class CsvFormatStatisticsReportTest {
+
+ private static CsvFileFormatFactory.CsvBulkDecodingFormat csvBulkDecodingFormat;
+
+ @BeforeEach
+ public void setup() {
+ Configuration configuration = new Configuration();
+ csvBulkDecodingFormat = new CsvFileFormatFactory.CsvBulkDecodingFormat(configuration);
+ }
+
+ @Test
+ public void testCsvFormatStatsReportWithSingleFile() throws IOException {
+ String fileContent =
+ "#description of the data\n"
+ + "header1|header2|header3|\n"
+ + "this is|1|2.0|\n"
+ + "//a comment\n"
+ + "a test|3|4.0|\n"
+ + "#next|5|6.0|\n";
+
+ Path tempFile = createTempFile(fileContent);
+
+ TableStats tableStats =
+ csvBulkDecodingFormat.reportStatistics(Collections.singletonList(tempFile), null);
+ assertThat(tableStats).isEqualTo(new TableStats(6));
+ }
+
+ @Test
+ public void testCsvFormatStatsReportWithMultiFile() throws IOException {
+ String fileContent1 =
+ "#description of the data\r\n"
+ + "header1|header2|header3|\r\n"
+ + "this is|1|2.0|\r\n"
+ + "//a comment\r\n"
+ + "a test|3|4.0|\r\n"
+ + "#next|5|6.0|\r\n";
+ String fileContent2 =
+ "#description of the data\r\n"
+ + "header1|header2|header3|\r\n"
+ + "this is|1|2.0|\r\n"
+ + "//a comment\r\n"
+ + "a test|3|4.0|\r\n"
+ + "#next|5|6.0|\r\n";
+ Path tempFile1 = createTempFile(fileContent1);
+ Path tempFile2 = createTempFile(fileContent2);
+ List<Path> files = new ArrayList<>();
+ files.add(tempFile1);
+ files.add(tempFile2);
+
+ TableStats tableStats = csvBulkDecodingFormat.reportStatistics(files, null);
+ assertThat(tableStats).isEqualTo(new TableStats(12));
+ }
+
+ @Test
+ public void testRowSizeBiggerThanTotalSampleLineCnt() throws IOException {
+ StringBuilder builder = new StringBuilder();
+ int lineCnt = 1000;
+ for (int i = 0; i < lineCnt; i++) {
+ builder.append("header1|header2|header3|header4|header5").append("\n");
+ }
+ Path tempFile = createTempFile(builder.toString());
+ TableStats tableStats =
+ csvBulkDecodingFormat.reportStatistics(Collections.singletonList(tempFile), null);
+ assertThat(tableStats).isEqualTo(new TableStats(lineCnt));
+ }
+
+ @Test
+ public void testCsvFormatStatsReportWithEmptyFile() {
+ TableStats tableStats = csvBulkDecodingFormat.reportStatistics(null, null);
+ assertThat(tableStats).isEqualTo(TableStats.UNKNOWN);
+ }
+
+ private static Path createTempFile(String content) throws IOException {
+ File tempFile = File.createTempFile("test_contents", "tmp");
+ tempFile.deleteOnExit();
+ OutputStreamWriter wrt =
+ new OutputStreamWriter(
+ Files.newOutputStream(tempFile.toPath()), StandardCharsets.UTF_8);
+ wrt.write(content);
+ wrt.close();
+ return new Path(tempFile.toURI().toString());
+ }
+}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/formats/testcsv/TestCsvFormatFactory.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/formats/testcsv/TestCsvFormatFactory.java
index 98d59d76d5f..45c64b9fced 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/formats/testcsv/TestCsvFormatFactory.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/formats/testcsv/TestCsvFormatFactory.java
@@ -42,8 +42,10 @@ import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContex
import org.apache.flink.table.types.DataType;
import java.io.BufferedReader;
-import java.io.FileInputStream;
+import java.io.File;
import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -127,36 +129,48 @@ public class TestCsvFormatFactory
@Override
public TableStats reportStatistics(List<Path> files, DataType producedDataType) {
+ // For the CSV format, obtaining accurate statistics by scanning all files is a heavy
+ // operation. Instead, we estimate the statistics by sampling: we read the first 100
+ // lines, compute their total size in bytes, and scale by the total file size to get
+ // the estimated row count.
final int totalSampleLineCnt = 100;
try {
- long totalSize = 0;
- int sampledLineCnt = 0;
- long sampledTotalSize = 0;
+ long totalFileSize = 0;
+ int sampledRowCnt = 0;
+ long sampledRowSize = 0;
for (Path file : files) {
FileSystem fs = FileSystem.get(file.toUri());
FileStatus status = fs.getFileStatus(file);
- totalSize += status.getLen();
+ totalFileSize += status.getLen();
// sample the line size
- if (sampledLineCnt < totalSampleLineCnt) {
+ if (sampledRowCnt < totalSampleLineCnt) {
try (InputStreamReader isr =
- new InputStreamReader(new FileInputStream(file.getPath()))) {
- BufferedReader br = new BufferedReader(isr);
+ new InputStreamReader(
+ Files.newInputStream(
+ new File(file.toUri()).toPath()));
+ BufferedReader br = new BufferedReader(isr)) {
String line;
- while (sampledLineCnt < totalSampleLineCnt
+ while (sampledRowCnt < totalSampleLineCnt
&& (line = br.readLine()) != null) {
- sampledLineCnt += 1;
- sampledTotalSize += line.length();
+ sampledRowCnt += 1;
+ sampledRowSize += line.getBytes(StandardCharsets.UTF_8).length;
}
}
}
}
- if (sampledTotalSize == 0) {
+
+ // br.readLine() strips line terminators, so sampledRowSize undercounts the bytes read.
+ // If every line was sampled, use the exact total file size instead.
+ if (sampledRowCnt < totalSampleLineCnt) {
+ sampledRowSize = totalFileSize;
+ }
+ if (sampledRowSize == 0) {
return TableStats.UNKNOWN;
}
- int realSampledLineCnt = Math.min(totalSampleLineCnt, sampledLineCnt);
- int estimatedRowCount = (int) (totalSize * realSampledLineCnt / sampledTotalSize);
+ int realSampledLineCnt = Math.min(totalSampleLineCnt, sampledRowCnt);
+ long estimatedRowCount = totalFileSize * realSampledLineCnt / sampledRowSize;
return new TableStats(estimatedRowCount);
} catch (Exception e) {
return TableStats.UNKNOWN;