Posted to commits@flink.apache.org by cz...@apache.org on 2022/11/28 02:21:44 UTC

[flink-table-store] branch master updated: [FLINK-30164] Expose BucketComputer from SupportsWrite

This is an automated email from the ASF dual-hosted git repository.

czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new d8eb796f [FLINK-30164] Expose BucketComputer from SupportsWrite
d8eb796f is described below

commit d8eb796f035f35e1ac85ff3f657452dd2a41e644
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Mon Nov 28 10:21:38 2022 +0800

    [FLINK-30164] Expose BucketComputer from SupportsWrite
    
    This closes #400.
---
 .../store/file/predicate/PredicateFilter.java      |   6 ++
 .../connector/sink/BucketStreamPartitioner.java    |  19 ++--
 .../store/connector/sink/FlinkSinkBuilder.java     |   6 +-
 .../store/connector/sink/StoreWriteOperator.java   |   2 +-
 .../store/connector/AppendOnlyTableITCase.java     |  11 ++-
 .../table/store/connector/PredicateITCase.java     |  38 ++++++-
 .../table/store/file/predicate/BucketSelector.java |   6 +-
 .../store/table/AppendOnlyFileStoreTable.java      |   4 +-
 .../table/ChangelogValueCountFileStoreTable.java   |   4 +-
 .../table/ChangelogWithKeyFileStoreTable.java      |   4 +-
 .../flink/table/store/table/FileStoreTable.java    |   6 ++
 .../flink/table/store/table/SupportsWrite.java     |   5 +-
 .../table/store/table/sink/BucketComputer.java     | 110 +++++++++++++++++++++
 .../store/table/sink/SinkRecordConverter.java      |  66 ++-----------
 .../flink/table/store/table/sink/TableWrite.java   |   7 +-
 .../table/store/table/sink/TableWriteImpl.java     |  10 +-
 .../flink/table/store/table/source/TableRead.java  |  19 ++++
 .../store/table/AppendOnlyFileStoreTableTest.java  |   4 +-
 .../store/table/sink/SinkRecordConverterTest.java  |   4 +-
 19 files changed, 228 insertions(+), 103 deletions(-)
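
The headline change: SupportsWrite (and therefore FileStoreTable) now exposes a BucketComputer directly, so sink-side code no longer reads the 'bucket' option itself and passes a bucket count around. Below is a minimal sketch of channel routing with the new API, mirroring what BucketStreamPartitioner#selectChannel does in the diff further down; the helper class and method names are illustrative, not part of the commit.

import org.apache.flink.table.data.RowData;
import org.apache.flink.table.store.table.SupportsWrite;
import org.apache.flink.table.store.table.sink.BucketComputer;

/** Illustrative helper showing the new entry point; not part of the commit. */
public class BucketRoutingSketch {

    /** Routes a row to a sink channel, mirroring BucketStreamPartitioner#selectChannel. */
    public static int channelFor(SupportsWrite table, RowData row, int numberOfChannels) {
        // The table now hands out the bucket computer; no CoreOptions.BUCKET lookup needed here.
        BucketComputer computer = table.bucketComputer();
        return computer.bucket(row) % numberOfChannels;
    }
}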

diff --git a/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/PredicateFilter.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/PredicateFilter.java
index 2c4f0528..b84b52f3 100644
--- a/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/PredicateFilter.java
+++ b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/PredicateFilter.java
@@ -24,12 +24,18 @@ import org.apache.flink.table.types.logical.RowType;
 
 import javax.annotation.Nullable;
 
+import java.util.List;
+
 /** A {@link java.util.function.Predicate} to filter {@link RowData}. */
 public class PredicateFilter implements java.util.function.Predicate<RowData> {
 
     private final RowDataToObjectArrayConverter arrayConverter;
     @Nullable private final Predicate predicate;
 
+    public PredicateFilter(RowType rowType, List<Predicate> predicates) {
+        this(rowType, predicates.isEmpty() ? null : PredicateBuilder.and(predicates));
+    }
+
     public PredicateFilter(RowType rowType, @Nullable Predicate predicate) {
         this.arrayConverter = new RowDataToObjectArrayConverter(rowType);
         this.predicate = predicate;
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/BucketStreamPartitioner.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/BucketStreamPartitioner.java
index cd640c43..bacca02e 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/BucketStreamPartitioner.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/BucketStreamPartitioner.java
@@ -24,31 +24,30 @@ import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.store.file.schema.TableSchema;
-import org.apache.flink.table.store.table.sink.SinkRecordConverter;
+import org.apache.flink.table.store.table.sink.BucketComputer;
 
 /** A {@link StreamPartitioner} to partition records by bucket. */
 public class BucketStreamPartitioner extends StreamPartitioner<RowData> {
 
-    private final int numBucket;
-    private final TableSchema tableSchema;
+    private final TableSchema schema;
 
-    private transient SinkRecordConverter recordConverter;
+    private transient BucketComputer computer;
+    private transient int numberOfChannels;
 
-    public BucketStreamPartitioner(int numBucket, TableSchema tableSchema) {
-        this.numBucket = numBucket;
-        this.tableSchema = tableSchema;
+    public BucketStreamPartitioner(TableSchema schema) {
+        this.schema = schema;
     }
 
     @Override
     public void setup(int numberOfChannels) {
         super.setup(numberOfChannels);
-        this.recordConverter = new SinkRecordConverter(numBucket, tableSchema);
+        this.computer = new BucketComputer(schema);
+        this.numberOfChannels = numberOfChannels;
     }
 
     @Override
     public int selectChannel(SerializationDelegate<StreamRecord<RowData>> record) {
-        RowData row = record.getInstance().getValue();
-        return recordConverter.bucket(row) % numberOfChannels;
+        return computer.bucket(record.getInstance().getValue()) % numberOfChannels;
     }
 
     @Override
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSinkBuilder.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSinkBuilder.java
index 3b0ffbe0..2e77a669 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSinkBuilder.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSinkBuilder.java
@@ -25,7 +25,6 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.transformations.PartitionTransformation;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.connector.FlinkConnectorOptions;
 import org.apache.flink.table.store.file.catalog.CatalogLock;
 import org.apache.flink.table.store.file.utils.JsonSerdeUtil;
@@ -91,10 +90,7 @@ public class FlinkSinkBuilder {
     }
 
     public DataStreamSink<?> build() {
-        int numBucket = conf.get(CoreOptions.BUCKET);
-
-        BucketStreamPartitioner partitioner =
-                new BucketStreamPartitioner(numBucket, table.schema());
+        BucketStreamPartitioner partitioner = new BucketStreamPartitioner(table.schema());
         PartitionTransformation<RowData> partitioned =
                 new PartitionTransformation<>(input.getTransformation(), partitioner);
         if (parallelism != null) {
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreWriteOperator.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreWriteOperator.java
index 00cb7d62..f3bfcb19 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreWriteOperator.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreWriteOperator.java
@@ -162,7 +162,7 @@ public class StoreWriteOperator extends PrepareCommitOperator {
 
         if (logSinkFunction != null) {
             // write to log store, need to preserve original pk (which includes partition fields)
-            SinkRecord logRecord = write.recordConverter().convertToLogSinkRecord(record);
+            SinkRecord logRecord = write.toLogRecord(record);
             logSinkFunction.invoke(logRecord, sinkContext);
         }
 
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AppendOnlyTableITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AppendOnlyTableITCase.java
index 6391a00c..cfee8bd2 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AppendOnlyTableITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AppendOnlyTableITCase.java
@@ -196,10 +196,17 @@ public class AppendOnlyTableITCase extends FileStoreTableITCase {
         testRejectChanges(RowKind.UPDATE_BEFORE);
     }
 
+    @Test
+    public void testComplexType() {
+        batchSql("INSERT INTO complex_table VALUES (1, CAST(NULL AS MAP<INT, INT>))");
+        assertThat(batchSql("SELECT * FROM complex_table")).containsExactly(Row.of(1, null));
+    }
+
     @Override
     protected List<String> ddl() {
-        return Collections.singletonList(
-                "CREATE TABLE IF NOT EXISTS append_table (id INT, data STRING) WITH ('write-mode'='append-only')");
+        return Arrays.asList(
+                "CREATE TABLE IF NOT EXISTS append_table (id INT, data STRING) WITH ('write-mode'='append-only')",
+                "CREATE TABLE IF NOT EXISTS complex_table (id INT, data MAP<INT, INT>) WITH ('write-mode'='append-only')");
     }
 
     private void testRejectChanges(RowKind kind) {
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/PredicateITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/PredicateITCase.java
index 6eeccf7e..3a73c0bf 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/PredicateITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/PredicateITCase.java
@@ -30,28 +30,58 @@ public class PredicateITCase extends CatalogITCaseBase {
     @Test
     public void testPkFilterBucket() throws Exception {
         sql("CREATE TABLE T (a INT PRIMARY KEY NOT ENFORCED, b INT) WITH ('bucket' = '5')");
-        innerTest();
+        writeRecords();
+        innerTestSingleField();
+        innerTestAllFields();
     }
 
     @Test
     public void testNoPkFilterBucket() throws Exception {
         sql("CREATE TABLE T (a INT, b INT) WITH ('bucket' = '5', 'bucket-key'='a')");
-        innerTest();
+        writeRecords();
+        innerTestSingleField();
+        innerTestAllFields();
     }
 
     @Test
     public void testAppendFilterBucket() throws Exception {
         sql(
                 "CREATE TABLE T (a INT, b INT) WITH ('bucket' = '5', 'bucket-key'='a', 'write-mode'='append-only')");
-        innerTest();
+        writeRecords();
+        innerTestSingleField();
+        innerTestAllFields();
     }
 
-    private void innerTest() throws Exception {
+    @Test
+    public void testAppendNoBucketKey() throws Exception {
+        sql("CREATE TABLE T (a INT, b INT) WITH ('write-mode'='append-only', 'bucket' = '5')");
+        writeRecords();
+        innerTestSingleField();
+        innerTestAllFields();
+    }
+
+    private void writeRecords() throws Exception {
         sql("INSERT INTO T VALUES (1, 2), (3, 4), (5, 6), (7, 8), (9, 10)");
+    }
+
+    private void innerTestSingleField() throws Exception {
         assertThat(sql("SELECT * FROM T WHERE a = 1")).containsExactlyInAnyOrder(Row.of(1, 2));
         assertThat(sql("SELECT * FROM T WHERE a = 3")).containsExactlyInAnyOrder(Row.of(3, 4));
         assertThat(sql("SELECT * FROM T WHERE a = 5")).containsExactlyInAnyOrder(Row.of(5, 6));
         assertThat(sql("SELECT * FROM T WHERE a = 7")).containsExactlyInAnyOrder(Row.of(7, 8));
         assertThat(sql("SELECT * FROM T WHERE a = 9")).containsExactlyInAnyOrder(Row.of(9, 10));
     }
+
+    private void innerTestAllFields() throws Exception {
+        assertThat(sql("SELECT * FROM T WHERE a = 1 and b = 2"))
+                .containsExactlyInAnyOrder(Row.of(1, 2));
+        assertThat(sql("SELECT * FROM T WHERE a = 3 and b = 4"))
+                .containsExactlyInAnyOrder(Row.of(3, 4));
+        assertThat(sql("SELECT * FROM T WHERE a = 5 and b = 6"))
+                .containsExactlyInAnyOrder(Row.of(5, 6));
+        assertThat(sql("SELECT * FROM T WHERE a = 7 and b = 8"))
+                .containsExactlyInAnyOrder(Row.of(7, 8));
+        assertThat(sql("SELECT * FROM T WHERE a = 9 and b = 10"))
+                .containsExactlyInAnyOrder(Row.of(9, 10));
+    }
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/BucketSelector.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/BucketSelector.java
index c451145b..f242092d 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/BucketSelector.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/BucketSelector.java
@@ -22,7 +22,7 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
-import org.apache.flink.table.store.table.sink.SinkRecordConverter;
+import org.apache.flink.table.store.table.sink.BucketComputer;
 import org.apache.flink.table.types.logical.RowType;
 
 import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableSet;
@@ -72,7 +72,7 @@ public class BucketSelector implements Serializable {
     Set<Integer> createBucketSet(int numBucket) {
         ImmutableSet.Builder<Integer> builder = new ImmutableSet.Builder<>();
         for (int hash : hashCodes) {
-            builder.add(SinkRecordConverter.bucket(hash, numBucket));
+            builder.add(BucketComputer.bucket(hash, numBucket));
         }
         return builder.build();
     }
@@ -140,7 +140,7 @@ public class BucketSelector implements Serializable {
 
     private static int hash(List<Object> columns, RowDataSerializer serializer) {
         BinaryRowData binaryRow = serializer.toBinaryRow(GenericRowData.of(columns.toArray()));
-        return SinkRecordConverter.hashcode(binaryRow);
+        return BucketComputer.hashcode(binaryRow);
     }
 
     private static void assembleRows(
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java
index c7e15ef4..0a9e99f6 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java
@@ -105,11 +105,9 @@ public class AppendOnlyFileStoreTable extends AbstractFileStoreTable {
 
     @Override
     public TableWrite newWrite(String commitUser) {
-        SinkRecordConverter recordConverter =
-                new SinkRecordConverter(store.options().bucket(), tableSchema);
         return new TableWriteImpl<>(
                 store.newWrite(commitUser),
-                recordConverter,
+                new SinkRecordConverter(tableSchema),
                 record -> {
                     Preconditions.checkState(
                             record.row().getRowKind() == RowKind.INSERT,
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
index 1c95b8eb..4ae46fc5 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
@@ -123,12 +123,10 @@ public class ChangelogValueCountFileStoreTable extends AbstractFileStoreTable {
 
     @Override
     public TableWrite newWrite(String commitUser) {
-        SinkRecordConverter recordConverter =
-                new SinkRecordConverter(store.options().bucket(), tableSchema);
         final KeyValue kv = new KeyValue();
         return new TableWriteImpl<>(
                 store.newWrite(commitUser),
-                recordConverter,
+                new SinkRecordConverter(tableSchema),
                 record -> {
                     switch (record.row().getRowKind()) {
                         case INSERT:
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
index 095163e1..2cbcd629 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
@@ -197,8 +197,6 @@ public class ChangelogWithKeyFileStoreTable extends AbstractFileStoreTable {
 
     @Override
     public TableWrite newWrite(String commitUser) {
-        SinkRecordConverter recordConverter =
-                new SinkRecordConverter(store.options().bucket(), tableSchema);
         final SequenceGenerator sequenceGenerator =
                 store.options()
                         .sequenceField()
@@ -207,7 +205,7 @@ public class ChangelogWithKeyFileStoreTable extends AbstractFileStoreTable {
         final KeyValue kv = new KeyValue();
         return new TableWriteImpl<>(
                 store.newWrite(commitUser),
-                recordConverter,
+                new SinkRecordConverter(tableSchema),
                 record -> {
                     long sequenceNumber =
                             sequenceGenerator == null
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTable.java
index 2fd56ae6..47ea554c 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTable.java
@@ -22,6 +22,7 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.file.schema.TableSchema;
 import org.apache.flink.table.store.file.utils.SnapshotManager;
+import org.apache.flink.table.store.table.sink.BucketComputer;
 import org.apache.flink.table.store.table.source.DataTableScan;
 import org.apache.flink.table.types.logical.RowType;
 
@@ -58,4 +59,9 @@ public interface FileStoreTable extends Table, SupportsPartition, SupportsWrite
 
     @Override
     DataTableScan newScan();
+
+    @Override
+    default BucketComputer bucketComputer() {
+        return new BucketComputer(schema());
+    }
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/SupportsWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/SupportsWrite.java
index 696aa3eb..362c9fc7 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/SupportsWrite.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/SupportsWrite.java
@@ -18,11 +18,14 @@
 
 package org.apache.flink.table.store.table;
 
+import org.apache.flink.table.store.table.sink.BucketComputer;
 import org.apache.flink.table.store.table.sink.TableCommit;
 import org.apache.flink.table.store.table.sink.TableWrite;
 
 /** An interface for {@link Table} write support. */
-public interface SupportsWrite {
+public interface SupportsWrite extends Table {
+
+    BucketComputer bucketComputer();
 
     TableWrite newWrite(String commitUser);
 
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/BucketComputer.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/BucketComputer.java
new file mode 100644
index 00000000..533e9e3a
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/BucketComputer.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.sink;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.runtime.generated.Projection;
+import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.codegen.CodeGenUtils;
+import org.apache.flink.table.store.file.schema.TableSchema;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
+
+import java.util.stream.IntStream;
+
+/** A {@link BucketComputer} to compute the bucket from bucket keys, primary keys or the whole row. */
+public class BucketComputer {
+
+    private final int numBucket;
+
+    private final Projection<RowData, BinaryRowData> rowProjection;
+    private final Projection<RowData, BinaryRowData> bucketProjection;
+    private final Projection<RowData, BinaryRowData> pkProjection;
+
+    public BucketComputer(TableSchema tableSchema) {
+        this(
+                new CoreOptions(tableSchema.options()).bucket(),
+                tableSchema.logicalRowType(),
+                tableSchema.projection(tableSchema.originalBucketKeys()),
+                tableSchema.projection(tableSchema.trimmedPrimaryKeys()));
+    }
+
+    private BucketComputer(int numBucket, RowType rowType, int[] bucketKeys, int[] primaryKeys) {
+        this.numBucket = numBucket;
+        this.rowProjection =
+                CodeGenUtils.newProjection(
+                        rowType, IntStream.range(0, rowType.getFieldCount()).toArray());
+        this.bucketProjection = CodeGenUtils.newProjection(rowType, bucketKeys);
+        this.pkProjection = CodeGenUtils.newProjection(rowType, primaryKeys);
+    }
+
+    private int hashRow(RowData row) {
+        if (row instanceof BinaryRowData) {
+            RowKind rowKind = row.getRowKind();
+            row.setRowKind(RowKind.INSERT);
+            int hash = hashcode((BinaryRowData) row);
+            row.setRowKind(rowKind);
+            return hash;
+        } else {
+            return hashcode(rowProjection.apply(row));
+        }
+    }
+
+    public int bucket(RowData row) {
+        int hashcode = hashBucketKey(row);
+        return bucket(hashcode, numBucket);
+    }
+
+    public int bucket(RowData row, BinaryRowData pk) {
+        int hashcode = hashBucketKey(row, pk);
+        return bucket(hashcode, numBucket);
+    }
+
+    private int hashBucketKey(RowData row) {
+        BinaryRowData bucketKey = bucketProjection.apply(row);
+        if (bucketKey.getArity() == 0) {
+            bucketKey = pkProjection.apply(row);
+        }
+        if (bucketKey.getArity() == 0) {
+            return hashRow(row);
+        }
+        return bucketKey.hashCode();
+    }
+
+    private int hashBucketKey(RowData row, BinaryRowData pk) {
+        BinaryRowData bucketKey = bucketProjection.apply(row);
+        if (bucketKey.getArity() == 0) {
+            bucketKey = pk;
+        }
+        if (bucketKey.getArity() == 0) {
+            return hashRow(row);
+        }
+        return bucketKey.hashCode();
+    }
+
+    public static int hashcode(BinaryRowData rowData) {
+        assert rowData.getRowKind() == RowKind.INSERT;
+        return rowData.hashCode();
+    }
+
+    public static int bucket(int hashcode, int numBucket) {
+        return Math.abs(hashcode % numBucket);
+    }
+}
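
BucketComputer centralizes the hashing rules previously embedded in SinkRecordConverter: the configured bucket keys win, then the trimmed primary keys, and only then the whole row. Its static helpers are also reused by BucketSelector for bucket pruning. A small sketch of how they combine, assuming the same RowDataSerializer path that BucketSelector#hash uses; the values and class name are illustrative.

import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.store.table.sink.BucketComputer;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.RowType;

/** Illustrative use of the static helpers; not part of the commit. */
public class StaticBucketSketch {

    public static void main(String[] args) {
        // Serialize a one-column key row to BinaryRowData, as BucketSelector#hash does.
        RowType keyType = RowType.of(new IntType());
        RowDataSerializer serializer = new RowDataSerializer(keyType);
        BinaryRowData key = serializer.toBinaryRow(GenericRowData.of(42));

        int hash = BucketComputer.hashcode(key);     // hash of the INSERT-kind binary row
        int bucket = BucketComputer.bucket(hash, 5); // Math.abs(hash % 5), i.e. 'bucket' = '5'
        System.out.println("key 42 -> bucket " + bucket);
    }
}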
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/SinkRecordConverter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/SinkRecordConverter.java
index 6d61d6d4..3b68d77f 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/SinkRecordConverter.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/SinkRecordConverter.java
@@ -24,51 +24,39 @@ import org.apache.flink.table.runtime.generated.Projection;
 import org.apache.flink.table.store.codegen.CodeGenUtils;
 import org.apache.flink.table.store.file.schema.TableSchema;
 import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.types.RowKind;
 
 import javax.annotation.Nullable;
 
 import java.util.Arrays;
-import java.util.stream.IntStream;
 
 /** Converter for converting {@link RowData} to {@link SinkRecord}. */
 public class SinkRecordConverter {
 
-    private final int numBucket;
-
-    private final Projection<RowData, BinaryRowData> allProjection;
+    private final BucketComputer bucketComputer;
 
     private final Projection<RowData, BinaryRowData> partProjection;
 
-    private final Projection<RowData, BinaryRowData> bucketProjection;
-
     private final Projection<RowData, BinaryRowData> pkProjection;
 
     @Nullable private final Projection<RowData, BinaryRowData> logPkProjection;
 
-    public SinkRecordConverter(int numBucket, TableSchema tableSchema) {
+    public SinkRecordConverter(TableSchema tableSchema) {
         this(
-                numBucket,
                 tableSchema.logicalRowType(),
                 tableSchema.projection(tableSchema.partitionKeys()),
-                tableSchema.projection(tableSchema.originalBucketKeys()),
                 tableSchema.projection(tableSchema.trimmedPrimaryKeys()),
-                tableSchema.projection(tableSchema.primaryKeys()));
+                tableSchema.projection(tableSchema.primaryKeys()),
+                new BucketComputer(tableSchema));
     }
 
     private SinkRecordConverter(
-            int numBucket,
             RowType inputType,
             int[] partitions,
-            int[] bucketKeys,
             int[] primaryKeys,
-            int[] logPrimaryKeys) {
-        this.numBucket = numBucket;
-        this.allProjection =
-                CodeGenUtils.newProjection(
-                        inputType, IntStream.range(0, inputType.getFieldCount()).toArray());
+            int[] logPrimaryKeys,
+            BucketComputer bucketComputer) {
+        this.bucketComputer = bucketComputer;
         this.partProjection = CodeGenUtils.newProjection(inputType, partitions);
-        this.bucketProjection = CodeGenUtils.newProjection(inputType, bucketKeys);
         this.pkProjection = CodeGenUtils.newProjection(inputType, primaryKeys);
         this.logPkProjection =
                 Arrays.equals(primaryKeys, logPrimaryKeys)
@@ -79,7 +67,7 @@ public class SinkRecordConverter {
     public SinkRecord convert(RowData row) {
         BinaryRowData partition = partProjection.apply(row);
         BinaryRowData primaryKey = primaryKey(row);
-        int bucket = bucket(row, bucketKey(row, primaryKey));
+        int bucket = bucketComputer.bucket(row, primaryKey);
         return new SinkRecord(partition, bucket, primaryKey, row);
     }
 
@@ -92,51 +80,15 @@ public class SinkRecordConverter {
     }
 
     public int bucket(RowData row) {
-        return bucket(row, bucketKey(row));
+        return bucketComputer.bucket(row);
     }
 
     private BinaryRowData primaryKey(RowData row) {
         return pkProjection.apply(row);
     }
 
-    private BinaryRowData bucketKey(RowData row) {
-        BinaryRowData bucketKey = bucketProjection.apply(row);
-        return bucketKey.getArity() == 0 ? pkProjection.apply(row) : bucketKey;
-    }
-
-    private BinaryRowData bucketKey(RowData row, BinaryRowData primaryKey) {
-        BinaryRowData bucketKey = bucketProjection.apply(row);
-        return bucketKey.getArity() == 0 ? primaryKey : bucketKey;
-    }
-
     private BinaryRowData logPrimaryKey(RowData row) {
         assert logPkProjection != null;
         return logPkProjection.apply(row);
     }
-
-    private int bucket(RowData row, BinaryRowData bucketKey) {
-        int hash = bucketKey.getArity() == 0 ? hashRow(row) : hashcode(bucketKey);
-        return bucket(hash, numBucket);
-    }
-
-    private int hashRow(RowData row) {
-        if (row instanceof BinaryRowData) {
-            RowKind rowKind = row.getRowKind();
-            row.setRowKind(RowKind.INSERT);
-            int hash = hashcode((BinaryRowData) row);
-            row.setRowKind(rowKind);
-            return hash;
-        } else {
-            return hashcode(allProjection.apply(row));
-        }
-    }
-
-    public static int hashcode(BinaryRowData rowData) {
-        assert rowData.getRowKind() == RowKind.INSERT;
-        return rowData.hashCode();
-    }
-
-    public static int bucket(int hashcode, int numBucket) {
-        return Math.abs(hashcode % numBucket);
-    }
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWrite.java
index 84683db7..6e449d14 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWrite.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWrite.java
@@ -28,16 +28,17 @@ import java.util.List;
  * An abstraction layer above {@link org.apache.flink.table.store.file.operation.FileStoreWrite} to
  * provide {@link RowData} writing.
  */
-public interface TableWrite {
+public interface TableWrite extends AutoCloseable {
 
     TableWrite withOverwrite(boolean overwrite);
 
     TableWrite withIOManager(IOManager ioManager);
 
-    SinkRecordConverter recordConverter();
-
     SinkRecord write(RowData rowData) throws Exception;
 
+    /** Log records need to preserve the original pk (which includes partition fields). */
+    SinkRecord toLogRecord(SinkRecord record);
+
     void compact(BinaryRowData partition, int bucket) throws Exception;
 
     List<FileCommittable> prepareCommit(boolean blocking, long commitIdentifier) throws Exception;
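
Two interface changes stand out here: TableWrite now extends AutoCloseable, and the log-store conversion that StoreWriteOperator used to reach through recordConverter() is now toLogRecord. A hedged sketch of a write path using both follows; prepareCommit/commit handling is omitted and the commit user is purely illustrative.

import org.apache.flink.table.data.RowData;
import org.apache.flink.table.store.table.SupportsWrite;
import org.apache.flink.table.store.table.sink.SinkRecord;
import org.apache.flink.table.store.table.sink.TableWrite;

/** Illustrative write path; prepareCommit/commit handling is omitted. */
public class WritePathSketch {

    /** Writes one row and returns its log-store form with the original (untrimmed) pk. */
    public static SinkRecord writeOne(SupportsWrite table, RowData row) throws Exception {
        // AutoCloseable lets the writer participate in try-with-resources.
        try (TableWrite write = table.newWrite("illustrative-commit-user")) {
            SinkRecord record = write.write(row);
            return write.toLogRecord(record);
        }
    }
}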
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWriteImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWriteImpl.java
index bfbebc17..7cf96aea 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWriteImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWriteImpl.java
@@ -57,11 +57,6 @@ public class TableWriteImpl<T> implements TableWrite {
         return this;
     }
 
-    @Override
-    public SinkRecordConverter recordConverter() {
-        return recordConverter;
-    }
-
     @Override
     public SinkRecord write(RowData rowData) throws Exception {
         SinkRecord record = recordConverter.convert(rowData);
@@ -69,6 +64,11 @@ public class TableWriteImpl<T> implements TableWrite {
         return record;
     }
 
+    @Override
+    public SinkRecord toLogRecord(SinkRecord record) {
+        return recordConverter.convertToLogSinkRecord(record);
+    }
+
     @Override
     public void compact(BinaryRowData partition, int bucket) throws Exception {
         write.compact(partition, bucket);
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableRead.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableRead.java
index 440e3d2d..92b11f13 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableRead.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableRead.java
@@ -19,16 +19,27 @@
 package org.apache.flink.table.store.table.source;
 
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.mergetree.compact.ConcatRecordReader;
 import org.apache.flink.table.store.file.operation.FileStoreRead;
 import org.apache.flink.table.store.file.predicate.Predicate;
+import org.apache.flink.table.store.file.predicate.PredicateBuilder;
 import org.apache.flink.table.store.file.utils.RecordReader;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.List;
 
 /** An abstraction layer above {@link FileStoreRead} to provide reading of {@link RowData}. */
 public interface TableRead {
 
+    default TableRead withFilter(List<Predicate> predicates) {
+        if (predicates == null || predicates.isEmpty()) {
+            return this;
+        }
+        return withFilter(PredicateBuilder.and(predicates));
+    }
+
     TableRead withFilter(Predicate predicate);
 
     default TableRead withProjection(int[] projection) {
@@ -40,4 +51,12 @@ public interface TableRead {
     TableRead withProjection(int[][] projection);
 
     RecordReader<RowData> createReader(Split split) throws IOException;
+
+    default RecordReader<RowData> createReader(List<Split> splits) throws IOException {
+        List<ConcatRecordReader.ReaderSupplier<RowData>> readers = new ArrayList<>();
+        for (Split split : splits) {
+            readers.add(() -> createReader(split));
+        }
+        return ConcatRecordReader.create(readers);
+    }
 }
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
index 563f290d..7e4ea531 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
@@ -47,8 +47,8 @@ import java.util.Random;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
-import static org.apache.flink.table.store.table.sink.SinkRecordConverter.bucket;
-import static org.apache.flink.table.store.table.sink.SinkRecordConverter.hashcode;
+import static org.apache.flink.table.store.table.sink.BucketComputer.bucket;
+import static org.apache.flink.table.store.table.sink.BucketComputer.hashcode;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for {@link AppendOnlyFileStoreTable}. */
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/sink/SinkRecordConverterTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/sink/SinkRecordConverterTest.java
index 827ef906..fceee85b 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/sink/SinkRecordConverterTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/sink/SinkRecordConverterTest.java
@@ -33,6 +33,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.flink.table.store.CoreOptions.BUCKET;
 import static org.apache.flink.table.store.CoreOptions.BUCKET_KEY;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
@@ -85,6 +86,7 @@ public class SinkRecordConverterTest {
         List<DataField> fields = TableSchema.newFields(rowType);
         Map<String, String> options = new HashMap<>();
         options.put(BUCKET_KEY.key(), bk);
+        options.put(BUCKET.key(), "100");
         TableSchema schema =
                 new TableSchema(
                         0,
@@ -96,6 +98,6 @@ public class SinkRecordConverterTest {
                         "".equals(pk) ? Collections.emptyList() : Arrays.asList(pk.split(",")),
                         options,
                         "");
-        return new SinkRecordConverter(100, schema);
+        return new SinkRecordConverter(schema);
     }
 }