Posted to commits@flink.apache.org by lz...@apache.org on 2022/07/05 07:47:47 UTC
[flink-table-store] branch master updated: [FLINK-28017] Introduce bucket-key to table store
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new 907747d3 [FLINK-28017] Introduce bucket-key to table store
907747d3 is described below
commit 907747d3e37b7edb5c1bec5209e47bd4ff9f737b
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Tue Jul 5 15:47:44 2022 +0800
[FLINK-28017] Introduce bucket-key to table store
This closes #193
---
.../connector/sink/BucketStreamPartitioner.java | 3 +-
.../org/apache/flink/table/store/CoreOptions.java | 17 ++++
.../flink/table/store/file/schema/TableSchema.java | 34 +++++++-
.../store/table/sink/SinkRecordConverter.java | 29 +++++--
.../store/table/sink/SinkRecordConverterTest.java | 92 ++++++++++++++++++++++
5 files changed, 167 insertions(+), 8 deletions(-)
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/BucketStreamPartitioner.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/BucketStreamPartitioner.java
index 774d2a1d..cd640c43 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/BucketStreamPartitioner.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/BucketStreamPartitioner.java
@@ -48,8 +48,7 @@ public class BucketStreamPartitioner extends StreamPartitioner<RowData> {
@Override
public int selectChannel(SerializationDelegate<StreamRecord<RowData>> record) {
RowData row = record.getInstance().getValue();
- int bucket = recordConverter.bucket(row, recordConverter.primaryKey(row));
- return bucket % numberOfChannels;
+ return recordConverter.bucket(row) % numberOfChannels;
}
@Override
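With this change the partitioner no longer needs the primary key at all: SinkRecordConverter.bucket(row) resolves the effective key internally (bucket key if configured, else primary key, else the full row), and selectChannel only folds the bucket into a channel index. A minimal compilable sketch of that division of labor (the class name is illustrative):

    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.store.table.sink.SinkRecordConverter;

    final class ChannelSelectionSketch {
        // The converter owns all key resolution; the partitioner only
        // maps bucket -> channel, so equal buckets go to equal channels.
        static int selectChannel(RowData row, SinkRecordConverter converter, int numberOfChannels) {
            return converter.bucket(row) % numberOfChannels;
        }
    }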
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
index 88a32d80..afc3c498 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
@@ -52,6 +52,23 @@ public class CoreOptions implements Serializable {
.defaultValue(1)
.withDescription("Bucket number for file store.");
+ public static final ConfigOption<String> BUCKET_KEY =
+ ConfigOptions.key("bucket-key")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ Description.builder()
+ .text(
+ "Specify the table store distribution policy. Data is assigned"
+ + " to each bucket according to the hash value of bucket-key.")
+ .linebreak()
+ .text("If you specify multiple fields, delimiter is ','.")
+ .linebreak()
+ .text(
+ "If not specified, the primary key will be used; "
+ + "if there is no primary key, the full row will be used.")
+ .build());
+
@Internal
public static final ConfigOption<String> PATH =
ConfigOptions.key("path")
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/TableSchema.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/TableSchema.java
index 89d7ce03..2a6c6b34 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/TableSchema.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/TableSchema.java
@@ -25,9 +25,11 @@ import org.apache.flink.table.types.logical.MapType;
import org.apache.flink.table.types.logical.MultisetType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
import java.io.Serializable;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
@@ -37,6 +39,8 @@ import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
+import static org.apache.flink.table.store.CoreOptions.BUCKET_KEY;
+
/** Schema of a table. */
public class TableSchema implements Serializable {
@@ -116,7 +120,8 @@ public class TableSchema implements Serializable {
Preconditions.checkState(
adjusted.size() > 0,
String.format(
- "Primary key constraint %s should not be same with partition fields %s, this will result in only one record in a partition",
+ "Primary key constraint %s should not be same with partition fields %s,"
+ + " this will result in only one record in a partition",
primaryKeys, partitionKeys));
return adjusted;
@@ -129,6 +134,33 @@ public class TableSchema implements Serializable {
return options;
}
+ public List<String> bucketKeys() {
+ String key = options.get(BUCKET_KEY.key());
+ if (StringUtils.isNullOrWhitespaceOnly(key)) {
+ return Collections.emptyList();
+ }
+ List<String> bucketKeys = Arrays.asList(key.split(","));
+ if (!containsAll(fieldNames(), bucketKeys)) {
+ throw new RuntimeException(
+ String.format(
+ "Field names %s should contains all bucket keys %s.",
+ fieldNames(), bucketKeys));
+ }
+ if (primaryKeys.size() > 0) {
+ if (!containsAll(primaryKeys, bucketKeys)) {
+ throw new RuntimeException(
+ String.format(
+ "Primary keys %s should contains all bucket keys %s.",
+ primaryKeys, bucketKeys));
+ }
+ }
+ return bucketKeys;
+ }
+
+ private boolean containsAll(List<String> all, List<String> contains) {
+ return new HashSet<>(all).containsAll(new HashSet<>(contains));
+ }
+
public String comment() {
return comment;
}
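bucketKeys() enforces two subset rules: bucket keys must be existing fields, and when a primary key is defined they must be a subset of it, which guarantees that rows sharing a primary key always share a bucket. A standalone sketch of the same checks (names are illustrative):

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.List;

    final class BucketKeyValidationSketch {
        static List<String> validate(List<String> fieldNames, List<String> primaryKeys, String option) {
            List<String> bucketKeys = Arrays.asList(option.split(","));
            if (!new HashSet<>(fieldNames).containsAll(bucketKeys)) {
                throw new RuntimeException("Bucket keys " + bucketKeys + " must all be fields " + fieldNames);
            }
            if (!primaryKeys.isEmpty() && !new HashSet<>(primaryKeys).containsAll(bucketKeys)) {
                throw new RuntimeException("Bucket keys " + bucketKeys + " must be a subset of primary keys " + primaryKeys);
            }
            return bucketKeys;
        }
    }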
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/SinkRecordConverter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/SinkRecordConverter.java
index 40b99afb..9d926ba5 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/SinkRecordConverter.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/SinkRecordConverter.java
@@ -40,6 +40,8 @@ public class SinkRecordConverter {
private final Projection<RowData, BinaryRowData> partProjection;
+ private final Projection<RowData, BinaryRowData> bucketProjection;
+
private final Projection<RowData, BinaryRowData> pkProjection;
@Nullable private final Projection<RowData, BinaryRowData> logPkProjection;
@@ -49,14 +51,16 @@ public class SinkRecordConverter {
numBucket,
tableSchema.logicalRowType(),
tableSchema.projection(tableSchema.partitionKeys()),
+ tableSchema.projection(tableSchema.bucketKeys()),
tableSchema.projection(tableSchema.trimmedPrimaryKeys()),
tableSchema.projection(tableSchema.primaryKeys()));
}
- public SinkRecordConverter(
+ private SinkRecordConverter(
int numBucket,
RowType inputType,
int[] partitions,
+ int[] bucketKeys,
int[] primaryKeys,
int[] logPrimaryKeys) {
this.numBucket = numBucket;
@@ -64,6 +68,7 @@ public class SinkRecordConverter {
CodeGenUtils.newProjection(
inputType, IntStream.range(0, inputType.getFieldCount()).toArray());
this.partProjection = CodeGenUtils.newProjection(inputType, partitions);
+ this.bucketProjection = CodeGenUtils.newProjection(inputType, bucketKeys);
this.pkProjection = CodeGenUtils.newProjection(inputType, primaryKeys);
this.logPkProjection =
Arrays.equals(primaryKeys, logPrimaryKeys)
@@ -74,7 +79,7 @@ public class SinkRecordConverter {
public SinkRecord convert(RowData row) {
BinaryRowData partition = partProjection.apply(row);
BinaryRowData primaryKey = primaryKey(row);
- int bucket = bucket(row, primaryKey);
+ int bucket = bucket(row, bucketKey(row, primaryKey));
return new SinkRecord(partition, bucket, primaryKey, row);
}
@@ -86,17 +91,31 @@ public class SinkRecordConverter {
return new SinkRecord(record.partition(), record.bucket(), logPrimaryKey, record.row());
}
- public BinaryRowData primaryKey(RowData row) {
+ public int bucket(RowData row) {
+ return bucket(row, bucketKey(row));
+ }
+
+ private BinaryRowData primaryKey(RowData row) {
return pkProjection.apply(row);
}
+ private BinaryRowData bucketKey(RowData row) {
+ BinaryRowData bucketKey = bucketProjection.apply(row);
+ return bucketKey.getArity() == 0 ? pkProjection.apply(row) : bucketKey;
+ }
+
+ private BinaryRowData bucketKey(RowData row, BinaryRowData primaryKey) {
+ BinaryRowData bucketKey = bucketProjection.apply(row);
+ return bucketKey.getArity() == 0 ? primaryKey : bucketKey;
+ }
+
private BinaryRowData logPrimaryKey(RowData row) {
assert logPkProjection != null;
return logPkProjection.apply(row);
}
- public int bucket(RowData row, BinaryRowData primaryKey) {
- int hash = primaryKey.getArity() == 0 ? hashRow(row) : primaryKey.hashCode();
+ private int bucket(RowData row, BinaryRowData bucketKey) {
+ int hash = bucketKey.getArity() == 0 ? hashRow(row) : bucketKey.hashCode();
return Math.abs(hash % numBucket);
}
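The final computation keeps its old shape: hash the effective key (or the full row when the projected key is empty) and fold the hash into [0, numBucket). A minimal sketch of that last step:

    final class BucketAssignmentSketch {
        // keyHash is bucketKey.hashCode(), primaryKey.hashCode(), or a
        // full-row hash, depending on which keys the table defines.
        static int bucket(int keyHash, int numBucket) {
            return Math.abs(keyHash % numBucket); // always in [0, numBucket)
        }
    }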
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/sink/SinkRecordConverterTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/sink/SinkRecordConverterTest.java
new file mode 100644
index 00000000..eea054d4
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/sink/SinkRecordConverterTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.sink;
+
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.schema.DataField;
+import org.apache.flink.table.store.file.schema.TableSchema;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.table.store.CoreOptions.BUCKET_KEY;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+/** Test for {@link SinkRecordConverter}. */
+public class SinkRecordConverterTest {
+
+ @Test
+ public void testInvalidBucket() {
+ assertThatThrownBy(() -> converter("n", "b"))
+ .hasMessageContaining("Field names [a, b, c] should contains all bucket keys [n].");
+
+ assertThatThrownBy(() -> converter("a", "b"))
+ .hasMessageContaining("Primary keys [b] should contains all bucket keys [a].");
+ }
+
+ @Test
+ public void testBucket() {
+ GenericRowData row = GenericRowData.of(5, 6, 7);
+ assertThat(bucket(converter("a", "a,b"), row)).isEqualTo(96);
+ assertThat(bucket(converter("", "a"), row)).isEqualTo(96);
+ assertThat(bucket(converter("", "a,b"), row)).isEqualTo(27);
+ assertThat(bucket(converter("a,b", "a,b"), row)).isEqualTo(27);
+ assertThat(bucket(converter("", ""), row)).isEqualTo(40);
+ assertThat(bucket(converter("a,b,c", ""), row)).isEqualTo(40);
+ assertThat(bucket(converter("", "a,b,c"), row)).isEqualTo(40);
+ }
+
+ private int bucket(SinkRecordConverter converter, RowData row) {
+ int bucket1 = converter.bucket(row);
+ int bucket2 = converter.convert(row).bucket();
+ assertThat(bucket1).isEqualTo(bucket2);
+ return bucket1;
+ }
+
+ private SinkRecordConverter converter(String bk, String pk) {
+ RowType rowType =
+ new RowType(
+ Arrays.asList(
+ new RowType.RowField("a", new IntType()),
+ new RowType.RowField("b", new IntType()),
+ new RowType.RowField("c", new IntType())));
+ List<DataField> fields = TableSchema.newFields(rowType);
+ Map<String, String> options = new HashMap<>();
+ options.put(BUCKET_KEY.key(), bk);
+ TableSchema schema =
+ new TableSchema(
+ 0,
+ fields,
+ TableSchema.currentHighestFieldId(fields),
+ Collections.emptyList(),
+ "".equals(pk) ? Collections.emptyList() : Arrays.asList(pk.split(",")),
+ options,
+ "");
+ return new SinkRecordConverter(100, schema);
+ }
+}
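The expected values in testBucket() group by the effective key rather than by how it was configured; laid out side by side for row (a=5, b=6, c=7) and 100 buckets:

    // bucket-key "a",     pk "a,b"   -> hash(5)               -> 96
    // no bucket-key,      pk "a"     -> hash(5)               -> 96
    // no bucket-key,      pk "a,b"   -> hash(5, 6)            -> 27
    // bucket-key "a,b",   pk "a,b"   -> hash(5, 6)            -> 27
    // no bucket-key,      no pk      -> full-row hash (5,6,7) -> 40
    // bucket-key "a,b,c", no pk      -> hash(5, 6, 7)         -> 40
    // no bucket-key,      pk "a,b,c" -> hash(5, 6, 7)         -> 40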