Posted to commits@flink.apache.org by lz...@apache.org on 2022/07/05 07:47:47 UTC

[flink-table-store] branch master updated: [FLINK-28017] Introduce bucket-key to table store

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 907747d3 [FLINK-28017] Introduce bucket-key to table store
907747d3 is described below

commit 907747d3e37b7edb5c1bec5209e47bd4ff9f737b
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Tue Jul 5 15:47:44 2022 +0800

    [FLINK-28017] Introduce bucket-key to table store
    
    This closes #193
---
 .../connector/sink/BucketStreamPartitioner.java    |  3 +-
 .../org/apache/flink/table/store/CoreOptions.java  | 17 ++++
 .../flink/table/store/file/schema/TableSchema.java | 34 +++++++-
 .../store/table/sink/SinkRecordConverter.java      | 29 +++++--
 .../store/table/sink/SinkRecordConverterTest.java  | 92 ++++++++++++++++++++++
 5 files changed, 167 insertions(+), 8 deletions(-)

diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/BucketStreamPartitioner.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/BucketStreamPartitioner.java
index 774d2a1d..cd640c43 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/BucketStreamPartitioner.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/BucketStreamPartitioner.java
@@ -48,8 +48,7 @@ public class BucketStreamPartitioner extends StreamPartitioner<RowData> {
     @Override
     public int selectChannel(SerializationDelegate<StreamRecord<RowData>> record) {
         RowData row = record.getInstance().getValue();
-        int bucket = recordConverter.bucket(row, recordConverter.primaryKey(row));
-        return bucket % numberOfChannels;
+        return recordConverter.bucket(row) % numberOfChannels;
     }
 
     @Override
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
index 88a32d80..afc3c498 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
@@ -52,6 +52,23 @@ public class CoreOptions implements Serializable {
                     .defaultValue(1)
                     .withDescription("Bucket number for file store.");
 
+    public static final ConfigOption<String> BUCKET_KEY =
+            ConfigOptions.key("bucket-key")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "Specify the table store distribution policy. Data is assigned"
+                                                    + " to each bucket according to the hash value of bucket-key.")
+                                    .linebreak()
+                                    .text("If you specify multiple fields, delimiter is ','.")
+                                    .linebreak()
+                                    .text(
+                                            "If not specified, the primary key will be used; "
+                                                    + "if there is no primary key, the full row will be used.")
+                                    .build());
+
     @Internal
     public static final ConfigOption<String> PATH =
             ConfigOptions.key("path")
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/TableSchema.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/TableSchema.java
index 89d7ce03..2a6c6b34 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/TableSchema.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/TableSchema.java
@@ -25,9 +25,11 @@ import org.apache.flink.table.types.logical.MapType;
 import org.apache.flink.table.types.logical.MultisetType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
 
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
@@ -37,6 +39,8 @@ import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.table.store.CoreOptions.BUCKET_KEY;
+
 /** Schema of a table. */
 public class TableSchema implements Serializable {
 
@@ -116,7 +120,8 @@ public class TableSchema implements Serializable {
             Preconditions.checkState(
                     adjusted.size() > 0,
                     String.format(
-                            "Primary key constraint %s should not be same with partition fields %s, this will result in only one record in a partition",
+                            "Primary key constraint %s should not be same with partition fields %s,"
+                                    + " this will result in only one record in a partition",
                             primaryKeys, partitionKeys));
 
             return adjusted;
@@ -129,6 +134,33 @@ public class TableSchema implements Serializable {
         return options;
     }
 
+    public List<String> bucketKeys() {
+        String key = options.get(BUCKET_KEY.key());
+        if (StringUtils.isNullOrWhitespaceOnly(key)) {
+            return Collections.emptyList();
+        }
+        List<String> bucketKeys = Arrays.asList(key.split(","));
+        if (!containsAll(fieldNames(), bucketKeys)) {
+            throw new RuntimeException(
+                    String.format(
+                            "Field names %s should contains all bucket keys %s.",
+                            fieldNames(), bucketKeys));
+        }
+        if (primaryKeys.size() > 0) {
+            if (!containsAll(primaryKeys, bucketKeys)) {
+                throw new RuntimeException(
+                        String.format(
+                                "Primary keys %s should contains all bucket keys %s.",
+                                primaryKeys, bucketKeys));
+            }
+        }
+        return bucketKeys;
+    }
+
+    private boolean containsAll(List<String> all, List<String> contains) {
+        return new HashSet<>(all).containsAll(new HashSet<>(contains));
+    }
+
     public String comment() {
         return comment;
     }
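
A sketch of how bucketKeys() resolves and validates the option, reusing the same
TableSchema constructor and helpers exercised by the new test at the end of this
patch (the class name is illustrative), for a table with columns a, b, c and
primary key b:

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import org.apache.flink.table.store.file.schema.DataField;
    import org.apache.flink.table.store.file.schema.TableSchema;
    import org.apache.flink.table.types.logical.IntType;
    import org.apache.flink.table.types.logical.RowType;

    import static org.apache.flink.table.store.CoreOptions.BUCKET_KEY;

    /** Illustrative only: resolving 'bucket-key' on a schema with columns a, b, c. */
    public class BucketKeysResolutionSketch {
        public static void main(String[] args) {
            RowType rowType =
                    new RowType(
                            Arrays.asList(
                                    new RowType.RowField("a", new IntType()),
                                    new RowType.RowField("b", new IntType()),
                                    new RowType.RowField("c", new IntType())));
            List<DataField> fields = TableSchema.newFields(rowType);
            Map<String, String> options = new HashMap<>();
            options.put(BUCKET_KEY.key(), "b");
            TableSchema schema =
                    new TableSchema(
                            0,
                            fields,
                            TableSchema.currentHighestFieldId(fields),
                            Collections.emptyList(),        // no partition keys
                            Collections.singletonList("b"), // primary key (b)
                            options,
                            "");
            // Prints [b]. 'bucket-key' = 'a' would be rejected because 'a' is not part
            // of the primary key, and 'bucket-key' = 'n' because 'n' is not a column.
            System.out.println(schema.bucketKeys());
        }
    }
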
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/SinkRecordConverter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/SinkRecordConverter.java
index 40b99afb..9d926ba5 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/SinkRecordConverter.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/SinkRecordConverter.java
@@ -40,6 +40,8 @@ public class SinkRecordConverter {
 
     private final Projection<RowData, BinaryRowData> partProjection;
 
+    private final Projection<RowData, BinaryRowData> bucketProjection;
+
     private final Projection<RowData, BinaryRowData> pkProjection;
 
     @Nullable private final Projection<RowData, BinaryRowData> logPkProjection;
@@ -49,14 +51,16 @@ public class SinkRecordConverter {
                 numBucket,
                 tableSchema.logicalRowType(),
                 tableSchema.projection(tableSchema.partitionKeys()),
+                tableSchema.projection(tableSchema.bucketKeys()),
                 tableSchema.projection(tableSchema.trimmedPrimaryKeys()),
                 tableSchema.projection(tableSchema.primaryKeys()));
     }
 
-    public SinkRecordConverter(
+    private SinkRecordConverter(
             int numBucket,
             RowType inputType,
             int[] partitions,
+            int[] bucketKeys,
             int[] primaryKeys,
             int[] logPrimaryKeys) {
         this.numBucket = numBucket;
@@ -64,6 +68,7 @@ public class SinkRecordConverter {
                 CodeGenUtils.newProjection(
                         inputType, IntStream.range(0, inputType.getFieldCount()).toArray());
         this.partProjection = CodeGenUtils.newProjection(inputType, partitions);
+        this.bucketProjection = CodeGenUtils.newProjection(inputType, bucketKeys);
         this.pkProjection = CodeGenUtils.newProjection(inputType, primaryKeys);
         this.logPkProjection =
                 Arrays.equals(primaryKeys, logPrimaryKeys)
@@ -74,7 +79,7 @@ public class SinkRecordConverter {
     public SinkRecord convert(RowData row) {
         BinaryRowData partition = partProjection.apply(row);
         BinaryRowData primaryKey = primaryKey(row);
-        int bucket = bucket(row, primaryKey);
+        int bucket = bucket(row, bucketKey(row, primaryKey));
         return new SinkRecord(partition, bucket, primaryKey, row);
     }
 
@@ -86,17 +91,31 @@ public class SinkRecordConverter {
         return new SinkRecord(record.partition(), record.bucket(), logPrimaryKey, record.row());
     }
 
-    public BinaryRowData primaryKey(RowData row) {
+    public int bucket(RowData row) {
+        return bucket(row, bucketKey(row));
+    }
+
+    private BinaryRowData primaryKey(RowData row) {
         return pkProjection.apply(row);
     }
 
+    private BinaryRowData bucketKey(RowData row) {
+        BinaryRowData bucketKey = bucketProjection.apply(row);
+        return bucketKey.getArity() == 0 ? pkProjection.apply(row) : bucketKey;
+    }
+
+    private BinaryRowData bucketKey(RowData row, BinaryRowData primaryKey) {
+        BinaryRowData bucketKey = bucketProjection.apply(row);
+        return bucketKey.getArity() == 0 ? primaryKey : bucketKey;
+    }
+
     private BinaryRowData logPrimaryKey(RowData row) {
         assert logPkProjection != null;
         return logPkProjection.apply(row);
     }
 
-    public int bucket(RowData row, BinaryRowData primaryKey) {
-        int hash = primaryKey.getArity() == 0 ? hashRow(row) : primaryKey.hashCode();
+    private int bucket(RowData row, BinaryRowData bucketKey) {
+        int hash = bucketKey.getArity() == 0 ? hashRow(row) : bucketKey.hashCode();
         return Math.abs(hash % numBucket);
     }
 
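The bucket choice above reduces to abs(hash(key) % numBucket), where the key is the
projected bucket key if one was configured, else the primary key, else the whole row.
A simplified, self-contained sketch of that fallback chain (plain Object arrays and
Arrays.hashCode stand in for BinaryRowData and its hash, so the printed values will
not match the ones asserted in the new test below):

    import java.util.Arrays;

    /** Illustrative model of the bucket selection done by SinkRecordConverter. */
    public class BucketChoiceSketch {

        static int bucket(Object[] bucketKey, Object[] primaryKey, Object[] row, int numBucket) {
            // Fallback chain: explicit bucket key, then primary key, then the full row.
            Object[] key =
                    bucketKey.length > 0 ? bucketKey : primaryKey.length > 0 ? primaryKey : row;
            int hash = Arrays.hashCode(key); // stands in for BinaryRowData#hashCode / hashRow
            return Math.abs(hash % numBucket);
        }

        public static void main(String[] args) {
            Object[] row = {5, 6, 7};
            // 'bucket-key' = 'a,b' with primary key (a, b): both resolve to the same key.
            System.out.println(bucket(new Object[] {5, 6}, new Object[] {5, 6}, row, 100));
            // No bucket key and no primary key: the whole row is hashed.
            System.out.println(bucket(new Object[] {}, new Object[] {}, row, 100));
        }
    }
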
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/sink/SinkRecordConverterTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/sink/SinkRecordConverterTest.java
new file mode 100644
index 00000000..eea054d4
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/sink/SinkRecordConverterTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.sink;
+
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.schema.DataField;
+import org.apache.flink.table.store.file.schema.TableSchema;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.table.store.CoreOptions.BUCKET_KEY;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+/** Test for {@link SinkRecordConverter}. */
+public class SinkRecordConverterTest {
+
+    @Test
+    public void testInvalidBucket() {
+        assertThatThrownBy(() -> converter("n", "b"))
+                .hasMessageContaining("Field names [a, b, c] should contains all bucket keys [n].");
+
+        assertThatThrownBy(() -> converter("a", "b"))
+                .hasMessageContaining("Primary keys [b] should contains all bucket keys [a].");
+    }
+
+    @Test
+    public void testBucket() {
+        GenericRowData row = GenericRowData.of(5, 6, 7);
+        assertThat(bucket(converter("a", "a,b"), row)).isEqualTo(96);
+        assertThat(bucket(converter("", "a"), row)).isEqualTo(96);
+        assertThat(bucket(converter("", "a,b"), row)).isEqualTo(27);
+        assertThat(bucket(converter("a,b", "a,b"), row)).isEqualTo(27);
+        assertThat(bucket(converter("", ""), row)).isEqualTo(40);
+        assertThat(bucket(converter("a,b,c", ""), row)).isEqualTo(40);
+        assertThat(bucket(converter("", "a,b,c"), row)).isEqualTo(40);
+    }
+
+    private int bucket(SinkRecordConverter converter, RowData row) {
+        int bucket1 = converter.bucket(row);
+        int bucket2 = converter.convert(row).bucket();
+        assertThat(bucket1).isEqualTo(bucket2);
+        return bucket1;
+    }
+
+    private SinkRecordConverter converter(String bk, String pk) {
+        RowType rowType =
+                new RowType(
+                        Arrays.asList(
+                                new RowType.RowField("a", new IntType()),
+                                new RowType.RowField("b", new IntType()),
+                                new RowType.RowField("c", new IntType())));
+        List<DataField> fields = TableSchema.newFields(rowType);
+        Map<String, String> options = new HashMap<>();
+        options.put(BUCKET_KEY.key(), bk);
+        TableSchema schema =
+                new TableSchema(
+                        0,
+                        fields,
+                        TableSchema.currentHighestFieldId(fields),
+                        Collections.emptyList(),
+                        "".equals(pk) ? Collections.emptyList() : Arrays.asList(pk.split(",")),
+                        options,
+                        "");
+        return new SinkRecordConverter(100, schema);
+    }
+}