You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by go...@apache.org on 2022/07/13 12:38:38 UTC

[flink] branch master updated: [FLINK-27989][table-planner] Csv format supports reporting statistics

This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new dbc7ff712a1 [FLINK-27989][table-planner] Csv format supports reporting statistics
dbc7ff712a1 is described below

commit dbc7ff712a1694262629b9a349f7a1fa46240b6b
Author: zhengyunhong.zyh <33...@qq.com>
AuthorDate: Mon Jun 13 18:14:31 2022 +0800

    [FLINK-27989][table-planner] Csv format supports reporting statistics
    
    This closes #20007
---
 .../flink/formats/csv/CsvFileFormatFactory.java    |  70 +++++++++++-
 .../CsvFormatFilesystemStatisticsReportTest.java   | 107 ++++++++++++++++++
 .../formats/csv/CsvFormatStatisticsReportTest.java | 122 +++++++++++++++++++++
 .../formats/testcsv/TestCsvFormatFactory.java      |  42 ++++---
 4 files changed, 325 insertions(+), 16 deletions(-)

diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFileFormatFactory.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFileFormatFactory.java
index 8ea2280eaf1..e59ec63794f 100644
--- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFileFormatFactory.java
+++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFileFormatFactory.java
@@ -19,6 +19,7 @@
 package org.apache.flink.formats.csv;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.serialization.BulkWriter;
 import org.apache.flink.api.common.serialization.BulkWriter.Factory;
 import org.apache.flink.configuration.ConfigOption;
@@ -29,16 +30,21 @@ import org.apache.flink.connector.file.src.reader.BulkFormat;
 import org.apache.flink.connector.file.table.factories.BulkReaderFormatFactory;
 import org.apache.flink.connector.file.table.factories.BulkWriterFormatFactory;
 import org.apache.flink.connector.file.table.format.BulkDecodingFormat;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.formats.common.Converter;
 import org.apache.flink.formats.csv.RowDataToCsvConverters.RowDataToCsvConverter;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.Projection;
 import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.format.FileBasedStatisticsReportableInputFormat;
 import org.apache.flink.table.connector.format.ProjectableDecodingFormat;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.connector.source.DynamicTableSource.Context;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.plan.stats.TableStats;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.RowType;
 
@@ -49,7 +55,13 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.Csv
 
 import org.apache.commons.lang3.StringEscapeUtils;
 
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
 import java.util.Collections;
+import java.util.List;
 import java.util.Set;
 
 import static org.apache.flink.formats.csv.CsvFormatOptions.ALLOW_COMMENTS;
@@ -93,9 +105,12 @@ public class CsvFileFormatFactory implements BulkReaderFormatFactory, BulkWriter
         return new CsvBulkDecodingFormat(formatOptions);
     }
 
-    private static class CsvBulkDecodingFormat
+    /** CsvBulkDecodingFormat which implements {@link FileBasedStatisticsReportableInputFormat}. */
+    @VisibleForTesting
+    public static class CsvBulkDecodingFormat
             implements BulkDecodingFormat<RowData>,
