You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/06/27 13:13:37 UTC

[GitHub] [flink] godfreyhe commented on a diff in pull request #20009: [FLINK-27991][table-planner] ORC format supports reporting statistics

godfreyhe commented on code in PR #20009:
URL: https://github.com/apache/flink/pull/20009#discussion_r907370399


##########
flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcFileFormatFactory.java:
##########
@@ -167,5 +188,179 @@ public ChangelogMode getChangelogMode() {
         public void applyFilters(List<ResolvedExpression> filters) {
             this.filters = filters;
         }
+
+        @Override
+        public TableStats reportStatistics(List<Path> files, DataType producedDataType) {
+            try {
+                Properties properties = getOrcProperties(formatOptions);
+                Configuration hadoopConfig = new Configuration();
+                properties.forEach((k, v) -> hadoopConfig.set(k.toString(), v.toString()));
+
+                long rowCount = 0;
+                Map<String, ColumnStatistics> columnStatisticsMap = new HashMap<>();
+                RowType producedRowType = (RowType) producedDataType.getLogicalType();
+                for (Path file : files) {
+                    rowCount +=
+                            updateStatistics(
+                                    hadoopConfig, file, columnStatisticsMap, producedRowType);
+                }
+
+                Map<String, ColumnStats> columnStatsMap =
+                        convertToColumnStats(rowCount, columnStatisticsMap, producedRowType);
+
+                return new TableStats(rowCount, columnStatsMap);
+            } catch (Exception e) {
+                return TableStats.UNKNOWN;
+            }
+        }
+
+        private long updateStatistics(
+                Configuration hadoopConf,
+                Path file,
+                Map<String, ColumnStatistics> columnStatisticsMap,
+                RowType producedRowType)
+                throws IOException {
+            org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path(file.toUri());
+            Reader reader =
+                    OrcFile.createReader(
+                            path,
+                            OrcFile.readerOptions(hadoopConf)
+                                    .maxLength(OrcConf.MAX_FILE_LENGTH.getLong(hadoopConf)));
+            ColumnStatistics[] statistics = reader.getStatistics();
+            TypeDescription schema = reader.getSchema();
+            List<String> fieldNames = schema.getFieldNames();
+            List<TypeDescription> columnTypes = schema.getChildren();
+            for (String column : producedRowType.getFieldNames()) {
+                int fieldIdx = fieldNames.indexOf(column);
+                if (fieldIdx >= 0) {
+                    int colId = columnTypes.get(fieldIdx).getId();
+                    ColumnStatistics statistic = statistics[colId];
+                    updateStatistics(statistic, column, columnStatisticsMap);
+                }
+            }
+
+            return reader.getNumberOfRows();
+        }
+
+        private void updateStatistics(
+                ColumnStatistics statistic,
+                String column,
+                Map<String, ColumnStatistics> columnStatisticsMap) {
+            ColumnStatistics previousStatistics = columnStatisticsMap.get(column);
+            if (previousStatistics == null) {
+                columnStatisticsMap.put(column, statistic);
+            } else {
+                if (previousStatistics instanceof ColumnStatisticsImpl) {
+                    ((ColumnStatisticsImpl) previousStatistics)
+                            .merge((ColumnStatisticsImpl) statistic);
+                }
+            }
+        }
+
+        private Map<String, ColumnStats> convertToColumnStats(
+                long totalRowCount,
+                Map<String, ColumnStatistics> columnStatisticsMap,
+                RowType logicalType) {
+            Map<String, ColumnStats> columnStatsMap = new HashMap<>();
+            for (String column : logicalType.getFieldNames()) {
+                ColumnStatistics columnStatistics = columnStatisticsMap.get(column);
+                if (columnStatistics == null) {
+                    continue;
+                }
+                ColumnStats columnStats =
+                        convertToColumnStats(
+                                totalRowCount,
+                                logicalType.getTypeAt(logicalType.getFieldIndex(column)),
+                                columnStatistics);
+                columnStatsMap.put(column, columnStats);
+            }
+
+            return columnStatsMap;
+        }
+
+        private ColumnStats convertToColumnStats(
+                long totalRowCount, LogicalType logicalType, ColumnStatistics columnStatistics) {
+            ColumnStats.Builder builder =
+                    new ColumnStats.Builder().setNdv(null).setAvgLen(null).setMaxLen(null);
+            if (!columnStatistics.hasNull()) {
+                builder.setNullCount(0L);
+            } else {
+                builder.setNullCount(totalRowCount - columnStatistics.getNumberOfValues());
+            }
+
+            switch (logicalType.getTypeRoot()) {
+                case BOOLEAN:
+                    builder.setMax(null).setMin(null);
+                    break;
+                case TINYINT:
+                case SMALLINT:
+                case INTEGER:
+                case BIGINT:
+                    if (columnStatistics instanceof IntegerColumnStatistics) {
+                        builder.setMax(((IntegerColumnStatistics) columnStatistics).getMaximum())
+                                .setMin(((IntegerColumnStatistics) columnStatistics).getMinimum());
+                        break;
+                    } else {
+                        return null;
+                    }
+                case FLOAT:
+                case DOUBLE:
+                    if (columnStatistics instanceof DoubleColumnStatistics) {
+                        builder.setMax(((DoubleColumnStatistics) columnStatistics).getMaximum())
+                                .setMin(((DoubleColumnStatistics) columnStatistics).getMinimum());
+                        break;
+                    } else {
+                        return null;
+                    }
+                case CHAR:
+                case VARCHAR:
+                case VARBINARY:

Review Comment:
   no max, min for VARBINARY type



##########
flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcFileFormatFactory.java:
##########
@@ -167,5 +188,179 @@ public ChangelogMode getChangelogMode() {
         public void applyFilters(List<ResolvedExpression> filters) {
             this.filters = filters;
         }
+
+        @Override
+        public TableStats reportStatistics(List<Path> files, DataType producedDataType) {
+            try {
+                Properties properties = getOrcProperties(formatOptions);
+                Configuration hadoopConfig = new Configuration();
+                properties.forEach((k, v) -> hadoopConfig.set(k.toString(), v.toString()));
+
+                long rowCount = 0;
+                Map<String, ColumnStatistics> columnStatisticsMap = new HashMap<>();
+                RowType producedRowType = (RowType) producedDataType.getLogicalType();
+                for (Path file : files) {
+                    rowCount +=
+                            updateStatistics(
+                                    hadoopConfig, file, columnStatisticsMap, producedRowType);
+                }
+
+                Map<String, ColumnStats> columnStatsMap =
+                        convertToColumnStats(rowCount, columnStatisticsMap, producedRowType);
+
+                return new TableStats(rowCount, columnStatsMap);
+            } catch (Exception e) {
+                return TableStats.UNKNOWN;
+            }
+        }
+
+        private long updateStatistics(
+                Configuration hadoopConf,
+                Path file,
+                Map<String, ColumnStatistics> columnStatisticsMap,
+                RowType producedRowType)
+                throws IOException {
+            org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path(file.toUri());
+            Reader reader =
+                    OrcFile.createReader(
+                            path,
+                            OrcFile.readerOptions(hadoopConf)
+                                    .maxLength(OrcConf.MAX_FILE_LENGTH.getLong(hadoopConf)));
+            ColumnStatistics[] statistics = reader.getStatistics();
+            TypeDescription schema = reader.getSchema();
+            List<String> fieldNames = schema.getFieldNames();
+            List<TypeDescription> columnTypes = schema.getChildren();
+            for (String column : producedRowType.getFieldNames()) {
+                int fieldIdx = fieldNames.indexOf(column);
+                if (fieldIdx >= 0) {
+                    int colId = columnTypes.get(fieldIdx).getId();
+                    ColumnStatistics statistic = statistics[colId];
+                    updateStatistics(statistic, column, columnStatisticsMap);
+                }
+            }
+
+            return reader.getNumberOfRows();
+        }
+
+        private void updateStatistics(
+                ColumnStatistics statistic,
+                String column,
+                Map<String, ColumnStatistics> columnStatisticsMap) {
+            ColumnStatistics previousStatistics = columnStatisticsMap.get(column);
+            if (previousStatistics == null) {
+                columnStatisticsMap.put(column, statistic);
+            } else {
+                if (previousStatistics instanceof ColumnStatisticsImpl) {
+                    ((ColumnStatisticsImpl) previousStatistics)
+                            .merge((ColumnStatisticsImpl) statistic);
+                }
+            }
+        }
+
+        private Map<String, ColumnStats> convertToColumnStats(
+                long totalRowCount,
+                Map<String, ColumnStatistics> columnStatisticsMap,
+                RowType logicalType) {
+            Map<String, ColumnStats> columnStatsMap = new HashMap<>();
+            for (String column : logicalType.getFieldNames()) {
+                ColumnStatistics columnStatistics = columnStatisticsMap.get(column);
+                if (columnStatistics == null) {
+                    continue;
+                }
+                ColumnStats columnStats =
+                        convertToColumnStats(
+                                totalRowCount,
+                                logicalType.getTypeAt(logicalType.getFieldIndex(column)),
+                                columnStatistics);
+                columnStatsMap.put(column, columnStats);
+            }
+
+            return columnStatsMap;
+        }
+
+        private ColumnStats convertToColumnStats(
+                long totalRowCount, LogicalType logicalType, ColumnStatistics columnStatistics) {
+            ColumnStats.Builder builder =
+                    new ColumnStats.Builder().setNdv(null).setAvgLen(null).setMaxLen(null);
+            if (!columnStatistics.hasNull()) {
+                builder.setNullCount(0L);
+            } else {
+                builder.setNullCount(totalRowCount - columnStatistics.getNumberOfValues());
+            }
+
+            switch (logicalType.getTypeRoot()) {
+                case BOOLEAN:
+                    builder.setMax(null).setMin(null);
+                    break;
+                case TINYINT:
+                case SMALLINT:
+                case INTEGER:
+                case BIGINT:
+                    if (columnStatistics instanceof IntegerColumnStatistics) {
+                        builder.setMax(((IntegerColumnStatistics) columnStatistics).getMaximum())
+                                .setMin(((IntegerColumnStatistics) columnStatistics).getMinimum());
+                        break;
+                    } else {
+                        return null;
+                    }
+                case FLOAT:
+                case DOUBLE:
+                    if (columnStatistics instanceof DoubleColumnStatistics) {
+                        builder.setMax(((DoubleColumnStatistics) columnStatistics).getMaximum())
+                                .setMin(((DoubleColumnStatistics) columnStatistics).getMinimum());
+                        break;
+                    } else {
+                        return null;
+                    }
+                case CHAR:
+                case VARCHAR:
+                case VARBINARY:
+                    if (columnStatistics instanceof StringColumnStatistics) {
+                        builder.setMax(((StringColumnStatistics) columnStatistics).getMaximum())
+                                .setMin(((StringColumnStatistics) columnStatistics).getMinimum());
+                        break;
+                    } else {
+                        return null;
+                    }
+                case DATE:
+                    if (columnStatistics instanceof DateColumnStatistics) {
+                        builder.setMax(((DateColumnStatistics) columnStatistics).getMaximum())

Review Comment:
   the `Date` type of `((DateColumnStatistics) columnStatistics).getMaximum()` is not `java.sql.Date`



##########
flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcFileFormatFactory.java:
##########
@@ -167,5 +188,179 @@ public ChangelogMode getChangelogMode() {
         public void applyFilters(List<ResolvedExpression> filters) {
             this.filters = filters;
         }
+
+        @Override
+        public TableStats reportStatistics(List<Path> files, DataType producedDataType) {
+            try {
+                Properties properties = getOrcProperties(formatOptions);
+                Configuration hadoopConfig = new Configuration();
+                properties.forEach((k, v) -> hadoopConfig.set(k.toString(), v.toString()));
+
+                long rowCount = 0;
+                Map<String, ColumnStatistics> columnStatisticsMap = new HashMap<>();
+                RowType producedRowType = (RowType) producedDataType.getLogicalType();
+                for (Path file : files) {
+                    rowCount +=
+                            updateStatistics(
+                                    hadoopConfig, file, columnStatisticsMap, producedRowType);
+                }
+
+                Map<String, ColumnStats> columnStatsMap =
+                        convertToColumnStats(rowCount, columnStatisticsMap, producedRowType);
+
+                return new TableStats(rowCount, columnStatsMap);
+            } catch (Exception e) {
+                return TableStats.UNKNOWN;
+            }
+        }
+
+        private long updateStatistics(
+                Configuration hadoopConf,
+                Path file,
+                Map<String, ColumnStatistics> columnStatisticsMap,
+                RowType producedRowType)
+                throws IOException {
+            org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path(file.toUri());
+            Reader reader =
+                    OrcFile.createReader(
+                            path,
+                            OrcFile.readerOptions(hadoopConf)
+                                    .maxLength(OrcConf.MAX_FILE_LENGTH.getLong(hadoopConf)));
+            ColumnStatistics[] statistics = reader.getStatistics();
+            TypeDescription schema = reader.getSchema();
+            List<String> fieldNames = schema.getFieldNames();
+            List<TypeDescription> columnTypes = schema.getChildren();
+            for (String column : producedRowType.getFieldNames()) {
+                int fieldIdx = fieldNames.indexOf(column);
+                if (fieldIdx >= 0) {
+                    int colId = columnTypes.get(fieldIdx).getId();
+                    ColumnStatistics statistic = statistics[colId];
+                    updateStatistics(statistic, column, columnStatisticsMap);
+                }
+            }
+
+            return reader.getNumberOfRows();
+        }
+
+        private void updateStatistics(
+                ColumnStatistics statistic,
+                String column,
+                Map<String, ColumnStatistics> columnStatisticsMap) {
+            ColumnStatistics previousStatistics = columnStatisticsMap.get(column);
+            if (previousStatistics == null) {
+                columnStatisticsMap.put(column, statistic);
+            } else {
+                if (previousStatistics instanceof ColumnStatisticsImpl) {
+                    ((ColumnStatisticsImpl) previousStatistics)
+                            .merge((ColumnStatisticsImpl) statistic);
+                }
+            }
+        }
+
+        private Map<String, ColumnStats> convertToColumnStats(
+                long totalRowCount,
+                Map<String, ColumnStatistics> columnStatisticsMap,
+                RowType logicalType) {
+            Map<String, ColumnStats> columnStatsMap = new HashMap<>();
+            for (String column : logicalType.getFieldNames()) {
+                ColumnStatistics columnStatistics = columnStatisticsMap.get(column);
+                if (columnStatistics == null) {
+                    continue;
+                }
+                ColumnStats columnStats =
+                        convertToColumnStats(
+                                totalRowCount,
+                                logicalType.getTypeAt(logicalType.getFieldIndex(column)),
+                                columnStatistics);
+                columnStatsMap.put(column, columnStats);
+            }
+
+            return columnStatsMap;
+        }
+
+        private ColumnStats convertToColumnStats(
+                long totalRowCount, LogicalType logicalType, ColumnStatistics columnStatistics) {
+            ColumnStats.Builder builder =
+                    new ColumnStats.Builder().setNdv(null).setAvgLen(null).setMaxLen(null);
+            if (!columnStatistics.hasNull()) {
+                builder.setNullCount(0L);
+            } else {
+                builder.setNullCount(totalRowCount - columnStatistics.getNumberOfValues());
+            }
+
+            switch (logicalType.getTypeRoot()) {
+                case BOOLEAN:
+                    builder.setMax(null).setMin(null);
+                    break;
+                case TINYINT:
+                case SMALLINT:
+                case INTEGER:
+                case BIGINT:
+                    if (columnStatistics instanceof IntegerColumnStatistics) {
+                        builder.setMax(((IntegerColumnStatistics) columnStatistics).getMaximum())
+                                .setMin(((IntegerColumnStatistics) columnStatistics).getMinimum());
+                        break;
+                    } else {
+                        return null;
+                    }
+                case FLOAT:
+                case DOUBLE:
+                    if (columnStatistics instanceof DoubleColumnStatistics) {
+                        builder.setMax(((DoubleColumnStatistics) columnStatistics).getMaximum())
+                                .setMin(((DoubleColumnStatistics) columnStatistics).getMinimum());
+                        break;
+                    } else {
+                        return null;
+                    }
+                case CHAR:
+                case VARCHAR:
+                case VARBINARY:
+                    if (columnStatistics instanceof StringColumnStatistics) {
+                        builder.setMax(((StringColumnStatistics) columnStatistics).getMaximum())
+                                .setMin(((StringColumnStatistics) columnStatistics).getMinimum());
+                        break;
+                    } else {
+                        return null;
+                    }
+                case DATE:
+                    if (columnStatistics instanceof DateColumnStatistics) {
+                        builder.setMax(((DateColumnStatistics) columnStatistics).getMaximum())

Review Comment:
   please add some test to cover the Date type change



##########
flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcFileSystemStatisticsReportTest.java:
##########
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc;
+
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.plan.stats.ColumnStats;
+import org.apache.flink.table.plan.stats.TableStats;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.planner.plan.stats.FlinkStatistic;
+import org.apache.flink.table.planner.runtime.utils.BatchTableEnvUtil;
+import org.apache.flink.table.planner.runtime.utils.TestData;
+import org.apache.flink.table.planner.utils.BatchTableTestUtil;
+import org.apache.flink.table.planner.utils.TableTestBase;
+import org.apache.flink.table.planner.utils.TableTestUtil;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelVisitor;
+import org.apache.calcite.rel.core.TableScan;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ExecutionException;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Test for statistics functionality in {@link OrcFileFormatFactory} in the case of file system
+ * source.
+ */
+public class OrcFileSystemStatisticsReportTest extends TableTestBase {
+
+    private BatchTableTestUtil util;
+    private TableEnvironment tEnv;
+    private String path1;
+
+    @Before
+    public void setup() throws IOException {
+        util = batchTestUtil(TableConfig.getDefault());
+        tEnv = util.getTableEnv();
+        path1 = tempFolder().newFolder().toURI().getPath();
+
+        BatchTableEnvUtil.registerCollection(
+                tEnv,
+                "originalT",
+                TestData.buildInData(),
+                TestData.buildInType(),
+                "a,b,c,d,e,f,g,h,i,j");
+
+        String ddl1 =

Review Comment:
   please create the test base class for statistics report and prepare the test data for all data types in the base class



##########
flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcFileFormatFactory.java:
##########
@@ -167,5 +188,179 @@ public ChangelogMode getChangelogMode() {
         public void applyFilters(List<ResolvedExpression> filters) {
             this.filters = filters;
         }
+
+        @Override
+        public TableStats reportStatistics(List<Path> files, DataType producedDataType) {
+            try {
+                Properties properties = getOrcProperties(formatOptions);
+                Configuration hadoopConfig = new Configuration();
+                properties.forEach((k, v) -> hadoopConfig.set(k.toString(), v.toString()));
+
+                long rowCount = 0;
+                Map<String, ColumnStatistics> columnStatisticsMap = new HashMap<>();
+                RowType producedRowType = (RowType) producedDataType.getLogicalType();
+                for (Path file : files) {
+                    rowCount +=
+                            updateStatistics(
+                                    hadoopConfig, file, columnStatisticsMap, producedRowType);
+                }
+
+                Map<String, ColumnStats> columnStatsMap =
+                        convertToColumnStats(rowCount, columnStatisticsMap, producedRowType);
+
+                return new TableStats(rowCount, columnStatsMap);
+            } catch (Exception e) {
+                return TableStats.UNKNOWN;
+            }
+        }
+
+        private long updateStatistics(
+                Configuration hadoopConf,
+                Path file,
+                Map<String, ColumnStatistics> columnStatisticsMap,
+                RowType producedRowType)
+                throws IOException {
+            org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path(file.toUri());
+            Reader reader =
+                    OrcFile.createReader(
+                            path,
+                            OrcFile.readerOptions(hadoopConf)
+                                    .maxLength(OrcConf.MAX_FILE_LENGTH.getLong(hadoopConf)));
+            ColumnStatistics[] statistics = reader.getStatistics();
+            TypeDescription schema = reader.getSchema();
+            List<String> fieldNames = schema.getFieldNames();
+            List<TypeDescription> columnTypes = schema.getChildren();
+            for (String column : producedRowType.getFieldNames()) {
+                int fieldIdx = fieldNames.indexOf(column);
+                if (fieldIdx >= 0) {
+                    int colId = columnTypes.get(fieldIdx).getId();
+                    ColumnStatistics statistic = statistics[colId];
+                    updateStatistics(statistic, column, columnStatisticsMap);
+                }
+            }
+
+            return reader.getNumberOfRows();
+        }
+
+        private void updateStatistics(
+                ColumnStatistics statistic,
+                String column,
+                Map<String, ColumnStatistics> columnStatisticsMap) {
+            ColumnStatistics previousStatistics = columnStatisticsMap.get(column);
+            if (previousStatistics == null) {
+                columnStatisticsMap.put(column, statistic);
+            } else {
+                if (previousStatistics instanceof ColumnStatisticsImpl) {
+                    ((ColumnStatisticsImpl) previousStatistics)
+                            .merge((ColumnStatisticsImpl) statistic);
+                }
+            }
+        }
+
+        private Map<String, ColumnStats> convertToColumnStats(
+                long totalRowCount,
+                Map<String, ColumnStatistics> columnStatisticsMap,
+                RowType logicalType) {
+            Map<String, ColumnStats> columnStatsMap = new HashMap<>();
+            for (String column : logicalType.getFieldNames()) {
+                ColumnStatistics columnStatistics = columnStatisticsMap.get(column);
+                if (columnStatistics == null) {
+                    continue;
+                }
+                ColumnStats columnStats =
+                        convertToColumnStats(
+                                totalRowCount,
+                                logicalType.getTypeAt(logicalType.getFieldIndex(column)),
+                                columnStatistics);
+                columnStatsMap.put(column, columnStats);
+            }
+
+            return columnStatsMap;
+        }
+
+        private ColumnStats convertToColumnStats(
+                long totalRowCount, LogicalType logicalType, ColumnStatistics columnStatistics) {
+            ColumnStats.Builder builder =
+                    new ColumnStats.Builder().setNdv(null).setAvgLen(null).setMaxLen(null);
+            if (!columnStatistics.hasNull()) {
+                builder.setNullCount(0L);
+            } else {
+                builder.setNullCount(totalRowCount - columnStatistics.getNumberOfValues());
+            }
+
+            switch (logicalType.getTypeRoot()) {
+                case BOOLEAN:
+                    builder.setMax(null).setMin(null);

Review Comment:
   the default value of max, min is null, so `builder.setMax(null).setMin(null);` is no needed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org