You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@paimon.apache.org by lz...@apache.org on 2024/01/05 15:31:38 UTC

(incubator-paimon) 01/04: [flink] Fix inconsistency problem when lookup join table with sequence field

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git

commit ddd717ebd3e846609653337d17ce41de8bd8b46f
Author: Aitozi <10...@qq.com>
AuthorDate: Wed Jan 3 09:58:46 2024 +0800

    [flink] Fix inconsistency problem when lookup join table with sequence field
    
    This closes #2622
---
 .../paimon/crosspartition/IndexBootstrap.java      |   2 +-
 .../apache/paimon/io/SplitsParallelReadUtil.java   |   9 +-
 .../paimon/table/source/KeyValueTableRead.java     |   4 +
 .../flink/lookup/FileStoreLookupFunction.java      |   8 +-
 .../apache/paimon/flink/lookup/LookupTable.java    |  18 ++-
 .../paimon/flink/lookup/PrimaryKeyLookupTable.java |  30 ++++-
 .../flink/lookup/SecondaryIndexLookupTable.java    |  34 ++++-
 .../paimon/flink/lookup/TableStreamingReader.java  |  60 ++++++++-
 .../org/apache/paimon/flink/LookupJoinITCase.java  |  33 +++++
 .../paimon/flink/lookup/LookupTableTest.java       | 150 +++++++++++++++++----
 10 files changed, 300 insertions(+), 48 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/crosspartition/IndexBootstrap.java b/paimon-core/src/main/java/org/apache/paimon/crosspartition/IndexBootstrap.java
