Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/03/03 07:08:54 UTC

[GitHub] [flink-table-store] tsreaper opened a new pull request #33: [FLINK-26346] Add statistics collecting to sst files

tsreaper opened a new pull request #33:
URL: https://github.com/apache/flink-table-store/pull/33


   Currently, field statistics are not collected for SST files. With statistics we can perform filtering and other operations with better performance.
   
   Some formats, such as ORC, already record statistics in their file metadata, so for these formats we only need to read the statistics directly from the files. For other formats, however, we need to collect the statistics by hand.
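   For orientation, here is a minimal editorial sketch (not the project's actual `FieldStats` class) of what the per-field statistics amount to: a (min, max, null count) triple per column that readers can later use to skip whole files during filtering.

```java
// Minimal sketch of per-field statistics as a (min, max, nullCount) triple;
// the real class in this PR may differ in naming and details.
public class FieldStatsSketch {
    private final Object minValue; // null when every value in the column is null
    private final Object maxValue; // null when every value in the column is null
    private final long nullCount;

    public FieldStatsSketch(Object minValue, Object maxValue, long nullCount) {
        this.minValue = minValue;
        this.maxValue = maxValue;
        this.nullCount = nullCount;
    }

    /** A filter such as "col > bound" can skip the whole file when this returns false. */
    @SuppressWarnings({"unchecked", "rawtypes"})
    public boolean mightContainGreaterThan(Object bound) {
        return maxValue != null && ((Comparable) maxValue).compareTo(bound) > 0;
    }
}
```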


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink-table-store] tsreaper commented on a change in pull request #33: [FLINK-26346] Add statistics collecting to sst files

Posted by GitBox <gi...@apache.org>.
tsreaper commented on a change in pull request #33:
URL: https://github.com/apache/flink-table-store/pull/33#discussion_r820412865



##########
File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFileWriter.java
##########
@@ -161,49 +170,93 @@ protected RowData toRowData(KeyValue kv) {
 
         @Override
         protected SstFileMeta collectFile(Path path) throws IOException {
+            KeyAndValueStats stats = extractStats(path);
             SstFileMeta result =
                     new SstFileMeta(
                             path.getName(),
                             FileUtils.getFileSize(path),
                             rowCount,
                             minKey,
                             keySerializer.toBinaryRow(maxKey).copy(),
-                            collectStats(path),
+                            stats.keyStats,
+                            stats.valueStats,
                             minSequenceNumber,
                             maxSequenceNumber,
                             level);
             resetMeta();
             return result;
         }
 
-        private void resetMeta() {
+        protected void resetMeta() {
             rowCount = 0;
             minKey = null;
             maxKey = null;
             minSequenceNumber = Long.MAX_VALUE;
             maxSequenceNumber = Long.MIN_VALUE;
         }
 
-        protected abstract FieldStats[] collectStats(Path path);
+        protected abstract KeyAndValueStats extractStats(Path path);
+    }
+
+    private class FileExtractingRollingFile extends SstRollingFile {
+
+        private FileExtractingRollingFile(int level) {
+            super(level);
+        }
+
+        @Override
+        protected KeyAndValueStats extractStats(Path path) {
+            FieldStats[] rawStats;
+            try {
+                rawStats = fileStatsExtractor.extract(path);
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+
+            int numKeyFields = keyType.getFieldCount();
+            return new KeyAndValueStats(
+                    Arrays.copyOfRange(rawStats, 0, numKeyFields),
+                    Arrays.copyOfRange(rawStats, numKeyFields + 2, rawStats.length));
+        }
     }
 
     private class StatsCollectingRollingFile extends SstRollingFile {
 
+        private FieldStatsCollector keyStatsCollector;
+        private FieldStatsCollector valueStatsCollector;
+
         private StatsCollectingRollingFile(int level) {
             super(level);
         }
 
         @Override
-        protected FieldStats[] collectStats(Path path) {
-            // TODO
-            //  1. Read statistics directly from the written orc/parquet files.
-            //  2. For other file formats use StatsCollector. Make sure fields are not reused
-            //     otherwise we need copying.
-            FieldStats[] stats = new FieldStats[valueType.getFieldCount()];
-            for (int i = 0; i < stats.length; i++) {
-                stats[i] = new FieldStats(null, null, 0);
-            }
-            return stats;
+        protected RowData toRowData(KeyValue kv) {
+            keyStatsCollector.collect(kv.key());
+            valueStatsCollector.collect(kv.value());
+            return super.toRowData(kv);
+        }
+
+        @Override
+        protected KeyAndValueStats extractStats(Path path) {
+            return new KeyAndValueStats(keyStatsCollector.extract(), valueStatsCollector.extract());

Review comment:
       No need. `FieldStatsCollector` extracts fields from rows. As the fields are not reused, they do not need to be copied.
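   For context, a hedged sketch of what such a collector does (names below are illustrative, not the exact code in this PR): it keeps running min/max/null counts per field while rows are written, so if the extracted field objects were reused they would indeed have to be copied here.

```java
// Illustrative sketch of a per-field statistics collector. It stores references
// to the incoming field values as running min/max, which is only safe when the
// caller does not reuse those objects between calls.
import java.util.Arrays;

public class SimpleStatsCollector {
    private final Comparable[] minValues;
    private final Comparable[] maxValues;
    private final long[] nullCounts;

    public SimpleStatsCollector(int numFields) {
        this.minValues = new Comparable[numFields];
        this.maxValues = new Comparable[numFields];
        this.nullCounts = new long[numFields];
    }

    @SuppressWarnings({"unchecked", "rawtypes"})
    public void collect(Object[] fields) {
        for (int i = 0; i < fields.length; i++) {
            Comparable c = (Comparable) fields[i];
            if (c == null) {
                nullCounts[i]++;
                continue;
            }
            if (minValues[i] == null || c.compareTo(minValues[i]) < 0) {
                minValues[i] = c; // reference kept, not copied
            }
            if (maxValues[i] == null || c.compareTo(maxValues[i]) > 0) {
                maxValues[i] = c;
            }
        }
    }

    @Override
    public String toString() {
        return Arrays.toString(minValues) + " / " + Arrays.toString(maxValues)
                + " / " + Arrays.toString(nullCounts);
    }
}
```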







[GitHub] [flink-table-store] tsreaper commented on a change in pull request #33: [FLINK-26346] Add statistics collecting to sst files

Posted by GitBox <gi...@apache.org>.
tsreaper commented on a change in pull request #33:
URL: https://github.com/apache/flink-table-store/pull/33#discussion_r822239780



##########
File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/format/FileFormatFactory.java
##########
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.format;
+
+import org.apache.flink.configuration.ReadableConfig;
+
+/** Factory to create {@link FileFormat}. */
+public interface FileFormatFactory {

Review comment:
       We can't, because creating a file format requires class loaders and configurations, while the service loader requires factories to have no-argument constructors.
   
   Also, each `FileFormatFactory` is tied to a single format. This is not true for `FileFormat`, because we have a universal `FileFormatImpl`.
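   For illustration only (interface and method names below are assumptions, not the actual API in this PR), the constraint comes from the standard `java.util.ServiceLoader` pattern: factories are instantiated reflectively through no-argument constructors, and the class loader plus configuration are handed over afterwards when the concrete format is created.

```java
// Sketch of the factory split being discussed: the ServiceLoader needs a
// no-arg constructor for discovery, so parameters can only be injected later.
import java.util.Map;
import java.util.ServiceLoader;

interface HypotheticalFormatFactory {
    String identifier();

    HypotheticalFormat create(ClassLoader classLoader, Map<String, String> options);
}

interface HypotheticalFormat {
    // reader/writer creation methods would live here
}

class HypotheticalFormats {
    static HypotheticalFormat load(String identifier, ClassLoader cl, Map<String, String> options) {
        for (HypotheticalFormatFactory factory : ServiceLoader.load(HypotheticalFormatFactory.class, cl)) {
            if (factory.identifier().equals(identifier)) {
                return factory.create(cl, options); // parameters injected after discovery
            }
        }
        throw new IllegalArgumentException("No factory found for format: " + identifier);
    }
}
```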







[GitHub] [flink-table-store] JingsongLi commented on a change in pull request #33: [FLINK-26346] Add statistics collecting to sst files

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #33:
URL: https://github.com/apache/flink-table-store/pull/33#discussion_r818455246



##########
File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/OrcFileStatsExtractor.java
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.stats;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.utils.DateTimeUtils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.orc.BooleanColumnStatistics;
+import org.apache.orc.ColumnStatistics;
+import org.apache.orc.DateColumnStatistics;
+import org.apache.orc.DecimalColumnStatistics;
+import org.apache.orc.DoubleColumnStatistics;
+import org.apache.orc.IntegerColumnStatistics;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.StringColumnStatistics;
+import org.apache.orc.TimestampColumnStatistics;
+import org.apache.orc.TypeDescription;
+
+import java.io.IOException;
+import java.sql.Date;
+import java.util.List;
+import java.util.stream.IntStream;
+
+/** {@link FileStatsExtractor} for orc files. */
+public class OrcFileStatsExtractor implements FileStatsExtractor {

Review comment:
       I think we can create `flink-table-store-common` and `flink-table-store-orc` modules.







[GitHub] [flink-table-store] tsreaper commented on a change in pull request #33: [FLINK-26346] Add statistics collecting to sst files

Posted by GitBox <gi...@apache.org>.
tsreaper commented on a change in pull request #33:
URL: https://github.com/apache/flink-table-store/pull/33#discussion_r820416338



##########
File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/OrcFileStatsExtractor.java
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.stats;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.utils.DateTimeUtils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.orc.BooleanColumnStatistics;
+import org.apache.orc.ColumnStatistics;
+import org.apache.orc.DateColumnStatistics;
+import org.apache.orc.DecimalColumnStatistics;
+import org.apache.orc.DoubleColumnStatistics;
+import org.apache.orc.IntegerColumnStatistics;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.StringColumnStatistics;
+import org.apache.orc.TimestampColumnStatistics;
+import org.apache.orc.TypeDescription;
+
+import java.io.IOException;
+import java.sql.Date;
+import java.util.List;
+import java.util.stream.IntStream;
+
+/** {@link FileStatsExtractor} for orc files. */
+public class OrcFileStatsExtractor implements FileStatsExtractor {

Review comment:
       I don't see why this is necessary, at least for now. We currently have only one or two classes for ORC. Let's revisit this when things get more complex.







[GitHub] [flink-table-store] JingsongLi commented on a change in pull request #33: [FLINK-26346] Add statistics collecting to sst files

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #33:
URL: https://github.com/apache/flink-table-store/pull/33#discussion_r818423236



##########
File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWriteImpl.java
##########
@@ -62,14 +63,20 @@ public FileStoreWriteImpl(
             Comparator<RowData> keyComparator,
             Accumulator accumulator,
             FileFormat fileFormat,
+            FileStatsExtractor fileStatsExtractor,

Review comment:
       Create a method `FileFormat.createStatsExtractor`.
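   A hedged sketch of what such a method could look like (the signature is an assumption; the real method might also take the row type or return null when unsupported):

```java
// Sketch: formats that can read statistics back from already-written files
// return an extractor, everything else falls back to row-by-row collection.
import java.util.Optional;

interface StatsExtractorSketch {
    // extract(path) would read min/max/null-count statistics from file metadata
}

interface FileFormatSketch {
    default Optional<StatsExtractorSketch> createStatsExtractor() {
        return Optional.empty(); // default: no native statistics support
    }
}
```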







[GitHub] [flink-table-store] JingsongLi commented on a change in pull request #33: [FLINK-26346] Add statistics collecting to sst files

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #33:
URL: https://github.com/apache/flink-table-store/pull/33#discussion_r818462032



##########
File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/OrcFileStatsExtractor.java
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.stats;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.utils.DateTimeUtils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.orc.BooleanColumnStatistics;
+import org.apache.orc.ColumnStatistics;
+import org.apache.orc.DateColumnStatistics;
+import org.apache.orc.DecimalColumnStatistics;
+import org.apache.orc.DoubleColumnStatistics;
+import org.apache.orc.IntegerColumnStatistics;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.StringColumnStatistics;
+import org.apache.orc.TimestampColumnStatistics;
+import org.apache.orc.TypeDescription;
+
+import java.io.IOException;
+import java.sql.Date;
+import java.util.List;
+import java.util.stream.IntStream;
+
+/** {@link FileStatsExtractor} for orc files. */
+public class OrcFileStatsExtractor implements FileStatsExtractor {
+
+    private final RowType rowType;
+
+    public OrcFileStatsExtractor(RowType rowType) {
+        this.rowType = rowType;
+    }
+
+    @Override
+    public FieldStats[] extract(Path path) throws IOException {
+        org.apache.hadoop.fs.Path hadoopPath = new org.apache.hadoop.fs.Path(path.toUri());
+        Reader reader =
+                OrcFile.createReader(hadoopPath, OrcFile.readerOptions(new Configuration()));
+
+        long rowCount = reader.getNumberOfRows();
+        ColumnStatistics[] columnStatistics = reader.getStatistics();
+        TypeDescription schema = reader.getSchema();
+        List<String> columnNames = schema.getFieldNames();
+        List<TypeDescription> columnTypes = schema.getChildren();
+
+        return IntStream.range(0, rowType.getFieldCount())
+                .mapToObj(
+                        i -> {
+                            RowType.RowField field = rowType.getFields().get(i);
+                            int fieldIdx = columnNames.indexOf(field.getName());
+                            int colId = columnTypes.get(fieldIdx).getId();
+                            return toFieldStats(field, columnStatistics[colId], rowCount);
+                        })
+                .toArray(FieldStats[]::new);
+    }
+
+    private FieldStats toFieldStats(RowType.RowField field, ColumnStatistics stats, long rowCount) {
+        long nullCount = rowCount - stats.getNumberOfValues();
+        if (nullCount == rowCount) {
+            // all nulls
+            return new FieldStats(null, null, nullCount);
+        }
+
+        switch (field.getType().getTypeRoot()) {
+            case CHAR:
+            case VARCHAR:
+                assertStatsClass(field, stats, StringColumnStatistics.class);
+                StringColumnStatistics stringStats = (StringColumnStatistics) stats;
+                return new FieldStats(
+                        StringData.fromString(stringStats.getMinimum()),
+                        StringData.fromString(stringStats.getMaximum()),
+                        nullCount);
+            case BOOLEAN:
+                assertStatsClass(field, stats, BooleanColumnStatistics.class);
+                BooleanColumnStatistics boolStats = (BooleanColumnStatistics) stats;
+                return new FieldStats(
+                        boolStats.getFalseCount() == 0, boolStats.getTrueCount() != 0, nullCount);
+            case DECIMAL:
+                assertStatsClass(field, stats, DecimalColumnStatistics.class);
+                DecimalColumnStatistics decimalStats = (DecimalColumnStatistics) stats;
+                DecimalType decimalType = (DecimalType) (field.getType());
+                int precision = decimalType.getPrecision();
+                int scale = decimalType.getScale();
+                return new FieldStats(
+                        DecimalData.fromBigDecimal(
+                                decimalStats.getMinimum().bigDecimalValue(), precision, scale),
+                        DecimalData.fromBigDecimal(
+                                decimalStats.getMaximum().bigDecimalValue(), precision, scale),
+                        nullCount);
+            case TINYINT:
+                assertStatsClass(field, stats, IntegerColumnStatistics.class);
+                IntegerColumnStatistics byteStats = (IntegerColumnStatistics) stats;
+                return new FieldStats(
+                        Long.valueOf(byteStats.getMinimum()).byteValue(),

Review comment:
       Just cast instead? `(byte) byteStats.getMinimum()`







[GitHub] [flink-table-store] JingsongLi commented on a change in pull request #33: [FLINK-26346] Add statistics collecting to sst files

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #33:
URL: https://github.com/apache/flink-table-store/pull/33#discussion_r818410417



##########
File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFileWriter.java
##########
@@ -161,49 +170,93 @@ protected RowData toRowData(KeyValue kv) {
 
         @Override
         protected SstFileMeta collectFile(Path path) throws IOException {
+            KeyAndValueStats stats = extractStats(path);
             SstFileMeta result =
                     new SstFileMeta(
                             path.getName(),
                             FileUtils.getFileSize(path),
                             rowCount,
                             minKey,
                             keySerializer.toBinaryRow(maxKey).copy(),
-                            collectStats(path),
+                            stats.keyStats,
+                            stats.valueStats,
                             minSequenceNumber,
                             maxSequenceNumber,
                             level);
             resetMeta();
             return result;
         }
 
-        private void resetMeta() {
+        protected void resetMeta() {
             rowCount = 0;
             minKey = null;
             maxKey = null;
             minSequenceNumber = Long.MAX_VALUE;
             maxSequenceNumber = Long.MIN_VALUE;
         }
 
-        protected abstract FieldStats[] collectStats(Path path);
+        protected abstract KeyAndValueStats extractStats(Path path);
+    }
+
+    private class FileExtractingRollingFile extends SstRollingFile {
+
+        private FileExtractingRollingFile(int level) {
+            super(level);
+        }
+
+        @Override
+        protected KeyAndValueStats extractStats(Path path) {
+            FieldStats[] rawStats;
+            try {
+                rawStats = fileStatsExtractor.extract(path);
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+
+            int numKeyFields = keyType.getFieldCount();
+            return new KeyAndValueStats(
+                    Arrays.copyOfRange(rawStats, 0, numKeyFields),
+                    Arrays.copyOfRange(rawStats, numKeyFields + 2, rawStats.length));
+        }
     }
 
     private class StatsCollectingRollingFile extends SstRollingFile {
 
+        private FieldStatsCollector keyStatsCollector;
+        private FieldStatsCollector valueStatsCollector;
+
         private StatsCollectingRollingFile(int level) {
             super(level);
         }
 
         @Override
-        protected FieldStats[] collectStats(Path path) {
-            // TODO
-            //  1. Read statistics directly from the written orc/parquet files.
-            //  2. For other file formats use StatsCollector. Make sure fields are not reused
-            //     otherwise we need copying.
-            FieldStats[] stats = new FieldStats[valueType.getFieldCount()];
-            for (int i = 0; i < stats.length; i++) {
-                stats[i] = new FieldStats(null, null, 0);
-            }
-            return stats;
+        protected RowData toRowData(KeyValue kv) {
+            keyStatsCollector.collect(kv.key());
+            valueStatsCollector.collect(kv.value());
+            return super.toRowData(kv);
+        }
+
+        @Override
+        protected KeyAndValueStats extractStats(Path path) {
+            return new KeyAndValueStats(keyStatsCollector.extract(), valueStatsCollector.extract());

Review comment:
       Should we copy the fields in `FieldStatsCollector`?







[GitHub] [flink-table-store] JingsongLi commented on a change in pull request #33: [FLINK-26346] Add statistics collecting to sst files

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #33:
URL: https://github.com/apache/flink-table-store/pull/33#discussion_r818460125



##########
File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/OrcFileStatsExtractor.java
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.stats;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.utils.DateTimeUtils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.orc.BooleanColumnStatistics;
+import org.apache.orc.ColumnStatistics;
+import org.apache.orc.DateColumnStatistics;
+import org.apache.orc.DecimalColumnStatistics;
+import org.apache.orc.DoubleColumnStatistics;
+import org.apache.orc.IntegerColumnStatistics;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.StringColumnStatistics;
+import org.apache.orc.TimestampColumnStatistics;
+import org.apache.orc.TypeDescription;
+
+import java.io.IOException;
+import java.sql.Date;
+import java.util.List;
+import java.util.stream.IntStream;
+
+/** {@link FileStatsExtractor} for orc files. */
+public class OrcFileStatsExtractor implements FileStatsExtractor {
+
+    private final RowType rowType;
+
+    public OrcFileStatsExtractor(RowType rowType) {
+        this.rowType = rowType;
+    }
+
+    @Override
+    public FieldStats[] extract(Path path) throws IOException {
+        org.apache.hadoop.fs.Path hadoopPath = new org.apache.hadoop.fs.Path(path.toUri());
+        Reader reader =
+                OrcFile.createReader(hadoopPath, OrcFile.readerOptions(new Configuration()));
+
+        long rowCount = reader.getNumberOfRows();
+        ColumnStatistics[] columnStatistics = reader.getStatistics();
+        TypeDescription schema = reader.getSchema();
+        List<String> columnNames = schema.getFieldNames();
+        List<TypeDescription> columnTypes = schema.getChildren();
+
+        return IntStream.range(0, rowType.getFieldCount())
+                .mapToObj(
+                        i -> {
+                            RowType.RowField field = rowType.getFields().get(i);
+                            int fieldIdx = columnNames.indexOf(field.getName());
+                            int colId = columnTypes.get(fieldIdx).getId();
+                            return toFieldStats(field, columnStatistics[colId], rowCount);
+                        })
+                .toArray(FieldStats[]::new);
+    }
+
+    private FieldStats toFieldStats(RowType.RowField field, ColumnStatistics stats, long rowCount) {
+        long nullCount = rowCount - stats.getNumberOfValues();

Review comment:
       ``` 
    /**
      * Get the number of values in this column. It will differ from the number
      * of rows because of NULL values and repeated values.
      * @return the number of values
      */
      long getNumberOfValues();
   ```
   `getNumberOfValues` does not seem to work for the null count.
   
   Can we just store `hasNull` and `hasNonNull` instead of `nullCount`?







[GitHub] [flink-table-store] tsreaper commented on a change in pull request #33: [FLINK-26346] Add statistics collecting to sst files

Posted by GitBox <gi...@apache.org>.
tsreaper commented on a change in pull request #33:
URL: https://github.com/apache/flink-table-store/pull/33#discussion_r822239780



##########
File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/format/FileFormatFactory.java
##########
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.format;
+
+import org.apache.flink.configuration.ReadableConfig;
+
+/** Factory to create {@link FileFormat}. */
+public interface FileFormatFactory {

Review comment:
       We can't, because creating a file format requires class loaders and configurations, while the service loader requires factories to have no-argument constructors.







[GitHub] [flink-table-store] JingsongLi commented on a change in pull request #33: [FLINK-26346] Add statistics collecting to sst files

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #33:
URL: https://github.com/apache/flink-table-store/pull/33#discussion_r821432900



##########
File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/OrcFileStatsExtractor.java
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.stats;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.utils.DateTimeUtils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.orc.BooleanColumnStatistics;
+import org.apache.orc.ColumnStatistics;
+import org.apache.orc.DateColumnStatistics;
+import org.apache.orc.DecimalColumnStatistics;
+import org.apache.orc.DoubleColumnStatistics;
+import org.apache.orc.IntegerColumnStatistics;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.StringColumnStatistics;
+import org.apache.orc.TimestampColumnStatistics;
+import org.apache.orc.TypeDescription;
+
+import java.io.IOException;
+import java.sql.Date;
+import java.util.List;
+import java.util.stream.IntStream;
+
+/** {@link FileStatsExtractor} for orc files. */
+public class OrcFileStatsExtractor implements FileStatsExtractor {

Review comment:
       Hadoop-related dependencies are a headache.







[GitHub] [flink-table-store] JingsongLi commented on a change in pull request #33: [FLINK-26346] Add statistics collecting to sst files

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #33:
URL: https://github.com/apache/flink-table-store/pull/33#discussion_r821433871



##########
File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/OrcFileStatsExtractor.java
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.stats;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.utils.DateTimeUtils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.orc.BooleanColumnStatistics;
+import org.apache.orc.ColumnStatistics;
+import org.apache.orc.DateColumnStatistics;
+import org.apache.orc.DecimalColumnStatistics;
+import org.apache.orc.DoubleColumnStatistics;
+import org.apache.orc.IntegerColumnStatistics;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.StringColumnStatistics;
+import org.apache.orc.TimestampColumnStatistics;
+import org.apache.orc.TypeDescription;
+
+import java.io.IOException;
+import java.sql.Date;
+import java.util.List;
+import java.util.stream.IntStream;
+
+/** {@link FileStatsExtractor} for orc files. */
+public class OrcFileStatsExtractor implements FileStatsExtractor {
+
+    private final RowType rowType;
+
+    public OrcFileStatsExtractor(RowType rowType) {
+        this.rowType = rowType;
+    }
+
+    @Override
+    public FieldStats[] extract(Path path) throws IOException {
+        org.apache.hadoop.fs.Path hadoopPath = new org.apache.hadoop.fs.Path(path.toUri());
+        Reader reader =
+                OrcFile.createReader(hadoopPath, OrcFile.readerOptions(new Configuration()));
+
+        long rowCount = reader.getNumberOfRows();
+        ColumnStatistics[] columnStatistics = reader.getStatistics();
+        TypeDescription schema = reader.getSchema();
+        List<String> columnNames = schema.getFieldNames();
+        List<TypeDescription> columnTypes = schema.getChildren();
+
+        return IntStream.range(0, rowType.getFieldCount())
+                .mapToObj(
+                        i -> {
+                            RowType.RowField field = rowType.getFields().get(i);
+                            int fieldIdx = columnNames.indexOf(field.getName());
+                            int colId = columnTypes.get(fieldIdx).getId();
+                            return toFieldStats(field, columnStatistics[colId], rowCount);
+                        })
+                .toArray(FieldStats[]::new);
+    }
+
+    private FieldStats toFieldStats(RowType.RowField field, ColumnStatistics stats, long rowCount) {
+        long nullCount = rowCount - stats.getNumberOfValues();

Review comment:
       We can cross-check `nullCount` against `hasNull`.







[GitHub] [flink-table-store] JingsongLi commented on a change in pull request #33: [FLINK-26346] Add statistics collecting to sst files

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #33:
URL: https://github.com/apache/flink-table-store/pull/33#discussion_r821524301



##########
File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FieldStatsCollector.java
##########
@@ -35,7 +35,7 @@ public FieldStatsCollector(RowType rowType) {
         this.minValues = new Object[numFields];
         this.maxValues = new Object[numFields];
         this.nullCounts = new long[numFields];
-        this.converter = new RowDataToObjectArrayConverter(rowType);
+        this.converter = new RowDataToObjectArrayConverter(rowType, true);

Review comment:
       I think we can just copy the field when assigning `minValues[i] = c;` and `maxValues[i] = c;`.
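   In other words (a sketch only; `deepCopy` below is a hypothetical helper, not an existing utility), the copy would happen right where a new minimum or maximum is recorded, so unchanged fields never pay the copying cost:

```java
// Sketch under assumptions: copy the candidate value only when it becomes the
// new running minimum; mutable types (e.g. reused string/row data) need the
// copy, immutable ones can pass an identity function. The maximum is symmetric.
import java.util.function.UnaryOperator;

class MinUpdateSketch {
    static <T extends Comparable<T>> T updateMin(T currentMin, T candidate, UnaryOperator<T> deepCopy) {
        if (candidate == null) {
            return currentMin; // nulls are counted separately, not compared
        }
        if (currentMin == null || candidate.compareTo(currentMin) < 0) {
            return deepCopy.apply(candidate);
        }
        return currentMin;
    }
}
```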







[GitHub] [flink-table-store] tsreaper commented on a change in pull request #33: [FLINK-26346] Add statistics collecting to sst files

Posted by GitBox <gi...@apache.org>.
tsreaper commented on a change in pull request #33:
URL: https://github.com/apache/flink-table-store/pull/33#discussion_r821481242



##########
File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/OrcFileStatsExtractor.java
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.stats;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.utils.DateTimeUtils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.orc.BooleanColumnStatistics;
+import org.apache.orc.ColumnStatistics;
+import org.apache.orc.DateColumnStatistics;
+import org.apache.orc.DecimalColumnStatistics;
+import org.apache.orc.DoubleColumnStatistics;
+import org.apache.orc.IntegerColumnStatistics;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.StringColumnStatistics;
+import org.apache.orc.TimestampColumnStatistics;
+import org.apache.orc.TypeDescription;
+
+import java.io.IOException;
+import java.sql.Date;
+import java.util.List;
+import java.util.stream.IntStream;
+
+/** {@link FileStatsExtractor} for orc files. */
+public class OrcFileStatsExtractor implements FileStatsExtractor {
+
+    private final RowType rowType;
+
+    public OrcFileStatsExtractor(RowType rowType) {
+        this.rowType = rowType;
+    }
+
+    @Override
+    public FieldStats[] extract(Path path) throws IOException {
+        org.apache.hadoop.fs.Path hadoopPath = new org.apache.hadoop.fs.Path(path.toUri());
+        Reader reader =
+                OrcFile.createReader(hadoopPath, OrcFile.readerOptions(new Configuration()));
+
+        long rowCount = reader.getNumberOfRows();
+        ColumnStatistics[] columnStatistics = reader.getStatistics();
+        TypeDescription schema = reader.getSchema();
+        List<String> columnNames = schema.getFieldNames();
+        List<TypeDescription> columnTypes = schema.getChildren();
+
+        return IntStream.range(0, rowType.getFieldCount())
+                .mapToObj(
+                        i -> {
+                            RowType.RowField field = rowType.getFields().get(i);
+                            int fieldIdx = columnNames.indexOf(field.getName());
+                            int colId = columnTypes.get(fieldIdx).getId();
+                            return toFieldStats(field, columnStatistics[colId], rowCount);
+                        })
+                .toArray(FieldStats[]::new);
+    }
+
+    private FieldStats toFieldStats(RowType.RowField field, ColumnStatistics stats, long rowCount) {
+        long nullCount = rowCount - stats.getNumberOfValues();

Review comment:
       I don't know why the Javadoc of `ColumnStatistics#getNumberOfValues` is worded like this. But according to the implementation in `TreeWriterBase#writeBatch`, and according to the unit tests, the current implementation is correct.







[GitHub] [flink-table-store] JingsongLi merged pull request #33: [FLINK-26346] Add statistics collecting to sst files

Posted by GitBox <gi...@apache.org>.
JingsongLi merged pull request #33:
URL: https://github.com/apache/flink-table-store/pull/33


   





[GitHub] [flink-table-store] JingsongLi commented on a change in pull request #33: [FLINK-26346] Add statistics collecting to sst files

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #33:
URL: https://github.com/apache/flink-table-store/pull/33#discussion_r818460125



##########
File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/OrcFileStatsExtractor.java
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.stats;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.utils.DateTimeUtils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.orc.BooleanColumnStatistics;
+import org.apache.orc.ColumnStatistics;
+import org.apache.orc.DateColumnStatistics;
+import org.apache.orc.DecimalColumnStatistics;
+import org.apache.orc.DoubleColumnStatistics;
+import org.apache.orc.IntegerColumnStatistics;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.StringColumnStatistics;
+import org.apache.orc.TimestampColumnStatistics;
+import org.apache.orc.TypeDescription;
+
+import java.io.IOException;
+import java.sql.Date;
+import java.util.List;
+import java.util.stream.IntStream;
+
+/** {@link FileStatsExtractor} for orc files. */
+public class OrcFileStatsExtractor implements FileStatsExtractor {
+
+    private final RowType rowType;
+
+    public OrcFileStatsExtractor(RowType rowType) {
+        this.rowType = rowType;
+    }
+
+    @Override
+    public FieldStats[] extract(Path path) throws IOException {
+        org.apache.hadoop.fs.Path hadoopPath = new org.apache.hadoop.fs.Path(path.toUri());
+        Reader reader =
+                OrcFile.createReader(hadoopPath, OrcFile.readerOptions(new Configuration()));
+
+        long rowCount = reader.getNumberOfRows();
+        ColumnStatistics[] columnStatistics = reader.getStatistics();
+        TypeDescription schema = reader.getSchema();
+        List<String> columnNames = schema.getFieldNames();
+        List<TypeDescription> columnTypes = schema.getChildren();
+
+        return IntStream.range(0, rowType.getFieldCount())
+                .mapToObj(
+                        i -> {
+                            RowType.RowField field = rowType.getFields().get(i);
+                            int fieldIdx = columnNames.indexOf(field.getName());
+                            int colId = columnTypes.get(fieldIdx).getId();
+                            return toFieldStats(field, columnStatistics[colId], rowCount);
+                        })
+                .toArray(FieldStats[]::new);
+    }
+
+    private FieldStats toFieldStats(RowType.RowField field, ColumnStatistics stats, long rowCount) {
+        long nullCount = rowCount - stats.getNumberOfValues();

Review comment:
       ``` 
    /**
      * Get the number of values in this column. It will differ from the number
      * of rows because of NULL values and repeated values.
      * @return the number of values
      */
      long getNumberOfValues();
   ```
   `getNumberOfValues` does not seem to work for the null count.
   
   Can we just store `hasNull` instead of `nullCount`?







[GitHub] [flink-table-store] JingsongLi commented on a change in pull request #33: [FLINK-26346] Add statistics collecting to sst files

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #33:
URL: https://github.com/apache/flink-table-store/pull/33#discussion_r822228316



##########
File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/format/FileFormatFactory.java
##########
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.format;
+
+import org.apache.flink.configuration.ReadableConfig;
+
+/** Factory to create {@link FileFormat}. */
+public interface FileFormatFactory {

Review comment:
       Do we need this one? Can we just make `FileFormat` a factory?







[GitHub] [flink-table-store] JingsongLi commented on a change in pull request #33: [FLINK-26346] Add statistics collecting to sst files

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #33:
URL: https://github.com/apache/flink-table-store/pull/33#discussion_r821528073



##########
File path: flink-table-store-orc/src/main/java/org/apache/flink/table/store/orc/OrcFileStatsExtractor.java
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.orc;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.store.file.stats.FieldStats;
+import org.apache.flink.table.store.file.stats.FileStatsExtractor;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.utils.DateTimeUtils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.orc.BooleanColumnStatistics;
+import org.apache.orc.ColumnStatistics;
+import org.apache.orc.DateColumnStatistics;
+import org.apache.orc.DecimalColumnStatistics;
+import org.apache.orc.DoubleColumnStatistics;
+import org.apache.orc.IntegerColumnStatistics;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.StringColumnStatistics;
+import org.apache.orc.TimestampColumnStatistics;
+import org.apache.orc.TypeDescription;
+
+import java.io.IOException;
+import java.sql.Date;
+import java.util.List;
+import java.util.stream.IntStream;
+
+/** {@link FileStatsExtractor} for orc files. */
+public class OrcFileStatsExtractor implements FileStatsExtractor {
+
+    private final RowType rowType;
+
+    public OrcFileStatsExtractor(RowType rowType) {
+        this.rowType = rowType;
+    }
+
+    @Override
+    public FieldStats[] extract(Path path) throws IOException {
+        org.apache.hadoop.fs.Path hadoopPath = new org.apache.hadoop.fs.Path(path.toUri());
+        Reader reader =
+                OrcFile.createReader(hadoopPath, OrcFile.readerOptions(new Configuration()));
+
+        long rowCount = reader.getNumberOfRows();
+        ColumnStatistics[] columnStatistics = reader.getStatistics();
+        TypeDescription schema = reader.getSchema();
+        List<String> columnNames = schema.getFieldNames();
+        List<TypeDescription> columnTypes = schema.getChildren();
+
+        return IntStream.range(0, rowType.getFieldCount())
+                .mapToObj(
+                        i -> {
+                            RowType.RowField field = rowType.getFields().get(i);
+                            int fieldIdx = columnNames.indexOf(field.getName());
+                            int colId = columnTypes.get(fieldIdx).getId();
+                            return toFieldStats(field, columnStatistics[colId], rowCount);
+                        })
+                .toArray(FieldStats[]::new);
+    }
+
+    private FieldStats toFieldStats(RowType.RowField field, ColumnStatistics stats, long rowCount) {
+        long nullCount = rowCount - stats.getNumberOfValues();

Review comment:
       Add a check here: if `!stats.hasNull()`, `nullCount` should be zero.
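   Something along these lines, as a sketch (the wrapper class is illustrative; `hasNull` and `getNumberOfValues` are the ORC `ColumnStatistics` accessors already discussed above):

```java
// Sketch of the suggested consistency check: when the ORC column statistics
// report no nulls, the null count derived from the row count must be zero.
import org.apache.orc.ColumnStatistics;

class OrcNullCountCheckSketch {
    static long checkedNullCount(ColumnStatistics stats, long rowCount) {
        long nullCount = rowCount - stats.getNumberOfValues();
        if (!stats.hasNull() && nullCount != 0) {
            throw new IllegalStateException(
                    "Inconsistent ORC statistics: hasNull() is false but null count is " + nullCount);
        }
        return nullCount;
    }
}
```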







[GitHub] [flink-table-store] JingsongLi commented on a change in pull request #33: [FLINK-26346] Add statistics collecting to sst files

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #33:
URL: https://github.com/apache/flink-table-store/pull/33#discussion_r818423236



##########
File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWriteImpl.java
##########
@@ -62,14 +63,20 @@ public FileStoreWriteImpl(
             Comparator<RowData> keyComparator,
             Accumulator accumulator,
             FileFormat fileFormat,
+            FileStatsExtractor fileStatsExtractor,

Review comment:
       Create a method `FileFormat.createStatsExtractor`.