-                    ProjectableDecodingFormat<BulkFormat<RowData, FileSourceSplit>> {
+                    ProjectableDecodingFormat<BulkFormat<RowData, FileSourceSplit>>,
+                    FileBasedStatisticsReportableInputFormat {
 
         private final ReadableConfig formatOptions;
 
@@ -136,6 +151,57 @@ public class CsvFileFormatFactory implements BulkReaderFormatFactory, BulkWriter
         public ChangelogMode getChangelogMode() {
             return ChangelogMode.insertOnly();
         }
+
+        @Override
+        public TableStats reportStatistics(List<Path> files, DataType producedDataType) {
+            // For Csv format, it's a heavy operation to obtain accurate statistics by scanning all
+            // files. So, We obtain the estimated statistics by sampling, the specific way is to
+            // sample the first 100 lines and calculate their row size, then compare row size with
+            // total file size to get the estimated row count.
+            final int totalSampleLineCnt = 100;
+            try {
+                long totalFileSize = 0;
+                int sampledRowCnt = 0;
+                long sampledRowSize = 0;
+                for (Path file : files) {
+                    FileSystem fs = FileSystem.get(file.toUri());
+                    FileStatus status = fs.getFileStatus(file);
+                    totalFileSize += status.getLen();
+
+                    // sample the line size
+                    if (sampledRowCnt < totalSampleLineCnt) {
+                        try (InputStreamReader isr =
+                                        new InputStreamReader(
+                                                Files.newInputStream(
+                                                        new File(file.toUri()).toPath()));
+                                BufferedReader br = new BufferedReader(isr)) {
+                            String line;
+                            while (sampledRowCnt < totalSampleLineCnt
+                                    && (line = br.readLine()) != null) {
+                                sampledRowCnt += 1;
+                                sampledRowSize +=
+                                        (line.getBytes(StandardCharsets.UTF_8).length + 1);
+                            }
+                        }
+                    }
+                }
+
+                // If line break is "\r\n", br.readLine() will ignore '\n' which make sampledRowSize
+                // smaller than totalFileSize. This will influence test result.
+                if (sampledRowCnt < totalSampleLineCnt) {
+                    sampledRowSize = totalFileSize;
+                }
+                if (sampledRowSize == 0) {
+                    return TableStats.UNKNOWN;
+                }
+
+                int realSampledLineCnt = Math.min(totalSampleLineCnt, sampledRowCnt);
+                long estimatedRowCount = totalFileSize * realSampledLineCnt / sampledRowSize;
+                return new TableStats(estimatedRowCount);
+            } catch (Exception e) {
+                return TableStats.UNKNOWN;
+            }
+        }
     }
 
     @Override
diff --git a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFormatFilesystemStatisticsReportTest.java b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFormatFilesystemStatisticsReportTest.java
new file mode 100644
index 00000000000..476319d6ce4
--- /dev/null
+++ b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFormatFilesystemStatisticsReportTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.csv;
+
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.plan.stats.TableStats;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.planner.plan.stats.FlinkStatistic;
+import org.apache.flink.table.planner.utils.BatchTableTestUtil;
+import org.apache.flink.table.planner.utils.TableTestBase;
+import org.apache.flink.table.planner.utils.TableTestUtil;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelVisitor;
+import org.apache.calcite.rel.core.TableScan;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Test for statistics functionality in {@link CsvFormatFactory} in the case of file system source.
+ */
+public class CsvFormatFilesystemStatisticsReportTest extends TableTestBase {
+    private BatchTableTestUtil util;
+    private TableEnvironment tEnv;
+    @TempDir private static File path;
+
+    @BeforeEach
+    public void setup() throws IOException {
+        util = batchTestUtil(TableConfig.getDefault());
+        tEnv = util.getTableEnv();
+        String pathName = writeData(path, Arrays.asList("1,1,hi", "2,1,hello", "3,2,hello world"));
+
+        String ddl =
+                String.format(
+                        "CREATE TABLE sourceTable (\n"
+                                + "  a bigint,\n"
+                                + "  b int,\n"
+                                + "  c varchar\n"
+                                + ") with (\n"
+                                + " 'connector' = 'filesystem',"
+                                + " 'format' = 'csv',"
+                                + " 'path' = '%s')",
+                        pathName);
+        tEnv.executeSql(ddl);
+    }
+
+    @Test
+    public void testCsvFileSystemStatisticsReport() {
+        FlinkStatistic statistic = getStatisticsFromOptimizedPlan("select * from sourceTable");
+        assertThat(statistic.getTableStats()).isEqualTo(new TableStats(3));
+    }
+
+    private String writeData(File path, List<String> data) throws IOException {
+        String file = path.getAbsolutePath() + "/00-00.tmp";
+        Files.write(new File(file).toPath(), String.join("\n", data).getBytes());
+        return file;
+    }
+
+    private FlinkStatistic getStatisticsFromOptimizedPlan(String sql) {
+        RelNode relNode = TableTestUtil.toRelNode(tEnv.sqlQuery(sql));
+        RelNode optimized = util.getPlanner().optimize(relNode);
+        FlinkStatisticVisitor visitor = new FlinkStatisticVisitor();
+        visitor.go(optimized);
+        return visitor.result;
+    }
+
+    private static class FlinkStatisticVisitor extends RelVisitor {
+        private FlinkStatistic result = null;
+
+        @Override
+        public void visit(RelNode node, int ordinal, RelNode parent) {
+            if (node instanceof TableScan) {
+                Preconditions.checkArgument(result == null);
+                TableSourceTable table = (TableSourceTable) node.getTable();
+                result = table.getStatistic();
+            }
+            super.visit(node, ordinal, parent);
+        }
+    }
+}
diff --git a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFormatStatisticsReportTest.java b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFormatStatisticsReportTest.java
new file mode 100644
index 00000000000..6a166e2d0b2
--- /dev/null
+++ b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFormatStatisticsReportTest.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.csv;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.plan.stats.TableStats;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for statistics functionality in {@link CsvFormatFactory}. */
+public class CsvFormatStatisticsReportTest {
+
+    private static CsvFileFormatFactory.CsvBulkDecodingFormat csvBulkDecodingFormat;
+
+    @BeforeEach
+    public void setup() {
+        Configuration configuration = new Configuration();
+        csvBulkDecodingFormat = new CsvFileFormatFactory.CsvBulkDecodingFormat(configuration);
+    }
+
+    @Test
+    public void testCsvFormatStatsReportWithSingleFile() throws IOException {
+        String fileContent =
+                "#description of the data\n"
+                        + "header1|header2|header3|\n"
+                        + "this is|1|2.0|\n"
+                        + "//a comment\n"
+                        + "a test|3|4.0|\n"
+                        + "#next|5|6.0|\n";
+
+        Path tempFile = createTempFile(fileContent);
+
+        TableStats tableStats =
+                csvBulkDecodingFormat.reportStatistics(Collections.singletonList(tempFile), null);
+        assertThat(tableStats).isEqualTo(new TableStats(6));
+    }
+
+    @Test
+    public void testCsvFormatStatsReportWithMultiFile() throws IOException {
+        String fileContent1 =
+                "#description of the data\r\n"
+                        + "header1|header2|header3|\r\n"
+                        + "this is|1|2.0|\r\n"
+                        + "//a comment\r\n"
+                        + "a test|3|4.0|\r\n"
+                        + "#next|5|6.0|\r\n";
+        String fileContent2 =
+                "#description of the data\r\n"
+                        + "header1|header2|header3|\r\n"
+                        + "this is|1|2.0|\r\n"
+                        + "//a comment\r\n"
+                        + "a test|3|4.0|\r\n"
+                        + "#next|5|6.0|\r\n";
+        Path tempFile1 = createTempFile(fileContent1);
+        Path tempFile2 = createTempFile(fileContent2);
+        List<Path> files = new ArrayList<>();
+        files.add(tempFile1);
+        files.add(tempFile2);
+
+        TableStats tableStats = csvBulkDecodingFormat.reportStatistics(files, null);
+        assertThat(tableStats).isEqualTo(new TableStats(12));
+    }
+
+    @Test
+    public void testRowSizeBiggerThanTotalSampleLineCnt() throws IOException {
+        StringBuilder builder = new StringBuilder();
+        int lineCnt = 1000;
+        for (int i = 0; i < lineCnt; i++) {
+            builder.append("header1|header2|header3|header4|header5").append("\n");
+        }
+        Path tempFile = createTempFile(builder.toString());
+        TableStats tableStats =
+                csvBulkDecodingFormat.reportStatistics(Collections.singletonList(tempFile), null);
+        assertThat(tableStats).isEqualTo(new TableStats(lineCnt));
+    }
+
+    @Test
+    public void testCsvFormatStatsReportWithEmptyFile() {
+        TableStats tableStats = csvBulkDecodingFormat.reportStatistics(null, null);
+        assertThat(tableStats).isEqualTo(TableStats.UNKNOWN);
+    }
+
+    private static Path createTempFile(String content) throws IOException {
+        File tempFile = File.createTempFile("test_contents", "tmp");
+        tempFile.deleteOnExit();
+        OutputStreamWriter wrt =
+                new OutputStreamWriter(
+                        Files.newOutputStream(tempFile.toPath()), StandardCharsets.UTF_8);
+        wrt.write(content);
+        wrt.close();
+        return new Path(tempFile.toURI().toString());
+    }
+}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/formats/testcsv/TestCsvFormatFactory.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/formats/testcsv/TestCsvFormatFactory.java
index 98d59d76d5f..45c64b9fced 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/formats/testcsv/TestCsvFormatFactory.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/formats/testcsv/TestCsvFormatFactory.java
@@ -42,8 +42,10 @@ import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContex
 import org.apache.flink.table.types.DataType;
 
 import java.io.BufferedReader;
-import java.io.FileInputStream;
+import java.io.File;
 import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -127,36 +129,48 @@ public class TestCsvFormatFactory
 
         @Override
         public TableStats reportStatistics(List<Path> files, DataType producedDataType) {
+            // For Csv format, it's a heavy operation to obtain accurate statistics by scanning all
+            // files. So, We obtain the estimated statistics by sampling, the specific way is to
+            // sample the first 100 lines and calculate their row size, then compare row size with
+            // total file size to get the estimated row count.
             final int totalSampleLineCnt = 100;
             try {
-                long totalSize = 0;
-                int sampledLineCnt = 0;
-                long sampledTotalSize = 0;
+                long totalFileSize = 0;
+                int sampledRowCnt = 0;
+                long sampledRowSize = 0;
                 for (Path file : files) {
                     FileSystem fs = FileSystem.get(file.toUri());
                     FileStatus status = fs.getFileStatus(file);
-                    totalSize += status.getLen();
+                    totalFileSize += status.getLen();
 
                     // sample the line size
-                    if (sampledLineCnt < totalSampleLineCnt) {
+                    if (sampledRowCnt < totalSampleLineCnt) {
                         try (InputStreamReader isr =
-                                new InputStreamReader(new FileInputStream(file.getPath()))) {
-                            BufferedReader br = new BufferedReader(isr);
+                                        new InputStreamReader(
+                                                Files.newInputStream(
+                                                        new File(file.toUri()).toPath()));
+                                BufferedReader br = new BufferedReader(isr)) {
                             String line;
-                            while (sampledLineCnt < totalSampleLineCnt
+                            while (sampledRowCnt < totalSampleLineCnt
                                     && (line = br.readLine()) != null) {
-                                sampledLineCnt += 1;
-                                sampledTotalSize += line.length();
+                                sampledRowCnt += 1;
+                                sampledRowSize += line.getBytes(StandardCharsets.UTF_8).length;
                             }
                         }
                     }
                 }
-                if (sampledTotalSize == 0) {
+
+                // If line break is "\r\n", br.readLine() will ignore '\n' which make sampledRowSize
+                // smaller than totalFileSize. This will influence test result.
+                if (sampledRowCnt < totalSampleLineCnt) {
+                    sampledRowSize = totalFileSize;
+                }
+                if (sampledRowSize == 0) {
                     return TableStats.UNKNOWN;
                 }
 
-                int realSampledLineCnt = Math.min(totalSampleLineCnt, sampledLineCnt);
-                int estimatedRowCount = (int) (totalSize * realSampledLineCnt / sampledTotalSize);
+                int realSampledLineCnt = Math.min(totalSampleLineCnt, sampledRowCnt);
+                long estimatedRowCount = totalFileSize * realSampledLineCnt / sampledRowSize;
                 return new TableStats(estimatedRowCount);
             } catch (Exception e) {
                 return TableStats.UNKNOWN;