index 465955e71..260af9a9a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/crosspartition/IndexBootstrap.java
+++ b/paimon-core/src/main/java/org/apache/paimon/crosspartition/IndexBootstrap.java
@@ -110,7 +110,7 @@ public class IndexBootstrap implements Serializable {
 
         return parallelExecute(
                 TypeUtils.project(rowType, keyProjection),
-                readBuilder,
+                s -> readBuilder.newRead().createReader(s),
                 splits,
                 options.pageSize(),
                 options.crossPartitionUpsertBootstrapParallelism(),
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/SplitsParallelReadUtil.java b/paimon-core/src/main/java/org/apache/paimon/io/SplitsParallelReadUtil.java
index 164d0f2f7..775945735 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/SplitsParallelReadUtil.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/SplitsParallelReadUtil.java
@@ -21,9 +21,9 @@ package org.apache.paimon.io;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.serializer.InternalSerializers;
 import org.apache.paimon.reader.RecordReader;
-import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.FunctionWithException;
 import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.ParallelExecution;
 import org.apache.paimon.utils.ParallelExecution.ParallelBatch;
@@ -45,7 +45,7 @@ public class SplitsParallelReadUtil {
 
     public static RecordReader<InternalRow> parallelExecute(
             RowType projectedType,
-            ReadBuilder readBuilder,
+            FunctionWithException<Split, RecordReader<InternalRow>, IOException> readBuilder,
             List<Split> splits,
             int pageSize,
             int parallelism) {
@@ -61,7 +61,7 @@ public class SplitsParallelReadUtil {
 
     public static <EXTRA> RecordReader<InternalRow> parallelExecute(
             RowType projectedType,
-            ReadBuilder readBuilder,
+            FunctionWithException<Split, RecordReader<InternalRow>, IOException> readBuilder,
             List<Split> splits,
             int pageSize,
             int parallelism,
@@ -72,8 +72,7 @@ public class SplitsParallelReadUtil {
             suppliers.add(
                     () -> {
                         try {
-                            RecordReader<InternalRow> reader =
-                                    readBuilder.newRead().createReader(split);
+                            RecordReader<InternalRow> reader = readBuilder.apply(split);
                             return Pair.of(reader, extraFunction.apply(split));
                         } catch (IOException e) {
                             throw new UncheckedIOException(e);
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java b/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java
index aaebc1532..17735b813 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java
@@ -53,6 +53,10 @@ public abstract class KeyValueTableRead extends AbstractDataTableRead<KeyValue>
         return new RowDataRecordReader(read.createReader((DataSplit) split));
     }
 
+    public final RecordReader<KeyValue> kvReader(Split split) throws IOException {
+        return read.createReader((DataSplit) split);
+    }
+
     protected abstract RecordReader.RecordIterator<InternalRow> rowDataRecordIteratorFromKv(
             RecordReader.RecordIterator<KeyValue> kvRecordIterator);
 
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
index 56c866993..2359204ef 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
@@ -94,6 +94,8 @@ public class FileStoreLookupFunction implements Serializable, Closeable {
     private transient long nextLoadTime;
     private transient TableStreamingReader streamingReader;
 
+    private final boolean sequenceFieldEnabled;
+
     public FileStoreLookupFunction(
             Table table, int[] projection, int[] joinKeyIndex, @Nullable Predicate predicate) {
         TableScanUtils.streamingReadingValidate(table);
@@ -119,6 +121,9 @@ public class FileStoreLookupFunction implements Serializable, Closeable {
         }
 
         this.predicate = predicate;
+        this.sequenceFieldEnabled =
+                table.primaryKeys().size() > 0
+                        && new CoreOptions(table.options()).sequenceField().isPresent();
     }
 
     public void open(FunctionContext context) throws Exception {
@@ -151,7 +156,8 @@ public class FileStoreLookupFunction implements Serializable, Closeable {
                         table.primaryKeys(),
                         joinKeys,
                         recordFilter,
-                        options.get(LOOKUP_CACHE_ROWS));
+                        options.get(LOOKUP_CACHE_ROWS),
+                        sequenceFieldEnabled);
         this.nextLoadTime = -1;
         this.streamingReader = new TableStreamingReader(table, projection, this.predicate);
         bulkLoad(options);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupTable.java
index 00cc6e3f2..6dae92efd 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupTable.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupTable.java
@@ -50,7 +50,8 @@ public interface LookupTable {
             List<String> primaryKey,
             List<String> joinKey,
             Predicate<InternalRow> recordFilter,
-            long lruCacheSize)
+            long lruCacheSize,
+            boolean sequenceFieldEnabled)
             throws IOException {
         if (primaryKey.isEmpty()) {
             return new NoPrimaryKeyLookupTable(
@@ -58,10 +59,21 @@ public interface LookupTable {
         } else {
             if (new HashSet<>(primaryKey).equals(new HashSet<>(joinKey))) {
                 return new PrimaryKeyLookupTable(
-                        stateFactory, rowType, joinKey, recordFilter, lruCacheSize);
+                        stateFactory,
+                        rowType,
+                        joinKey,
+                        recordFilter,
+                        lruCacheSize,
+                        sequenceFieldEnabled);
             } else {
                 return new SecondaryIndexLookupTable(
-                        stateFactory, rowType, primaryKey, joinKey, recordFilter, lruCacheSize);
+                        stateFactory,
+                        rowType,
+                        primaryKey,
+                        joinKey,
+                        recordFilter,
+                        lruCacheSize,
+                        sequenceFieldEnabled);
             }
         }
     }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyLookupTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyLookupTable.java
index 7234ed8e9..dfeb29684 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyLookupTable.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyLookupTable.java
@@ -23,6 +23,8 @@ import org.apache.paimon.data.serializer.InternalSerializers;
 import org.apache.paimon.lookup.BulkLoader;
 import org.apache.paimon.lookup.RocksDBStateFactory;
 import org.apache.paimon.lookup.RocksDBValueState;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowKind;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.KeyProjectedRow;
@@ -44,26 +46,41 @@ public class PrimaryKeyLookupTable implements LookupTable {
     protected int[] primaryKeyMapping;
 
     protected final KeyProjectedRow primaryKey;
+    protected final boolean sequenceFieldEnabled;
+
+    protected final int sequenceIndex;
 
     public PrimaryKeyLookupTable(
             RocksDBStateFactory stateFactory,
             RowType rowType,
             List<String> primaryKey,
             Predicate<InternalRow> recordFilter,
-            long lruCacheSize)
+            long lruCacheSize,
+            boolean sequenceFieldEnabled)
             throws IOException {
         List<String> fieldNames = rowType.getFieldNames();
         this.primaryKeyMapping = primaryKey.stream().mapToInt(fieldNames::indexOf).toArray();
         this.primaryKey = new KeyProjectedRow(primaryKeyMapping);
+        this.sequenceFieldEnabled = sequenceFieldEnabled;
+        // append sequence number at last column when sequence field is attached.
+        RowType adjustedRowType = appendSequenceNumber(rowType);
+        this.sequenceIndex = adjustedRowType.getFieldCount() - 1;
+
         this.tableState =
                 stateFactory.valueState(
                         "table",
                         InternalSerializers.create(TypeUtils.project(rowType, primaryKeyMapping)),
-                        InternalSerializers.create(rowType),
+                        InternalSerializers.create(adjustedRowType),
                         lruCacheSize);
         this.recordFilter = recordFilter;
     }
 
+    public static RowType appendSequenceNumber(RowType rowType) {
+        List<DataType> types = rowType.getFieldTypes();
+        types.add(DataTypes.BIGINT());
+        return RowType.of(types.toArray(new DataType[0]));
+    }
+
     @Override
     public List<InternalRow> get(InternalRow key) throws IOException {
         InternalRow value = tableState.get(key);
@@ -75,6 +92,15 @@ public class PrimaryKeyLookupTable implements LookupTable {
         while (incremental.hasNext()) {
             InternalRow row = incremental.next();
             primaryKey.replaceRow(row);
+            if (sequenceFieldEnabled) {
+                InternalRow previous = tableState.get(primaryKey);
+                // only update the kv when the new value's sequence number is higher.
+                if (previous != null
+                        && previous.getLong(sequenceIndex) > row.getLong(sequenceIndex)) {
+                    continue;
+                }
+            }
+
             if (row.getRowKind() == RowKind.INSERT || row.getRowKind() == RowKind.UPDATE_AFTER) {
                 if (recordFilter.test(row)) {
                     tableState.put(primaryKey, row);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/SecondaryIndexLookupTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/SecondaryIndexLookupTable.java
index a813df0dc..cb99286e8 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/SecondaryIndexLookupTable.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/SecondaryIndexLookupTable.java
@@ -46,9 +46,16 @@ public class SecondaryIndexLookupTable extends PrimaryKeyLookupTable {
             List<String> primaryKey,
             List<String> secKey,
             Predicate<InternalRow> recordFilter,
-            long lruCacheSize)
+            long lruCacheSize,
+            boolean sequenceFieldEnabled)
             throws IOException {
-        super(stateFactory, rowType, primaryKey, recordFilter, lruCacheSize / 2);
+        super(
+                stateFactory,
+                rowType,
+                primaryKey,
+                recordFilter,
+                lruCacheSize / 2,
+                sequenceFieldEnabled);
         List<String> fieldNames = rowType.getFieldNames();
         int[] secKeyMapping = secKey.stream().mapToInt(fieldNames::indexOf).toArray();
         this.secKeyRow = new KeyProjectedRow(secKeyMapping);
@@ -65,9 +72,9 @@ public class SecondaryIndexLookupTable extends PrimaryKeyLookupTable {
         List<InternalRow> pks = indexState.get(key);
         List<InternalRow> values = new ArrayList<>(pks.size());
         for (InternalRow pk : pks) {
-            InternalRow value = tableState.get(pk);
-            if (value != null) {
-                values.add(value);
+            InternalRow row = tableState.get(pk);
+            if (row != null) {
+                values.add(row);
             }
         }
         return values;
@@ -78,8 +85,23 @@ public class SecondaryIndexLookupTable extends PrimaryKeyLookupTable {
         while (incremental.hasNext()) {
             InternalRow row = incremental.next();
             primaryKey.replaceRow(row);
+
+            boolean previousFetched = false;
+            InternalRow previous = null;
+            if (sequenceFieldEnabled) {
+                previous = tableState.get(primaryKey);
+                previousFetched = true;
+                // only update the kv when the new value's sequence number is higher.
+                if (previous != null
+                        && previous.getLong(sequenceIndex) > row.getLong(sequenceIndex)) {
+                    continue;
+                }
+            }
+
             if (row.getRowKind() == RowKind.INSERT || row.getRowKind() == RowKind.UPDATE_AFTER) {
-                InternalRow previous = tableState.get(primaryKey);
+                if (!previousFetched) {
+                    previous = tableState.get(primaryKey);
+                }
                 if (previous != null) {
                     indexState.retract(secKeyRow.replaceRow(previous), primaryKey);
                 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/TableStreamingReader.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/TableStreamingReader.java
index c737458d7..d1befa6a6 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/TableStreamingReader.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/TableStreamingReader.java
@@ -19,7 +19,9 @@
 package org.apache.paimon.flink.lookup;
 
 import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.JoinedRow;
 import org.apache.paimon.io.SplitsParallelReadUtil;
 import org.apache.paimon.mergetree.compact.ConcatRecordReader;
 import org.apache.paimon.options.Options;
@@ -28,11 +30,14 @@ import org.apache.paimon.predicate.PredicateFilter;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.source.EndOfScanException;
+import org.apache.paimon.table.source.KeyValueTableRead;
 import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.StreamTableScan;
 import org.apache.paimon.table.source.TableRead;
 import org.apache.paimon.table.source.TableScan;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.FunctionWithException;
 import org.apache.paimon.utils.TypeUtils;
 
 import org.apache.paimon.shade.guava30.com.google.common.primitives.Ints;
@@ -58,17 +63,21 @@ public class TableStreamingReader {
     @Nullable private final PredicateFilter recordFilter;
     private final StreamTableScan scan;
 
+    private final boolean sequenceFieldEnabled;
+
     public TableStreamingReader(Table table, int[] projection, @Nullable Predicate predicate) {
         this.table = table;
         this.projection = projection;
-        if (CoreOptions.fromMap(table.options()).startupMode()
-                != CoreOptions.StartupMode.COMPACTED_FULL) {
+        CoreOptions options = CoreOptions.fromMap(table.options());
+        if (options.startupMode() != CoreOptions.StartupMode.COMPACTED_FULL) {
             table =
                     table.copy(
                             Collections.singletonMap(
                                     CoreOptions.SCAN_MODE.key(),
                                     CoreOptions.StartupMode.LATEST_FULL.toString()));
         }
+        this.sequenceFieldEnabled =
+                table.primaryKeys().size() > 0 && options.sequenceField().isPresent();
 
         this.readBuilder = table.newReadBuilder().withProjection(projection).withFilter(predicate);
         scan = readBuilder.newStreamScan();
@@ -110,21 +119,27 @@ public class TableStreamingReader {
 
     private RecordReader<InternalRow> read(TableScan.Plan plan, boolean useParallelism)
             throws IOException {
+        CoreOptions options = CoreOptions.fromMap(table.options());
+        FunctionWithException<Split, RecordReader<InternalRow>, IOException> readerSupplier =
+                sequenceFieldEnabled ? createKvReaderSupplier() : createReaderSupplier();
+
+        RowType rowType = TypeUtils.project(table.rowType(), projection);
+        // append sequence number at the last column.
+        rowType = PrimaryKeyLookupTable.appendSequenceNumber(rowType);
+
         RecordReader<InternalRow> reader;
         if (useParallelism) {
-            CoreOptions options = CoreOptions.fromMap(table.options());
             reader =
                     SplitsParallelReadUtil.parallelExecute(
-                            TypeUtils.project(table.rowType(), projection),
-                            readBuilder,
+                            rowType,
+                            readerSupplier,
                             plan.splits(),
                             options.pageSize(),
                             new Options(table.options()).get(LOOKUP_BOOTSTRAP_PARALLELISM));
         } else {
-            TableRead read = readBuilder.newRead();
             List<ConcatRecordReader.ReaderSupplier<InternalRow>> readers = new ArrayList<>();
             for (Split split : plan.splits()) {
-                readers.add(() -> read.createReader(split));
+                readers.add(() -> readerSupplier.apply(split));
             }
             reader = ConcatRecordReader.create(readers);
         }
@@ -134,4 +149,35 @@ public class TableStreamingReader {
         }
         return reader;
     }
+
+    private FunctionWithException<Split, RecordReader<InternalRow>, IOException>
+            createKvReaderSupplier() {
+        KeyValueTableRead read = (KeyValueTableRead) readBuilder.newRead();
+        return split -> {
+            JoinedRow reused = new JoinedRow();
+            return read.kvReader(split)
+                    .transform(
+                            kv -> {
+                                reused.replace(kv.value(), GenericRow.of(kv.sequenceNumber()));
+                                reused.setRowKind(kv.valueKind());
+                                return reused;
+                            });
+        };
+    }
+
+    private FunctionWithException<Split, RecordReader<InternalRow>, IOException>
+            createReaderSupplier() {
+        TableRead read = readBuilder.newRead();
+
+        return split -> {
+            JoinedRow reused = new JoinedRow();
+            return read.createReader(split)
+                    .transform(
+                            row -> {
+                                reused.replace(row, GenericRow.of(-1L));
+                                reused.setRowKind(row.getRowKind());
+                                return reused;
+                            });
+        };
+    }
 }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
index 70db991e9..bd19605eb 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
@@ -601,4 +601,37 @@ public class LookupJoinITCase extends CatalogITCaseBase {
                         Row.of(4, null, null, null));
         iterator.close();
     }
+
+    @Test
+    public void testWithSequenceFieldTable() throws Exception {
+        sql(
+                "CREATE TABLE DIM_2 (i INT PRIMARY KEY NOT ENFORCED, j INT, k1 INT, k2 INT) WITH"
+                        + " ('continuous.discovery-interval'='1 ms', 'sequence.field' = 'j')");
+        sql("INSERT INTO DIM_2 VALUES (1, 11, 111, 1111), (2, 22, 222, 2222)");
+
+        String query =
+                "SELECT T.i, D.j, D.k1, D.k2 FROM T LEFT JOIN DIM_2 for system_time as of T.proctime AS D ON T.i = D.i";
+        BlockingIterator<Row, Row> iterator = BlockingIterator.of(sEnv.executeSql(query).collect());
+
+        sql("INSERT INTO T VALUES (1), (2), (3)");
+        List<Row> result = iterator.collect(3);
+        assertThat(result)
+                .containsExactlyInAnyOrder(
+                        Row.of(1, 11, 111, 1111),
+                        Row.of(2, 22, 222, 2222),
+                        Row.of(3, null, null, null));
+
+        sql("INSERT INTO DIM_2 VALUES (2, 11, 444, 4444), (3, 33, 333, 3333)");
+        Thread.sleep(2000); // wait refresh
+        sql("INSERT INTO T VALUES (1), (2), (3), (4)");
+        result = iterator.collect(4);
+        assertThat(result)
+                .containsExactlyInAnyOrder(
+                        Row.of(1, 11, 111, 1111),
+                        Row.of(2, 22, 222, 2222), // not change
+                        Row.of(3, 33, 333, 3333),
+                        Row.of(4, null, null, null));
+
+        iterator.close();
+    }
 }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java
index 3ba4432f4..219e41173 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.lookup;
 
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.JoinedRow;
 import org.apache.paimon.flink.lookup.LookupTable.TableBulkLoader;
 import org.apache.paimon.lookup.BulkLoader;
 import org.apache.paimon.lookup.RocksDBStateFactory;
@@ -82,7 +83,8 @@ public class LookupTableTest {
                         singletonList("f0"),
                         singletonList("f0"),
                         r -> true,
-                        ThreadLocalRandom.current().nextInt(2) * 10);
+                        ThreadLocalRandom.current().nextInt(2) * 10,
+                        false);
 
         // test bulk load error
         {
@@ -96,7 +98,7 @@ public class LookupTableTest {
         List<Pair<byte[], byte[]>> records = new ArrayList<>();
         for (int i = 1; i <= 100_000; i++) {
             InternalRow row = row(i, 11 * i, 111 * i);
-            records.add(Pair.of(table.toKeyBytes(row), table.toValueBytes(row)));
+            records.add(Pair.of(table.toKeyBytes(row), table.toValueBytes(sequence(row, -1L))));
         }
         records.sort((o1, o2) -> SortUtil.compareBinary(o1.getKey(), o2.getKey()));
         TableBulkLoader bulkLoader = table.createBulkLoader();
@@ -112,19 +114,61 @@ public class LookupTableTest {
         }
 
         // test refresh to update
-        table.refresh(singletonList(row(1, 22, 222)).iterator());
+        table.refresh(singletonList(sequence(row(1, 22, 222), -1L)).iterator());
         List<InternalRow> result = table.get(row(1));
         assertThat(result).hasSize(1);
         assertRow(result.get(0), 1, 22, 222);
 
         // test refresh to delete
-        table.refresh(singletonList(row(RowKind.DELETE, 1, 11, 111)).iterator());
+        table.refresh(singletonList(sequence(row(RowKind.DELETE, 1, 11, 111), -1L)).iterator());
         assertThat(table.get(row(1))).hasSize(0);
 
-        table.refresh(singletonList(row(RowKind.DELETE, 3, 33, 333)).iterator());
+        table.refresh(singletonList(sequence(row(RowKind.DELETE, 3, 33, 333), -1L)).iterator());
         assertThat(table.get(row(3))).hasSize(0);
     }
 
+    @Test
+    public void testPkTableWithSequenceField() throws Exception {
+        LookupTable table =
+                LookupTable.create(
+                        stateFactory,
+                        rowType,
+                        singletonList("f0"),
+                        singletonList("f0"),
+                        r -> true,
+                        ThreadLocalRandom.current().nextInt(2) * 10,
+                        true);
+
+        List<Pair<byte[], byte[]>> records = new ArrayList<>();
+        for (int i = 1; i <= 10; i++) {
+            InternalRow row = sequence(row(i, 11 * i, 111 * i), -1L);
+            records.add(Pair.of(table.toKeyBytes(row), table.toValueBytes(row)));
+        }
+        records.sort((o1, o2) -> SortUtil.compareBinary(o1.getKey(), o2.getKey()));
+        TableBulkLoader bulkLoader = table.createBulkLoader();
+        for (Pair<byte[], byte[]> kv : records) {
+            bulkLoader.write(kv.getKey(), kv.getValue());
+        }
+        bulkLoader.finish();
+
+        // test refresh to update
+        table.refresh(singletonList(sequence(row(1, 22, 222), 1L)).iterator());
+        List<InternalRow> result = table.get(row(1));
+        assertThat(result).hasSize(1);
+        assertRow(result.get(0), 1, 22, 222);
+
+        // refresh with old sequence
+        table.refresh(singletonList((sequence(row(1, 33, 333), 0L))).iterator());
+        result = table.get(row(1));
+        assertThat(result).hasSize(1);
+        assertRow(result.get(0), 1, 22, 222);
+
+        // test refresh delete data with old sequence
+        table.refresh(singletonList(sequence(row(RowKind.DELETE, 1, 11, 111), -1L)).iterator());
+        assertThat(table.get(row(1))).hasSize(1);
+        assertRow(result.get(0), 1, 22, 222);
+    }
+
     @Test
     public void testPkTablePkFilter() throws IOException {
         LookupTable table =
@@ -134,22 +178,23 @@ public class LookupTableTest {
                         singletonList("f0"),
                         singletonList("f0"),
                         r -> r.getInt(0) < 3,
-                        ThreadLocalRandom.current().nextInt(2) * 10);
+                        ThreadLocalRandom.current().nextInt(2) * 10,
+                        false);
 
-        table.refresh(singletonList(row(1, 11, 111)).iterator());
+        table.refresh(singletonList(sequence(row(1, 11, 111), -1L)).iterator());
         List<InternalRow> result = table.get(row(1));
         assertThat(result).hasSize(1);
         assertRow(result.get(0), 1, 11, 111);
 
-        table.refresh(singletonList(row(1, 22, 222)).iterator());
+        table.refresh(singletonList(sequence(row(1, 22, 222), -1L)).iterator());
         result = table.get(row(1));
         assertThat(result).hasSize(1);
         assertRow(result.get(0), 1, 22, 222);
 
-        table.refresh(singletonList(row(RowKind.DELETE, 1, 11, 111)).iterator());
+        table.refresh(singletonList(sequence(row(RowKind.DELETE, 1, 11, 111), -1L)).iterator());
         assertThat(table.get(row(1))).hasSize(0);
 
-        table.refresh(singletonList(row(3, 33, 333)).iterator());
+        table.refresh(singletonList(sequence(row(3, 33, 333), -1L)).iterator());
         assertThat(table.get(row(3))).hasSize(0);
     }
 
@@ -162,14 +207,15 @@ public class LookupTableTest {
                         singletonList("f0"),
                         singletonList("f0"),
                         r -> r.getInt(1) < 22,
-                        ThreadLocalRandom.current().nextInt(2) * 10);
+                        ThreadLocalRandom.current().nextInt(2) * 10,
+                        false);
 
-        table.refresh(singletonList(row(1, 11, 111)).iterator());
+        table.refresh(singletonList(sequence(row(1, 11, 111), -1L)).iterator());
         List<InternalRow> result = table.get(row(1));
         assertThat(result).hasSize(1);
         assertRow(result.get(0), 1, 11, 111);
 
-        table.refresh(singletonList(row(1, 22, 222)).iterator());
+        table.refresh(singletonList(sequence(row(1, 22, 222), -1L)).iterator());
         result = table.get(row(1));
         assertThat(result).hasSize(0);
     }
@@ -183,7 +229,8 @@ public class LookupTableTest {
                         singletonList("f0"),
                         singletonList("f1"),
                         r -> true,
-                        ThreadLocalRandom.current().nextInt(2) * 10);
+                        ThreadLocalRandom.current().nextInt(2) * 10,
+                        false);
 
         // test bulk load 100_000 records
         List<Pair<byte[], byte[]>> records = new ArrayList<>();
@@ -192,6 +239,46 @@ public class LookupTableTest {
         for (int i = 1; i <= 100_000; i++) {
             int secKey = rnd.nextInt(i);
             InternalRow row = row(i, secKey, 111 * i);
+            records.add(Pair.of(table.toKeyBytes(row), table.toValueBytes(sequence(row, -1L))));
+            secKeyToPk.computeIfAbsent(secKey, k -> new HashSet<>()).add(i);
+        }
+        records.sort((o1, o2) -> SortUtil.compareBinary(o1.getKey(), o2.getKey()));
+        TableBulkLoader bulkLoader = table.createBulkLoader();
+        for (Pair<byte[], byte[]> kv : records) {
+            bulkLoader.write(kv.getKey(), kv.getValue());
+        }
+        bulkLoader.finish();
+
+        for (Map.Entry<Integer, Set<Integer>> entry : secKeyToPk.entrySet()) {
+            List<InternalRow> result = table.get(row(entry.getKey()));
+            assertThat(result.stream().map(row -> row.getInt(0)))
+                    .containsExactlyInAnyOrderElementsOf(entry.getValue());
+        }
+
+        // add new sec key to pk
+        table.refresh(singletonList(sequence(row(1, 22, 222), -1L)).iterator());
+        List<InternalRow> result = table.get(row(22));
+        assertThat(result.stream().map(row -> row.getInt(0))).contains(1);
+    }
+
+    @Test
+    public void testSecKeyTableWithSequenceField() throws Exception {
+        LookupTable table =
+                LookupTable.create(
+                        stateFactory,
+                        rowType,
+                        singletonList("f0"),
+                        singletonList("f1"),
+                        r -> true,
+                        ThreadLocalRandom.current().nextInt(2) * 10,
+                        true);
+
+        List<Pair<byte[], byte[]>> records = new ArrayList<>();
+        Random rnd = new Random();
+        Map<Integer, Set<Integer>> secKeyToPk = new HashMap<>();
+        for (int i = 1; i <= 10; i++) {
+            int secKey = rnd.nextInt(i);
+            InternalRow row = new JoinedRow(row(i, secKey, 111 * i), GenericRow.of(-1L));
             records.add(Pair.of(table.toKeyBytes(row), table.toValueBytes(row)));
             secKeyToPk.computeIfAbsent(secKey, k -> new HashSet<>()).add(i);
         }
@@ -208,10 +295,20 @@ public class LookupTableTest {
                     .containsExactlyInAnyOrderElementsOf(entry.getValue());
         }
 
+        JoinedRow joined = new JoinedRow();
         // add new sec key to pk
-        table.refresh(singletonList(row(1, 22, 222)).iterator());
+        table.refresh(
+                singletonList((InternalRow) joined.replace(row(1, 22, 222), GenericRow.of(1L)))
+                        .iterator());
         List<InternalRow> result = table.get(row(22));
         assertThat(result.stream().map(row -> row.getInt(0))).contains(1);
+
+        // refresh with old value
+        table.refresh(
+                singletonList((InternalRow) joined.replace(row(1, 22, 333), GenericRow.of(0L)))
+                        .iterator());
+        result = table.get(row(22));
+        assertThat(result.stream().map(row -> row.getInt(2))).doesNotContain(333);
     }
 
     @Test
@@ -223,31 +320,32 @@ public class LookupTableTest {
                         singletonList("f0"),
                         singletonList("f1"),
                         r -> r.getInt(0) < 3,
-                        ThreadLocalRandom.current().nextInt(2) * 10);
+                        ThreadLocalRandom.current().nextInt(2) * 10,
+                        false);
 
-        table.refresh(singletonList(row(1, 11, 111)).iterator());
+        table.refresh(singletonList(sequence(row(1, 11, 111), -1L)).iterator());
         List<InternalRow> result = table.get(row(11));
         assertThat(result).hasSize(1);
         assertRow(result.get(0), 1, 11, 111);
 
-        table.refresh(singletonList(row(1, 22, 222)).iterator());
+        table.refresh(singletonList(sequence(row(1, 22, 222), -1L)).iterator());
         assertThat(table.get(row(11))).hasSize(0);
         result = table.get(row(22));
         assertThat(result).hasSize(1);
         assertRow(result.get(0), 1, 22, 222);
 
-        table.refresh(singletonList(row(2, 22, 222)).iterator());
+        table.refresh(singletonList(sequence(row(2, 22, 222), -1L)).iterator());
         result = table.get(row(22));
         assertThat(result).hasSize(2);
         assertRow(result.get(0), 1, 22, 222);
         assertRow(result.get(1), 2, 22, 222);
 
-        table.refresh(singletonList(row(RowKind.DELETE, 2, 22, 222)).iterator());
+        table.refresh(singletonList(sequence(row(RowKind.DELETE, 2, 22, 222), -1L)).iterator());
         result = table.get(row(22));
         assertThat(result).hasSize(1);
         assertRow(result.get(0), 1, 22, 222);
 
-        table.refresh(singletonList(row(3, 33, 333)).iterator());
+        table.refresh(singletonList(sequence(row(3, 33, 333), -1L)).iterator());
         assertThat(table.get(row(33))).hasSize(0);
     }
 
@@ -260,7 +358,8 @@ public class LookupTableTest {
                         Collections.emptyList(),
                         singletonList("f1"),
                         r -> true,
-                        ThreadLocalRandom.current().nextInt(2) * 10);
+                        ThreadLocalRandom.current().nextInt(2) * 10,
+                        false);
 
         // test bulk load 100_000 records
         List<Pair<byte[], byte[]>> records = new ArrayList<>();
@@ -300,7 +399,8 @@ public class LookupTableTest {
                         Collections.emptyList(),
                         singletonList("f1"),
                         r -> r.getInt(2) < 222,
-                        ThreadLocalRandom.current().nextInt(2) * 10);
+                        ThreadLocalRandom.current().nextInt(2) * 10,
+                        false);
 
         table.refresh(singletonList(row(1, 11, 333)).iterator());
         List<InternalRow> result = table.get(row(11));
@@ -332,6 +432,10 @@ public class LookupTableTest {
         return row;
     }
 
+    private static InternalRow sequence(InternalRow row, long sequenceNumber) {
+        return new JoinedRow(row.getRowKind(), row, GenericRow.of(sequenceNumber));
+    }
+
     private static void assertRow(InternalRow resultRow, int... expected) {
         int[] results = new int[expected.length];
         for (int i = 0; i < results.length; i++) {