You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/03/03 08:17:24 UTC

[GitHub] [flink-table-store] JingsongLi commented on a change in pull request #33: [FLINK-26346] Add statistics collecting to sst files

JingsongLi commented on a change in pull request #33:
URL: https://github.com/apache/flink-table-store/pull/33#discussion_r818410417



##########
File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFileWriter.java
##########
@@ -161,49 +170,93 @@ protected RowData toRowData(KeyValue kv) {
 
         @Override
         protected SstFileMeta collectFile(Path path) throws IOException {
+            KeyAndValueStats stats = extractStats(path);
             SstFileMeta result =
                     new SstFileMeta(
                             path.getName(),
                             FileUtils.getFileSize(path),
                             rowCount,
                             minKey,
                             keySerializer.toBinaryRow(maxKey).copy(),
-                            collectStats(path),
+                            stats.keyStats,
+                            stats.valueStats,
                             minSequenceNumber,
                             maxSequenceNumber,
                             level);
             resetMeta();
             return result;
         }
 
-        private void resetMeta() {
+        protected void resetMeta() {
             rowCount = 0;
             minKey = null;
             maxKey = null;
             minSequenceNumber = Long.MAX_VALUE;
             maxSequenceNumber = Long.MIN_VALUE;
         }
 
-        protected abstract FieldStats[] collectStats(Path path);
+        protected abstract KeyAndValueStats extractStats(Path path);
+    }
+
+    private class FileExtractingRollingFile extends SstRollingFile {
+
+        private FileExtractingRollingFile(int level) {
+            super(level);
+        }
+
+        @Override
+        protected KeyAndValueStats extractStats(Path path) {
+            FieldStats[] rawStats;
+            try {
+                rawStats = fileStatsExtractor.extract(path);
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+
+            int numKeyFields = keyType.getFieldCount();
+            return new KeyAndValueStats(
+                    Arrays.copyOfRange(rawStats, 0, numKeyFields),
+                    Arrays.copyOfRange(rawStats, numKeyFields + 2, rawStats.length));
+        }
     }
 
     private class StatsCollectingRollingFile extends SstRollingFile {
 
+        private FieldStatsCollector keyStatsCollector;
+        private FieldStatsCollector valueStatsCollector;
+
         private StatsCollectingRollingFile(int level) {
             super(level);
         }
 
         @Override
-        protected FieldStats[] collectStats(Path path) {
-            // TODO
-            //  1. Read statistics directly from the written orc/parquet files.
-            //  2. For other file formats use StatsCollector. Make sure fields are not reused
-            //     otherwise we need copying.
-            FieldStats[] stats = new FieldStats[valueType.getFieldCount()];
-            for (int i = 0; i < stats.length; i++) {
-                stats[i] = new FieldStats(null, null, 0);
-            }
-            return stats;
+        protected RowData toRowData(KeyValue kv) {
+            keyStatsCollector.collect(kv.key());
+            valueStatsCollector.collect(kv.value());
+            return super.toRowData(kv);
+        }
+
+        @Override
+        protected KeyAndValueStats extractStats(Path path) {
+            return new KeyAndValueStats(keyStatsCollector.extract(), valueStatsCollector.extract());

Review comment:
       Should we copy fields in `FieldStatsCollector`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org