Posted to commits@flink.apache.org by cz...@apache.org on 2022/11/28 02:21:44 UTC
[flink-table-store] branch master updated: [FLINK-30164] Expose BucketComputer from SupportsWrite
This is an automated email from the ASF dual-hosted git repository.
czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new d8eb796f [FLINK-30164] Expose BucketComputer from SupportsWrite
d8eb796f is described below
commit d8eb796f035f35e1ac85ff3f657452dd2a41e644
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Mon Nov 28 10:21:38 2022 +0800
[FLINK-30164] Expose BucketComputer from SupportsWrite
This closes #400.
---
.../store/file/predicate/PredicateFilter.java | 6 ++
.../connector/sink/BucketStreamPartitioner.java | 19 ++--
.../store/connector/sink/FlinkSinkBuilder.java | 6 +-
.../store/connector/sink/StoreWriteOperator.java | 2 +-
.../store/connector/AppendOnlyTableITCase.java | 11 ++-
.../table/store/connector/PredicateITCase.java | 38 ++++++-
.../table/store/file/predicate/BucketSelector.java | 6 +-
.../store/table/AppendOnlyFileStoreTable.java | 4 +-
.../table/ChangelogValueCountFileStoreTable.java | 4 +-
.../table/ChangelogWithKeyFileStoreTable.java | 4 +-
.../flink/table/store/table/FileStoreTable.java | 6 ++
.../flink/table/store/table/SupportsWrite.java | 5 +-
.../table/store/table/sink/BucketComputer.java | 110 +++++++++++++++++++++
.../store/table/sink/SinkRecordConverter.java | 66 ++-----------
.../flink/table/store/table/sink/TableWrite.java | 7 +-
.../table/store/table/sink/TableWriteImpl.java | 10 +-
.../flink/table/store/table/source/TableRead.java | 19 ++++
.../store/table/AppendOnlyFileStoreTableTest.java | 4 +-
.../store/table/sink/SinkRecordConverterTest.java | 4 +-
19 files changed, 228 insertions(+), 103 deletions(-)
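
At a glance, this commit pulls bucket assignment out of SinkRecordConverter into a standalone BucketComputer and exposes it through SupportsWrite, so the bucket count is derived from the table schema's options instead of being threaded through constructors. A minimal usage sketch (hypothetical caller code, not part of the diff; `table` and `row` are assumed to exist):

    TableSchema schema = table.schema();                  // CoreOptions.BUCKET lives in schema.options()
    BucketComputer computer = new BucketComputer(schema); // new class added by this commit
    int bucket = computer.bucket(row);                    // row: org.apache.flink.table.data.RowData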
diff --git a/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/PredicateFilter.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/PredicateFilter.java
index 2c4f0528..b84b52f3 100644
--- a/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/PredicateFilter.java
+++ b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/PredicateFilter.java
@@ -24,12 +24,18 @@ import org.apache.flink.table.types.logical.RowType;
import javax.annotation.Nullable;
+import java.util.List;
+
/** A {@link java.util.function.Predicate} to filter {@link RowData}. */
public class PredicateFilter implements java.util.function.Predicate<RowData> {
private final RowDataToObjectArrayConverter arrayConverter;
@Nullable private final Predicate predicate;
+ public PredicateFilter(RowType rowType, List<Predicate> predicates) {
+ this(rowType, predicates.isEmpty() ? null : PredicateBuilder.and(predicates));
+ }
+
public PredicateFilter(RowType rowType, @Nullable Predicate predicate) {
this.arrayConverter = new RowDataToObjectArrayConverter(rowType);
this.predicate = predicate;
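
The new convenience constructor folds a list of predicates into a single conjunction via PredicateBuilder.and; an empty list becomes a null predicate, which presumably makes the filter accept every row. A short sketch (hypothetical `rowType` and `row`):

    // Empty list -> null predicate -> no row is filtered out.
    PredicateFilter acceptAll = new PredicateFilter(rowType, Collections.emptyList());
    boolean keep = acceptAll.test(row); // true: there is no predicate to fail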
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/BucketStreamPartitioner.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/BucketStreamPartitioner.java
index cd640c43..bacca02e 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/BucketStreamPartitioner.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/BucketStreamPartitioner.java
@@ -24,31 +24,30 @@ import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.store.file.schema.TableSchema;
-import org.apache.flink.table.store.table.sink.SinkRecordConverter;
+import org.apache.flink.table.store.table.sink.BucketComputer;
/** A {@link StreamPartitioner} to partition records by bucket. */
public class BucketStreamPartitioner extends StreamPartitioner<RowData> {
- private final int numBucket;
- private final TableSchema tableSchema;
+ private final TableSchema schema;
- private transient SinkRecordConverter recordConverter;
+ private transient BucketComputer computer;
+ private transient int numberOfChannels;
- public BucketStreamPartitioner(int numBucket, TableSchema tableSchema) {
- this.numBucket = numBucket;
- this.tableSchema = tableSchema;
+ public BucketStreamPartitioner(TableSchema schema) {
+ this.schema = schema;
}
@Override
public void setup(int numberOfChannels) {
super.setup(numberOfChannels);
- this.recordConverter = new SinkRecordConverter(numBucket, tableSchema);
+ this.computer = new BucketComputer(schema);
+ this.numberOfChannels = numberOfChannels;
}
@Override
public int selectChannel(SerializationDelegate<StreamRecord<RowData>> record) {
- RowData row = record.getInstance().getValue();
- return recordConverter.bucket(row) % numberOfChannels;
+ return computer.bucket(record.getInstance().getValue()) % numberOfChannels;
}
@Override
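
The partitioner no longer takes the bucket count up front; BucketComputer reads it from the schema in setup(), and routing stays a plain modulo so every record of a bucket lands on the same sink task. The arithmetic, with hypothetical numbers:

    int numberOfChannels = 3;                // sink parallelism
    int bucket = 4;                          // computer.bucket(row), with 5 buckets configured
    int channel = bucket % numberOfChannels; // -> 1, stable for all records of bucket 4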
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSinkBuilder.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSinkBuilder.java
index 3b0ffbe0..2e77a669 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSinkBuilder.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSinkBuilder.java
@@ -25,7 +25,6 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.connector.FlinkConnectorOptions;
import org.apache.flink.table.store.file.catalog.CatalogLock;
import org.apache.flink.table.store.file.utils.JsonSerdeUtil;
@@ -91,10 +90,7 @@ public class FlinkSinkBuilder {
}
public DataStreamSink<?> build() {
- int numBucket = conf.get(CoreOptions.BUCKET);
-
- BucketStreamPartitioner partitioner =
- new BucketStreamPartitioner(numBucket, table.schema());
+ BucketStreamPartitioner partitioner = new BucketStreamPartitioner(table.schema());
PartitionTransformation<RowData> partitioned =
new PartitionTransformation<>(input.getTransformation(), partitioner);
if (parallelism != null) {
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreWriteOperator.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreWriteOperator.java
index 00cb7d62..f3bfcb19 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreWriteOperator.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreWriteOperator.java
@@ -162,7 +162,7 @@ public class StoreWriteOperator extends PrepareCommitOperator {
if (logSinkFunction != null) {
// write to log store, need to preserve original pk (which includes partition fields)
- SinkRecord logRecord = write.recordConverter().convertToLogSinkRecord(record);
+ SinkRecord logRecord = write.toLogRecord(record);
logSinkFunction.invoke(logRecord, sinkContext);
}
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AppendOnlyTableITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AppendOnlyTableITCase.java
index 6391a00c..cfee8bd2 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AppendOnlyTableITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AppendOnlyTableITCase.java
@@ -196,10 +196,17 @@ public class AppendOnlyTableITCase extends FileStoreTableITCase {
testRejectChanges(RowKind.UPDATE_BEFORE);
}
+ @Test
+ public void testComplexType() {
+ batchSql("INSERT INTO complex_table VALUES (1, CAST(NULL AS MAP<INT, INT>))");
+ assertThat(batchSql("SELECT * FROM complex_table")).containsExactly(Row.of(1, null));
+ }
+
@Override
protected List<String> ddl() {
- return Collections.singletonList(
- "CREATE TABLE IF NOT EXISTS append_table (id INT, data STRING) WITH ('write-mode'='append-only')");
+ return Arrays.asList(
+ "CREATE TABLE IF NOT EXISTS append_table (id INT, data STRING) WITH ('write-mode'='append-only')",
+ "CREATE TABLE IF NOT EXISTS complex_table (id INT, data MAP<INT, INT>) WITH ('write-mode'='append-only')");
}
private void testRejectChanges(RowKind kind) {
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/PredicateITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/PredicateITCase.java
index 6eeccf7e..3a73c0bf 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/PredicateITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/PredicateITCase.java
@@ -30,28 +30,58 @@ public class PredicateITCase extends CatalogITCaseBase {
@Test
public void testPkFilterBucket() throws Exception {
sql("CREATE TABLE T (a INT PRIMARY KEY NOT ENFORCED, b INT) WITH ('bucket' = '5')");
- innerTest();
+ writeRecords();
+ innerTestSingleField();
+ innerTestAllFields();
}
@Test
public void testNoPkFilterBucket() throws Exception {
sql("CREATE TABLE T (a INT, b INT) WITH ('bucket' = '5', 'bucket-key'='a')");
- innerTest();
+ writeRecords();
+ innerTestSingleField();
+ innerTestAllFields();
}
@Test
public void testAppendFilterBucket() throws Exception {
sql(
"CREATE TABLE T (a INT, b INT) WITH ('bucket' = '5', 'bucket-key'='a', 'write-mode'='append-only')");
- innerTest();
+ writeRecords();
+ innerTestSingleField();
+ innerTestAllFields();
}
- private void innerTest() throws Exception {
+ @Test
+ public void testAppendNoBucketKey() throws Exception {
+ sql("CREATE TABLE T (a INT, b INT) WITH ('write-mode'='append-only', 'bucket' = '5')");
+ writeRecords();
+ innerTestSingleField();
+ innerTestAllFields();
+ }
+
+ private void writeRecords() throws Exception {
sql("INSERT INTO T VALUES (1, 2), (3, 4), (5, 6), (7, 8), (9, 10)");
+ }
+
+ private void innerTestSingleField() throws Exception {
assertThat(sql("SELECT * FROM T WHERE a = 1")).containsExactlyInAnyOrder(Row.of(1, 2));
assertThat(sql("SELECT * FROM T WHERE a = 3")).containsExactlyInAnyOrder(Row.of(3, 4));
assertThat(sql("SELECT * FROM T WHERE a = 5")).containsExactlyInAnyOrder(Row.of(5, 6));
assertThat(sql("SELECT * FROM T WHERE a = 7")).containsExactlyInAnyOrder(Row.of(7, 8));
assertThat(sql("SELECT * FROM T WHERE a = 9")).containsExactlyInAnyOrder(Row.of(9, 10));
}
+
+ private void innerTestAllFields() throws Exception {
+ assertThat(sql("SELECT * FROM T WHERE a = 1 and b = 2"))
+ .containsExactlyInAnyOrder(Row.of(1, 2));
+ assertThat(sql("SELECT * FROM T WHERE a = 3 and b = 4"))
+ .containsExactlyInAnyOrder(Row.of(3, 4));
+ assertThat(sql("SELECT * FROM T WHERE a = 5 and b = 6"))
+ .containsExactlyInAnyOrder(Row.of(5, 6));
+ assertThat(sql("SELECT * FROM T WHERE a = 7 and b = 8"))
+ .containsExactlyInAnyOrder(Row.of(7, 8));
+ assertThat(sql("SELECT * FROM T WHERE a = 9 and b = 10"))
+ .containsExactlyInAnyOrder(Row.of(9, 10));
+ }
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/BucketSelector.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/BucketSelector.java
index c451145b..f242092d 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/BucketSelector.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/BucketSelector.java
@@ -22,7 +22,7 @@ import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
-import org.apache.flink.table.store.table.sink.SinkRecordConverter;
+import org.apache.flink.table.store.table.sink.BucketComputer;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableSet;
@@ -72,7 +72,7 @@ public class BucketSelector implements Serializable {
Set<Integer> createBucketSet(int numBucket) {
ImmutableSet.Builder<Integer> builder = new ImmutableSet.Builder<>();
for (int hash : hashCodes) {
- builder.add(SinkRecordConverter.bucket(hash, numBucket));
+ builder.add(BucketComputer.bucket(hash, numBucket));
}
return builder.build();
}
@@ -140,7 +140,7 @@ public class BucketSelector implements Serializable {
private static int hash(List<Object> columns, RowDataSerializer serializer) {
BinaryRowData binaryRow = serializer.toBinaryRow(GenericRowData.of(columns.toArray()));
- return SinkRecordConverter.hashcode(binaryRow);
+ return BucketComputer.hashcode(binaryRow);
}
private static void assembleRows(
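
BucketSelector keeps working against the relocated statics: for an equality predicate on the bucket key it hashes the literal key values and maps each hash to a bucket, so a scan only needs to read those buckets. A simplified sketch (hypothetical hash codes):

    int numBucket = 5;
    int[] hashCodes = {-13, 7}; // hashes of the key literals in the predicate
    Set<Integer> buckets = new HashSet<>();
    for (int hash : hashCodes) {
        buckets.add(BucketComputer.bucket(hash, numBucket)); // {3, 2}
    }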
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java
index c7e15ef4..0a9e99f6 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java
@@ -105,11 +105,9 @@ public class AppendOnlyFileStoreTable extends AbstractFileStoreTable {
@Override
public TableWrite newWrite(String commitUser) {
- SinkRecordConverter recordConverter =
- new SinkRecordConverter(store.options().bucket(), tableSchema);
return new TableWriteImpl<>(
store.newWrite(commitUser),
- recordConverter,
+ new SinkRecordConverter(tableSchema),
record -> {
Preconditions.checkState(
record.row().getRowKind() == RowKind.INSERT,
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
index 1c95b8eb..4ae46fc5 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
@@ -123,12 +123,10 @@ public class ChangelogValueCountFileStoreTable extends AbstractFileStoreTable {
@Override
public TableWrite newWrite(String commitUser) {
- SinkRecordConverter recordConverter =
- new SinkRecordConverter(store.options().bucket(), tableSchema);
final KeyValue kv = new KeyValue();
return new TableWriteImpl<>(
store.newWrite(commitUser),
- recordConverter,
+ new SinkRecordConverter(tableSchema),
record -> {
switch (record.row().getRowKind()) {
case INSERT:
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
index 095163e1..2cbcd629 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
@@ -197,8 +197,6 @@ public class ChangelogWithKeyFileStoreTable extends AbstractFileStoreTable {
@Override
public TableWrite newWrite(String commitUser) {
- SinkRecordConverter recordConverter =
- new SinkRecordConverter(store.options().bucket(), tableSchema);
final SequenceGenerator sequenceGenerator =
store.options()
.sequenceField()
@@ -207,7 +205,7 @@ public class ChangelogWithKeyFileStoreTable extends AbstractFileStoreTable {
final KeyValue kv = new KeyValue();
return new TableWriteImpl<>(
store.newWrite(commitUser),
- recordConverter,
+ new SinkRecordConverter(tableSchema),
record -> {
long sequenceNumber =
sequenceGenerator == null
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTable.java
index 2fd56ae6..47ea554c 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTable.java
@@ -22,6 +22,7 @@ import org.apache.flink.core.fs.Path;
import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.file.schema.TableSchema;
import org.apache.flink.table.store.file.utils.SnapshotManager;
+import org.apache.flink.table.store.table.sink.BucketComputer;
import org.apache.flink.table.store.table.source.DataTableScan;
import org.apache.flink.table.types.logical.RowType;
@@ -58,4 +59,9 @@ public interface FileStoreTable extends Table, SupportsPartition, SupportsWrite
@Override
DataTableScan newScan();
+
+ @Override
+ default BucketComputer bucketComputer() {
+ return new BucketComputer(schema());
+ }
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/SupportsWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/SupportsWrite.java
index 696aa3eb..362c9fc7 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/SupportsWrite.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/SupportsWrite.java
@@ -18,11 +18,14 @@
package org.apache.flink.table.store.table;
+import org.apache.flink.table.store.table.sink.BucketComputer;
import org.apache.flink.table.store.table.sink.TableCommit;
import org.apache.flink.table.store.table.sink.TableWrite;
/** An interface for {@link Table} write support. */
-public interface SupportsWrite {
+public interface SupportsWrite extends Table {
+
+ BucketComputer bucketComputer();
TableWrite newWrite(String commitUser);
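
With SupportsWrite now extending Table and exposing bucketComputer(), callers can compute buckets against the write interface alone, without knowing the concrete table class. A hypothetical helper, for illustration only:

    static int bucketOf(SupportsWrite table, RowData row) {
        return table.bucketComputer().bucket(row);
    }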
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/BucketComputer.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/BucketComputer.java
new file mode 100644
index 00000000..533e9e3a
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/BucketComputer.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.sink;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.runtime.generated.Projection;
+import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.codegen.CodeGenUtils;
+import org.apache.flink.table.store.file.schema.TableSchema;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
+
+import java.util.stream.IntStream;
+
+/** Computes the bucket of a row from its bucket keys, its primary keys, or the whole row. */
+public class BucketComputer {
+
+ private final int numBucket;
+
+ private final Projection<RowData, BinaryRowData> rowProjection;
+ private final Projection<RowData, BinaryRowData> bucketProjection;
+ private final Projection<RowData, BinaryRowData> pkProjection;
+
+ public BucketComputer(TableSchema tableSchema) {
+ this(
+ new CoreOptions(tableSchema.options()).bucket(),
+ tableSchema.logicalRowType(),
+ tableSchema.projection(tableSchema.originalBucketKeys()),
+ tableSchema.projection(tableSchema.trimmedPrimaryKeys()));
+ }
+
+ private BucketComputer(int numBucket, RowType rowType, int[] bucketKeys, int[] primaryKeys) {
+ this.numBucket = numBucket;
+ this.rowProjection =
+ CodeGenUtils.newProjection(
+ rowType, IntStream.range(0, rowType.getFieldCount()).toArray());
+ this.bucketProjection = CodeGenUtils.newProjection(rowType, bucketKeys);
+ this.pkProjection = CodeGenUtils.newProjection(rowType, primaryKeys);
+ }
+
+ private int hashRow(RowData row) {
+ if (row instanceof BinaryRowData) {
+ RowKind rowKind = row.getRowKind();
+ row.setRowKind(RowKind.INSERT);
+ int hash = hashcode((BinaryRowData) row);
+ row.setRowKind(rowKind);
+ return hash;
+ } else {
+ return hashcode(rowProjection.apply(row));
+ }
+ }
+
+ public int bucket(RowData row) {
+ int hashcode = hashBucketKey(row);
+ return bucket(hashcode, numBucket);
+ }
+
+ public int bucket(RowData row, BinaryRowData pk) {
+ int hashcode = hashBucketKey(row, pk);
+ return bucket(hashcode, numBucket);
+ }
+
+ private int hashBucketKey(RowData row) {
+ BinaryRowData bucketKey = bucketProjection.apply(row);
+ if (bucketKey.getArity() == 0) {
+ bucketKey = pkProjection.apply(row);
+ }
+ if (bucketKey.getArity() == 0) {
+ return hashRow(row);
+ }
+ return bucketKey.hashCode();
+ }
+
+ private int hashBucketKey(RowData row, BinaryRowData pk) {
+ BinaryRowData bucketKey = bucketProjection.apply(row);
+ if (bucketKey.getArity() == 0) {
+ bucketKey = pk;
+ }
+ if (bucketKey.getArity() == 0) {
+ return hashRow(row);
+ }
+ return bucketKey.hashCode();
+ }
+
+ public static int hashcode(BinaryRowData rowData) {
+ assert rowData.getRowKind() == RowKind.INSERT;
+ return rowData.hashCode();
+ }
+
+ public static int bucket(int hashcode, int numBucket) {
+ return Math.abs(hashcode % numBucket);
+ }
+}
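
A self-contained, simplified analogue of the fallback chain above: bucket key if configured, else trimmed primary key, else the whole row. The real class projects rows to BinaryRowData and hashes that; plain object arrays stand in here purely for illustration.

    import java.util.Arrays;

    class BucketFallbackSketch {
        static int bucket(Object[] bucketKey, Object[] primaryKey, Object[] row, int numBucket) {
            // Same precedence as BucketComputer.hashBucketKey:
            Object[] key = bucketKey.length > 0 ? bucketKey
                    : primaryKey.length > 0 ? primaryKey
                    : row; // whole row as the last resort (append-only, no bucket key)
            return Math.abs(Arrays.hashCode(key) % numBucket);
        }

        public static void main(String[] args) {
            // No bucket key, no pk: the whole row decides the bucket.
            System.out.println(bucket(new Object[0], new Object[0], new Object[] {1, 2}, 5));
        }
    }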
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/SinkRecordConverter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/SinkRecordConverter.java
index 6d61d6d4..3b68d77f 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/SinkRecordConverter.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/SinkRecordConverter.java
@@ -24,51 +24,39 @@ import org.apache.flink.table.runtime.generated.Projection;
import org.apache.flink.table.store.codegen.CodeGenUtils;
import org.apache.flink.table.store.file.schema.TableSchema;
import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.types.RowKind;
import javax.annotation.Nullable;
import java.util.Arrays;
-import java.util.stream.IntStream;
/** Converter for converting {@link RowData} to {@link SinkRecord}. */
public class SinkRecordConverter {
- private final int numBucket;
-
- private final Projection<RowData, BinaryRowData> allProjection;
+ private final BucketComputer bucketComputer;
private final Projection<RowData, BinaryRowData> partProjection;
- private final Projection<RowData, BinaryRowData> bucketProjection;
-
private final Projection<RowData, BinaryRowData> pkProjection;
@Nullable private final Projection<RowData, BinaryRowData> logPkProjection;
- public SinkRecordConverter(int numBucket, TableSchema tableSchema) {
+ public SinkRecordConverter(TableSchema tableSchema) {
this(
- numBucket,
tableSchema.logicalRowType(),
tableSchema.projection(tableSchema.partitionKeys()),
- tableSchema.projection(tableSchema.originalBucketKeys()),
tableSchema.projection(tableSchema.trimmedPrimaryKeys()),
- tableSchema.projection(tableSchema.primaryKeys()));
+ tableSchema.projection(tableSchema.primaryKeys()),
+ new BucketComputer(tableSchema));
}
private SinkRecordConverter(
- int numBucket,
RowType inputType,
int[] partitions,
- int[] bucketKeys,
int[] primaryKeys,
- int[] logPrimaryKeys) {
- this.numBucket = numBucket;
- this.allProjection =
- CodeGenUtils.newProjection(
- inputType, IntStream.range(0, inputType.getFieldCount()).toArray());
+ int[] logPrimaryKeys,
+ BucketComputer bucketComputer) {
+ this.bucketComputer = bucketComputer;
this.partProjection = CodeGenUtils.newProjection(inputType, partitions);
- this.bucketProjection = CodeGenUtils.newProjection(inputType, bucketKeys);
this.pkProjection = CodeGenUtils.newProjection(inputType, primaryKeys);
this.logPkProjection =
Arrays.equals(primaryKeys, logPrimaryKeys)
@@ -79,7 +67,7 @@ public class SinkRecordConverter {
public SinkRecord convert(RowData row) {
BinaryRowData partition = partProjection.apply(row);
BinaryRowData primaryKey = primaryKey(row);
- int bucket = bucket(row, bucketKey(row, primaryKey));
+ int bucket = bucketComputer.bucket(row, primaryKey);
return new SinkRecord(partition, bucket, primaryKey, row);
}
@@ -92,51 +80,15 @@ public class SinkRecordConverter {
}
public int bucket(RowData row) {
- return bucket(row, bucketKey(row));
+ return bucketComputer.bucket(row);
}
private BinaryRowData primaryKey(RowData row) {
return pkProjection.apply(row);
}
- private BinaryRowData bucketKey(RowData row) {
- BinaryRowData bucketKey = bucketProjection.apply(row);
- return bucketKey.getArity() == 0 ? pkProjection.apply(row) : bucketKey;
- }
-
- private BinaryRowData bucketKey(RowData row, BinaryRowData primaryKey) {
- BinaryRowData bucketKey = bucketProjection.apply(row);
- return bucketKey.getArity() == 0 ? primaryKey : bucketKey;
- }
-
private BinaryRowData logPrimaryKey(RowData row) {
assert logPkProjection != null;
return logPkProjection.apply(row);
}
-
- private int bucket(RowData row, BinaryRowData bucketKey) {
- int hash = bucketKey.getArity() == 0 ? hashRow(row) : hashcode(bucketKey);
- return bucket(hash, numBucket);
- }
-
- private int hashRow(RowData row) {
- if (row instanceof BinaryRowData) {
- RowKind rowKind = row.getRowKind();
- row.setRowKind(RowKind.INSERT);
- int hash = hashcode((BinaryRowData) row);
- row.setRowKind(rowKind);
- return hash;
- } else {
- return hashcode(allProjection.apply(row));
- }
- }
-
- public static int hashcode(BinaryRowData rowData) {
- assert rowData.getRowKind() == RowKind.INSERT;
- return rowData.hashCode();
- }
-
- public static int bucket(int hashcode, int numBucket) {
- return Math.abs(hashcode % numBucket);
- }
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWrite.java
index 84683db7..6e449d14 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWrite.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWrite.java
@@ -28,16 +28,17 @@ import java.util.List;
* An abstraction layer above {@link org.apache.flink.table.store.file.operation.FileStoreWrite} to
* provide {@link RowData} writing.
*/
-public interface TableWrite {
+public interface TableWrite extends AutoCloseable {
TableWrite withOverwrite(boolean overwrite);
TableWrite withIOManager(IOManager ioManager);
- SinkRecordConverter recordConverter();
-
SinkRecord write(RowData rowData) throws Exception;
+ /** The log record needs to preserve the original pk (which includes partition fields). */
+ SinkRecord toLogRecord(SinkRecord record);
+
void compact(BinaryRowData partition, int bucket) throws Exception;
List<FileCommittable> prepareCommit(boolean blocking, long commitIdentifier) throws Exception;
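
TableWrite picking up AutoCloseable means a writer can now be scoped with try-with-resources, and toLogRecord replaces reaching into the converter from the outside. A sketch (hypothetical driver code; `table` and `row` are assumed to exist):

    try (TableWrite write = table.newWrite("commit-user")) {
        write.write(row);
        List<FileCommittable> committables = write.prepareCommit(true, 1L);
    } // close() comes from AutoCloseable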
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWriteImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWriteImpl.java
index bfbebc17..7cf96aea 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWriteImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWriteImpl.java
@@ -57,11 +57,6 @@ public class TableWriteImpl<T> implements TableWrite {
return this;
}
- @Override
- public SinkRecordConverter recordConverter() {
- return recordConverter;
- }
-
@Override
public SinkRecord write(RowData rowData) throws Exception {
SinkRecord record = recordConverter.convert(rowData);
@@ -69,6 +64,11 @@ public class TableWriteImpl<T> implements TableWrite {
return record;
}
+ @Override
+ public SinkRecord toLogRecord(SinkRecord record) {
+ return recordConverter.convertToLogSinkRecord(record);
+ }
+
@Override
public void compact(BinaryRowData partition, int bucket) throws Exception {
write.compact(partition, bucket);
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableRead.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableRead.java
index 440e3d2d..92b11f13 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableRead.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableRead.java
@@ -19,16 +19,27 @@
package org.apache.flink.table.store.table.source;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.mergetree.compact.ConcatRecordReader;
import org.apache.flink.table.store.file.operation.FileStoreRead;
import org.apache.flink.table.store.file.predicate.Predicate;
+import org.apache.flink.table.store.file.predicate.PredicateBuilder;
import org.apache.flink.table.store.file.utils.RecordReader;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.List;
/** An abstraction layer above {@link FileStoreRead} to provide reading of {@link RowData}. */
public interface TableRead {
+ default TableRead withFilter(List<Predicate> predicates) {
+ if (predicates == null || predicates.isEmpty()) {
+ return this;
+ }
+ return withFilter(PredicateBuilder.and(predicates));
+ }
+
TableRead withFilter(Predicate predicate);
default TableRead withProjection(int[] projection) {
@@ -40,4 +51,12 @@ public interface TableRead {
TableRead withProjection(int[][] projection);
RecordReader<RowData> createReader(Split split) throws IOException;
+
+ default RecordReader<RowData> createReader(List<Split> splits) throws IOException {
+ List<ConcatRecordReader.ReaderSupplier<RowData>> readers = new ArrayList<>();
+ for (Split split : splits) {
+ readers.add(() -> createReader(split));
+ }
+ return ConcatRecordReader.create(readers);
+ }
}
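
The two new defaults shorten call sites: a predicate list is AND-ed (a null or empty list is a no-op), and a list of splits is read through a single concatenated reader. Sketch (hypothetical caller; `table`, `predicateA`, `predicateB`, and the splits are assumed):

    TableRead read = table.newRead().withFilter(Arrays.asList(predicateA, predicateB));
    List<Split> splits = scanPlanSplits(); // hypothetical helper returning the scan's splits
    RecordReader<RowData> reader = read.createReader(splits);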
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
index 563f290d..7e4ea531 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
@@ -47,8 +47,8 @@ import java.util.Random;
import java.util.function.Consumer;
import java.util.stream.Collectors;
-import static org.apache.flink.table.store.table.sink.SinkRecordConverter.bucket;
-import static org.apache.flink.table.store.table.sink.SinkRecordConverter.hashcode;
+import static org.apache.flink.table.store.table.sink.BucketComputer.bucket;
+import static org.apache.flink.table.store.table.sink.BucketComputer.hashcode;
import static org.assertj.core.api.Assertions.assertThat;
/** Tests for {@link AppendOnlyFileStoreTable}. */
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/sink/SinkRecordConverterTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/sink/SinkRecordConverterTest.java
index 827ef906..fceee85b 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/sink/SinkRecordConverterTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/sink/SinkRecordConverterTest.java
@@ -33,6 +33,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import static org.apache.flink.table.store.CoreOptions.BUCKET;
import static org.apache.flink.table.store.CoreOptions.BUCKET_KEY;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
@@ -85,6 +86,7 @@ public class SinkRecordConverterTest {
List<DataField> fields = TableSchema.newFields(rowType);
Map<String, String> options = new HashMap<>();
options.put(BUCKET_KEY.key(), bk);
+ options.put(BUCKET.key(), "100");
TableSchema schema =
new TableSchema(
0,
@@ -96,6 +98,6 @@ public class SinkRecordConverterTest {
"".equals(pk) ? Collections.emptyList() : Arrays.asList(pk.split(",")),
options,
"");
- return new SinkRecordConverter(100, schema);
+ return new SinkRecordConverter(schema);
}
}