You are viewing a plain text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/11/17 02:12:15 UTC
[flink-table-store] branch master updated: [FLINK-30027] Fields min and max in BinaryTableStats support lazy deserialization
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new 4d6bc72a [FLINK-30027] Fields min and max in BinaryTableStats support lazy deserialization
4d6bc72a is described below
commit 4d6bc72aeddea0c1bf551a2542ecc990220f8edd
Author: shammon <zj...@gmail.com>
AuthorDate: Thu Nov 17 10:12:09 2022 +0800
[FLINK-30027] Fields min and max in BinaryTableStats support lazy deserialization
This closes #382
---
.../table/store/file/stats/BinaryTableStats.java | 78 ++++++++++--------
.../store/file/stats/BinaryTableStatsTest.java | 93 ++++++++++++++++++++++
2 files changed, 139 insertions(+), 32 deletions(-)
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/BinaryTableStats.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/BinaryTableStats.java
index 18e0b277..91f0eece 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/BinaryTableStats.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/BinaryTableStats.java
@@ -22,7 +22,6 @@ import org.apache.flink.table.data.GenericArrayData;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
-import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.store.format.FieldStats;
import javax.annotation.Nullable;
@@ -32,32 +31,34 @@ import java.util.Objects;
import static org.apache.flink.table.store.file.utils.SerializationUtils.deserializeBinaryRow;
import static org.apache.flink.table.store.file.utils.SerializationUtils.serializeBinaryRow;
+import static org.apache.flink.util.Preconditions.checkNotNull;
-/**
- * A serialized row bytes to cache {@link FieldStats}.
- *
- * <p>TODO: {@link Predicate} get min and max from {@link BinaryRowData}, lazily deserialization.
- */
+/** A serialized row bytes to cache {@link FieldStats}. */
public class BinaryTableStats {
- private final BinaryRowData min;
- private final BinaryRowData max;
- private final long[] nullCounts;
-
+ @Nullable private RowData row;
@Nullable private FieldStats[] cacheArray;
+ @Nullable private BinaryRowData cacheMin;
+ @Nullable private BinaryRowData cacheMax;
+ @Nullable private long[] cacheNullCounts;
- public BinaryTableStats(BinaryRowData min, BinaryRowData max, long[] nullCounts) {
- this(min, max, nullCounts, null);
+ public BinaryTableStats(RowData row) {
+ this.row = row;
}
public BinaryTableStats(
- BinaryRowData min,
- BinaryRowData max,
- long[] nullCounts,
+ BinaryRowData cacheMin, BinaryRowData cacheMax, long[] cacheNullCounts) {
+ this(cacheMin, cacheMax, cacheNullCounts, null);
+ }
+
+ public BinaryTableStats(
+ BinaryRowData cacheMin,
+ BinaryRowData cacheMax,
+ long[] cacheNullCounts,
@Nullable FieldStats[] cacheArray) {
- this.min = min;
- this.max = max;
- this.nullCounts = nullCounts;
+ this.cacheMin = cacheMin;
+ this.cacheMax = cacheMax;
+ this.cacheNullCounts = cacheNullCounts;
this.cacheArray = cacheArray;
}
@@ -73,27 +74,40 @@ public class BinaryTableStats {
}
public BinaryRowData min() {
- return min;
+ if (cacheMin == null) {
+ checkNotNull(row);
+ cacheMin = deserializeBinaryRow(this.row.getBinary(0));
+ }
+ return cacheMin;
}
public BinaryRowData max() {
- return max;
+ if (cacheMax == null) {
+ checkNotNull(row);
+ cacheMax = deserializeBinaryRow(this.row.getBinary(1));
+ }
+ return cacheMax;
}
public long[] nullCounts() {
- return nullCounts;
+ if (cacheNullCounts == null) {
+ checkNotNull(row);
+ cacheNullCounts = row.getArray(2).toLongArray();
+ }
+ return cacheNullCounts;
}
public RowData toRowData() {
- return GenericRowData.of(
- serializeBinaryRow(min), serializeBinaryRow(max), new GenericArrayData(nullCounts));
+ return row == null
+ ? GenericRowData.of(
+ serializeBinaryRow(min()),
+ serializeBinaryRow(max()),
+ new GenericArrayData(nullCounts()))
+ : row;
}
public static BinaryTableStats fromRowData(RowData row) {
- return new BinaryTableStats(
- deserializeBinaryRow(row.getBinary(0)),
- deserializeBinaryRow(row.getBinary(1)),
- row.getArray(2).toLongArray());
+ return new BinaryTableStats(row);
}
@Override
@@ -105,15 +119,15 @@ public class BinaryTableStats {
return false;
}
BinaryTableStats that = (BinaryTableStats) o;
- return Objects.equals(min, that.min)
- && Objects.equals(max, that.max)
- && Arrays.equals(nullCounts, that.nullCounts);
+ return Objects.equals(min(), that.min())
+ && Objects.equals(max(), that.max())
+ && Arrays.equals(nullCounts(), that.nullCounts());
}
@Override
public int hashCode() {
- int result = Objects.hash(min, max);
- result = 31 * result + Arrays.hashCode(nullCounts);
+ int result = Objects.hash(min(), max());
+ result = 31 * result + Arrays.hashCode(nullCounts());
return result;
}
}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/BinaryTableStatsTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/BinaryTableStatsTest.java
new file mode 100644
index 00000000..b2c30fad
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/BinaryTableStatsTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.stats;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.data.writer.BinaryRowWriter;
+import org.apache.flink.table.store.format.FieldStats;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link BinaryTableStats}. */
+public class BinaryTableStatsTest {
+ @Test
+ public void testBinaryTableStats() {
+ List<Integer> minList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
+ List<Integer> maxList = Arrays.asList(11, 12, 13, 14, 15, 16, 17, 18, 19);
+ long[] nullCounts = new long[] {0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L};
+
+ BinaryRowData minRowData = binaryRow(minList);
+ BinaryRowData maxRowData = binaryRow(maxList);
+ BinaryTableStats tableStats1 = new BinaryTableStats(minRowData, maxRowData, nullCounts);
+
+ RowData rowData = tableStats1.toRowData();
+ BinaryTableStats tableStats2 = new BinaryTableStats(rowData);
+ assertThat(tableStats1.min()).isEqualTo(tableStats2.min());
+ assertThat(tableStats1.max()).isEqualTo(tableStats2.max());
+ assertThat(tableStats1.nullCounts()).isEqualTo(tableStats2.nullCounts());
+ assertThat(tableStats1).isEqualTo(tableStats2);
+
+ FieldStatsArraySerializer serializer =
+ new FieldStatsArraySerializer(
+ RowType.of(
+ new IntType(),
+ new IntType(),
+ new IntType(),
+ new IntType(),
+ new IntType(),
+ new IntType(),
+ new IntType(),
+ new IntType(),
+ new IntType()));
+
+ FieldStats[] fieldStatsArray1 = tableStats1.fields(serializer, 100L);
+ FieldStats[] fieldStatsArray2 = tableStats2.fields(serializer, 100L);
+ assertThat(fieldStatsArray1.length).isEqualTo(fieldStatsArray2.length).isEqualTo(9);
+ assertThat(fieldStatsArray1).isEqualTo(fieldStatsArray2);
+ for (int i = 0; i < fieldStatsArray1.length; i++) {
+ FieldStats fieldStats = fieldStatsArray1[i];
+ assertThat(fieldStats.minValue()).isEqualTo(1 + i);
+ assertThat(fieldStats.maxValue()).isEqualTo(11 + i);
+ assertThat(fieldStats.nullCount()).isEqualTo(i);
+ }
+
+ BinaryTableStats tableStats3 =
+ new BinaryTableStats(minRowData, maxRowData, nullCounts, fieldStatsArray1);
+ assertThat(tableStats3).isEqualTo(tableStats1).isEqualTo(tableStats2);
+ assertThat(tableStats3.fields(serializer)).isEqualTo(fieldStatsArray2);
+ }
+
+ private BinaryRowData binaryRow(List<Integer> valueList) {
+ BinaryRowData b = new BinaryRowData(valueList.size());
+ BinaryRowWriter writer = new BinaryRowWriter(b);
+ for (int i = 0; i < valueList.size(); i++) {
+ writer.writeInt(i, valueList.get(i));
+ }
+ writer.complete();
+ return b;
+ }
+}