You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/11/17 02:12:15 UTC

[flink-table-store] branch master updated: [FLINK-30027] Fields min and max in BinaryTableStats support lazy deserialization

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 4d6bc72a [FLINK-30027] Fields min and max in BinaryTableStats support lazy deserialization
4d6bc72a is described below

commit 4d6bc72aeddea0c1bf551a2542ecc990220f8edd
Author: shammon <zj...@gmail.com>
AuthorDate: Thu Nov 17 10:12:09 2022 +0800

    [FLINK-30027] Fields min and max in BinaryTableStats support lazy deserialization
    
    This closes #382
---
 .../table/store/file/stats/BinaryTableStats.java   | 78 ++++++++++--------
 .../store/file/stats/BinaryTableStatsTest.java     | 93 ++++++++++++++++++++++
 2 files changed, 139 insertions(+), 32 deletions(-)

diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/BinaryTableStats.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/BinaryTableStats.java
index 18e0b277..91f0eece 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/BinaryTableStats.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/BinaryTableStats.java
@@ -22,7 +22,6 @@ import org.apache.flink.table.data.GenericArrayData;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.binary.BinaryRowData;
-import org.apache.flink.table.store.file.predicate.Predicate;
 import org.apache.flink.table.store.format.FieldStats;
 
 import javax.annotation.Nullable;
@@ -32,32 +31,34 @@ import java.util.Objects;
 
 import static org.apache.flink.table.store.file.utils.SerializationUtils.deserializeBinaryRow;
 import static org.apache.flink.table.store.file.utils.SerializationUtils.serializeBinaryRow;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
-/**
- * A serialized row bytes to cache {@link FieldStats}.
- *
- * <p>TODO: {@link Predicate} get min and max from {@link BinaryRowData}, lazily deserialization.
- */
+/** Min, max and null counts, lazily read from a serialized row; caches {@link FieldStats}. */
 public class BinaryTableStats {
 
-    private final BinaryRowData min;
-    private final BinaryRowData max;
-    private final long[] nullCounts;
-
+    @Nullable private RowData row;
     @Nullable private FieldStats[] cacheArray;
+    @Nullable private BinaryRowData cacheMin;
+    @Nullable private BinaryRowData cacheMax;
+    @Nullable private long[] cacheNullCounts;
 
-    public BinaryTableStats(BinaryRowData min, BinaryRowData max, long[] nullCounts) {
-        this(min, max, nullCounts, null);
+    public BinaryTableStats(RowData row) {
+        this.row = row;
     }
 
     public BinaryTableStats(
-            BinaryRowData min,
-            BinaryRowData max,
-            long[] nullCounts,
+            BinaryRowData cacheMin, BinaryRowData cacheMax, long[] cacheNullCounts) {
+        this(cacheMin, cacheMax, cacheNullCounts, null);
+    }
+
+    public BinaryTableStats(
+            BinaryRowData cacheMin,
+            BinaryRowData cacheMax,
+            long[] cacheNullCounts,
             @Nullable FieldStats[] cacheArray) {
-        this.min = min;
-        this.max = max;
-        this.nullCounts = nullCounts;
+        this.cacheMin = cacheMin;
+        this.cacheMax = cacheMax;
+        this.cacheNullCounts = cacheNullCounts;
         this.cacheArray = cacheArray;
     }
 
@@ -73,27 +74,40 @@ public class BinaryTableStats {
     }
 
     public BinaryRowData min() {
-        return min;
+        if (cacheMin == null) {
+            checkNotNull(row);
+            cacheMin = deserializeBinaryRow(this.row.getBinary(0));
+        }
+        return cacheMin;
     }
 
     public BinaryRowData max() {
-        return max;
+        if (cacheMax == null) {
+            checkNotNull(row);
+            cacheMax = deserializeBinaryRow(this.row.getBinary(1));
+        }
+        return cacheMax;
     }
 
     public long[] nullCounts() {
-        return nullCounts;
+        if (cacheNullCounts == null) {
+            checkNotNull(row);
+            cacheNullCounts = row.getArray(2).toLongArray();
+        }
+        return cacheNullCounts;
     }
 
     public RowData toRowData() {
-        return GenericRowData.of(
-                serializeBinaryRow(min), serializeBinaryRow(max), new GenericArrayData(nullCounts));
+        return row == null
+                ? GenericRowData.of(
+                        serializeBinaryRow(min()),
+                        serializeBinaryRow(max()),
+                        new GenericArrayData(nullCounts()))
+                : row;
     }
 
     public static BinaryTableStats fromRowData(RowData row) {
-        return new BinaryTableStats(
-                deserializeBinaryRow(row.getBinary(0)),
-                deserializeBinaryRow(row.getBinary(1)),
-                row.getArray(2).toLongArray());
+        return new BinaryTableStats(row);
     }
 
     @Override
@@ -105,15 +119,15 @@ public class BinaryTableStats {
             return false;
         }
         BinaryTableStats that = (BinaryTableStats) o;
-        return Objects.equals(min, that.min)
-                && Objects.equals(max, that.max)
-                && Arrays.equals(nullCounts, that.nullCounts);
+        return Objects.equals(min(), that.min())
+                && Objects.equals(max(), that.max())
+                && Arrays.equals(nullCounts(), that.nullCounts());
     }
 
     @Override
     public int hashCode() {
-        int result = Objects.hash(min, max);
-        result = 31 * result + Arrays.hashCode(nullCounts);
+        int result = Objects.hash(min(), max());
+        result = 31 * result + Arrays.hashCode(nullCounts());
         return result;
     }
 }
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/BinaryTableStatsTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/BinaryTableStatsTest.java
new file mode 100644
index 00000000..b2c30fad
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/BinaryTableStatsTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.stats;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.data.writer.BinaryRowWriter;
+import org.apache.flink.table.store.format.FieldStats;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link BinaryTableStats}. */
+public class BinaryTableStatsTest {
+    @Test
+    public void testBinaryTableStats() {
+        List<Integer> minList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
+        List<Integer> maxList = Arrays.asList(11, 12, 13, 14, 15, 16, 17, 18, 19);
+        long[] nullCounts = new long[] {0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L};
+
+        BinaryRowData minRowData = binaryRow(minList);
+        BinaryRowData maxRowData = binaryRow(maxList);
+        BinaryTableStats tableStats1 = new BinaryTableStats(minRowData, maxRowData, nullCounts);
+
+        RowData rowData = tableStats1.toRowData();
+        BinaryTableStats tableStats2 = new BinaryTableStats(rowData);
+        assertThat(tableStats1.min()).isEqualTo(tableStats2.min());
+        assertThat(tableStats1.max()).isEqualTo(tableStats2.max());
+        assertThat(tableStats1.nullCounts()).isEqualTo(tableStats2.nullCounts());
+        assertThat(tableStats1).isEqualTo(tableStats2);
+
+        FieldStatsArraySerializer serializer =
+                new FieldStatsArraySerializer(
+                        RowType.of(
+                                new IntType(),
+                                new IntType(),
+                                new IntType(),
+                                new IntType(),
+                                new IntType(),
+                                new IntType(),
+                                new IntType(),
+                                new IntType(),
+                                new IntType()));
+
+        FieldStats[] fieldStatsArray1 = tableStats1.fields(serializer, 100L);
+        FieldStats[] fieldStatsArray2 = tableStats2.fields(serializer, 100L);
+        assertThat(fieldStatsArray1.length).isEqualTo(fieldStatsArray2.length).isEqualTo(9);
+        assertThat(fieldStatsArray1).isEqualTo(fieldStatsArray2);
+        for (int i = 0; i < fieldStatsArray1.length; i++) {
+            FieldStats fieldStats = fieldStatsArray1[i];
+            assertThat(fieldStats.minValue()).isEqualTo(1 + i);
+            assertThat(fieldStats.maxValue()).isEqualTo(11 + i);
+            assertThat(fieldStats.nullCount()).isEqualTo(i);
+        }
+
+        BinaryTableStats tableStats3 =
+                new BinaryTableStats(minRowData, maxRowData, nullCounts, fieldStatsArray1);
+        assertThat(tableStats3).isEqualTo(tableStats1).isEqualTo(tableStats2);
+        assertThat(tableStats3.fields(serializer)).isEqualTo(fieldStatsArray2);
+    }
+
+    private BinaryRowData binaryRow(List<Integer> valueList) {
+        BinaryRowData b = new BinaryRowData(valueList.size());
+        BinaryRowWriter writer = new BinaryRowWriter(b);
+        for (int i = 0; i < valueList.size(); i++) {
+            writer.writeInt(i, valueList.get(i));
+        }
+        writer.complete();
+        return b;
+    }
+}