You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@paimon.apache.org by lz...@apache.org on 2024/01/05 15:31:38 UTC
(incubator-paimon) 01/04: [flink] Fix inconsistency problem when lookup join table with sequence field
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
commit ddd717ebd3e846609653337d17ce41de8bd8b46f
Author: Aitozi <10...@qq.com>
AuthorDate: Wed Jan 3 09:58:46 2024 +0800
[flink] Fix inconsistency problem when lookup join table with sequence field
This closes #2622
---
.../paimon/crosspartition/IndexBootstrap.java | 2 +-
.../apache/paimon/io/SplitsParallelReadUtil.java | 9 +-
.../paimon/table/source/KeyValueTableRead.java | 4 +
.../flink/lookup/FileStoreLookupFunction.java | 8 +-
.../apache/paimon/flink/lookup/LookupTable.java | 18 ++-
.../paimon/flink/lookup/PrimaryKeyLookupTable.java | 30 ++++-
.../flink/lookup/SecondaryIndexLookupTable.java | 34 ++++-
.../paimon/flink/lookup/TableStreamingReader.java | 60 ++++++++-
.../org/apache/paimon/flink/LookupJoinITCase.java | 33 +++++
.../paimon/flink/lookup/LookupTableTest.java | 150 +++++++++++++++++----
10 files changed, 300 insertions(+), 48 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/crosspartition/IndexBootstrap.java b/paimon-core/src/main/java/org/apache/paimon/crosspartition/IndexBootstrap.java
index 465955e71..260af9a9a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/crosspartition/IndexBootstrap.java
+++ b/paimon-core/src/main/java/org/apache/paimon/crosspartition/IndexBootstrap.java
@@ -110,7 +110,7 @@ public class IndexBootstrap implements Serializable {
return parallelExecute(
TypeUtils.project(rowType, keyProjection),
- readBuilder,
+ s -> readBuilder.newRead().createReader(s),
splits,
options.pageSize(),
options.crossPartitionUpsertBootstrapParallelism(),
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/SplitsParallelReadUtil.java b/paimon-core/src/main/java/org/apache/paimon/io/SplitsParallelReadUtil.java
index 164d0f2f7..775945735 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/SplitsParallelReadUtil.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/SplitsParallelReadUtil.java
@@ -21,9 +21,9 @@ package org.apache.paimon.io;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.serializer.InternalSerializers;
import org.apache.paimon.reader.RecordReader;
-import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.FunctionWithException;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.ParallelExecution;
import org.apache.paimon.utils.ParallelExecution.ParallelBatch;
@@ -45,7 +45,7 @@ public class SplitsParallelReadUtil {
public static RecordReader<InternalRow> parallelExecute(
RowType projectedType,
- ReadBuilder readBuilder,
+ FunctionWithException<Split, RecordReader<InternalRow>, IOException> readBuilder,
List<Split> splits,
int pageSize,
int parallelism) {
@@ -61,7 +61,7 @@ public class SplitsParallelReadUtil {
public static <EXTRA> RecordReader<InternalRow> parallelExecute(
RowType projectedType,
- ReadBuilder readBuilder,
+ FunctionWithException<Split, RecordReader<InternalRow>, IOException> readBuilder,
List<Split> splits,
int pageSize,
int parallelism,
@@ -72,8 +72,7 @@ public class SplitsParallelReadUtil {
suppliers.add(
() -> {
try {
- RecordReader<InternalRow> reader =
- readBuilder.newRead().createReader(split);
+ RecordReader<InternalRow> reader = readBuilder.apply(split);
return Pair.of(reader, extraFunction.apply(split));
} catch (IOException e) {
throw new UncheckedIOException(e);
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java b/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java
index aaebc1532..17735b813 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java
@@ -53,6 +53,10 @@ public abstract class KeyValueTableRead extends AbstractDataTableRead<KeyValue>
return new RowDataRecordReader(read.createReader((DataSplit) split));
}
+ public final RecordReader<KeyValue> kvReader(Split split) throws IOException {
+ return read.createReader((DataSplit) split);
+ }
+
protected abstract RecordReader.RecordIterator<InternalRow> rowDataRecordIteratorFromKv(
RecordReader.RecordIterator<KeyValue> kvRecordIterator);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
index 56c866993..2359204ef 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
@@ -94,6 +94,8 @@ public class FileStoreLookupFunction implements Serializable, Closeable {
private transient long nextLoadTime;
private transient TableStreamingReader streamingReader;
+ private final boolean sequenceFieldEnabled;
+
public FileStoreLookupFunction(
Table table, int[] projection, int[] joinKeyIndex, @Nullable Predicate predicate) {
TableScanUtils.streamingReadingValidate(table);
@@ -119,6 +121,9 @@ public class FileStoreLookupFunction implements Serializable, Closeable {
}
this.predicate = predicate;
+ this.sequenceFieldEnabled =
+ table.primaryKeys().size() > 0
+ && new CoreOptions(table.options()).sequenceField().isPresent();
}
public void open(FunctionContext context) throws Exception {
@@ -151,7 +156,8 @@ public class FileStoreLookupFunction implements Serializable, Closeable {
table.primaryKeys(),
joinKeys,
recordFilter,
- options.get(LOOKUP_CACHE_ROWS));
+ options.get(LOOKUP_CACHE_ROWS),
+ sequenceFieldEnabled);
this.nextLoadTime = -1;
this.streamingReader = new TableStreamingReader(table, projection, this.predicate);
bulkLoad(options);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupTable.java
index 00cc6e3f2..6dae92efd 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupTable.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupTable.java
@@ -50,7 +50,8 @@ public interface LookupTable {
List<String> primaryKey,
List<String> joinKey,
Predicate<InternalRow> recordFilter,
- long lruCacheSize)
+ long lruCacheSize,
+ boolean sequenceFieldEnabled)
throws IOException {
if (primaryKey.isEmpty()) {
return new NoPrimaryKeyLookupTable(
@@ -58,10 +59,21 @@ public interface LookupTable {
} else {
if (new HashSet<>(primaryKey).equals(new HashSet<>(joinKey))) {
return new PrimaryKeyLookupTable(
- stateFactory, rowType, joinKey, recordFilter, lruCacheSize);
+ stateFactory,
+ rowType,
+ joinKey,
+ recordFilter,
+ lruCacheSize,
+ sequenceFieldEnabled);
} else {
return new SecondaryIndexLookupTable(
- stateFactory, rowType, primaryKey, joinKey, recordFilter, lruCacheSize);
+ stateFactory,
+ rowType,
+ primaryKey,
+ joinKey,
+ recordFilter,
+ lruCacheSize,
+ sequenceFieldEnabled);
}
}
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyLookupTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyLookupTable.java
index 7234ed8e9..dfeb29684 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyLookupTable.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyLookupTable.java
@@ -23,6 +23,8 @@ import org.apache.paimon.data.serializer.InternalSerializers;
import org.apache.paimon.lookup.BulkLoader;
import org.apache.paimon.lookup.RocksDBStateFactory;
import org.apache.paimon.lookup.RocksDBValueState;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.KeyProjectedRow;
@@ -44,26 +46,41 @@ public class PrimaryKeyLookupTable implements LookupTable {
protected int[] primaryKeyMapping;
protected final KeyProjectedRow primaryKey;
+ protected final boolean sequenceFieldEnabled;
+
+ protected final int sequenceIndex;
public PrimaryKeyLookupTable(
RocksDBStateFactory stateFactory,
RowType rowType,
List<String> primaryKey,
Predicate<InternalRow> recordFilter,
- long lruCacheSize)
+ long lruCacheSize,
+ boolean sequenceFieldEnabled)
throws IOException {
List<String> fieldNames = rowType.getFieldNames();
this.primaryKeyMapping = primaryKey.stream().mapToInt(fieldNames::indexOf).toArray();
this.primaryKey = new KeyProjectedRow(primaryKeyMapping);
+ this.sequenceFieldEnabled = sequenceFieldEnabled;
+ // the sequence number is always appended as the last column of the stored row.
+ RowType adjustedRowType = appendSequenceNumber(rowType);
+ this.sequenceIndex = adjustedRowType.getFieldCount() - 1;
+
this.tableState =
stateFactory.valueState(
"table",
InternalSerializers.create(TypeUtils.project(rowType, primaryKeyMapping)),
- InternalSerializers.create(rowType),
+ InternalSerializers.create(adjustedRowType),
lruCacheSize);
this.recordFilter = recordFilter;
}
+ public static RowType appendSequenceNumber(RowType rowType) {
+ List<DataType> types = rowType.getFieldTypes();
+ types.add(DataTypes.BIGINT());
+ return RowType.of(types.toArray(new DataType[0]));
+ }
+
@Override
public List<InternalRow> get(InternalRow key) throws IOException {
InternalRow value = tableState.get(key);
@@ -75,6 +92,15 @@ public class PrimaryKeyLookupTable implements LookupTable {
while (incremental.hasNext()) {
InternalRow row = incremental.next();
primaryKey.replaceRow(row);
+ if (sequenceFieldEnabled) {
+ InternalRow previous = tableState.get(primaryKey);
+ // skip the row if the stored value has a strictly higher sequence number.
+ if (previous != null
+ && previous.getLong(sequenceIndex) > row.getLong(sequenceIndex)) {
+ continue;
+ }
+ }
+
if (row.getRowKind() == RowKind.INSERT || row.getRowKind() == RowKind.UPDATE_AFTER) {
if (recordFilter.test(row)) {
tableState.put(primaryKey, row);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/SecondaryIndexLookupTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/SecondaryIndexLookupTable.java
index a813df0dc..cb99286e8 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/SecondaryIndexLookupTable.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/SecondaryIndexLookupTable.java
@@ -46,9 +46,16 @@ public class SecondaryIndexLookupTable extends PrimaryKeyLookupTable {
List<String> primaryKey,
List<String> secKey,
Predicate<InternalRow> recordFilter,
- long lruCacheSize)
+ long lruCacheSize,
+ boolean sequenceFieldEnabled)
throws IOException {
- super(stateFactory, rowType, primaryKey, recordFilter, lruCacheSize / 2);
+ super(
+ stateFactory,
+ rowType,
+ primaryKey,
+ recordFilter,
+ lruCacheSize / 2,
+ sequenceFieldEnabled);
List<String> fieldNames = rowType.getFieldNames();
int[] secKeyMapping = secKey.stream().mapToInt(fieldNames::indexOf).toArray();
this.secKeyRow = new KeyProjectedRow(secKeyMapping);
@@ -65,9 +72,9 @@ public class SecondaryIndexLookupTable extends PrimaryKeyLookupTable {
List<InternalRow> pks = indexState.get(key);
List<InternalRow> values = new ArrayList<>(pks.size());
for (InternalRow pk : pks) {
- InternalRow value = tableState.get(pk);
- if (value != null) {
- values.add(value);
+ InternalRow row = tableState.get(pk);
+ if (row != null) {
+ values.add(row);
}
}
return values;
@@ -78,8 +85,23 @@ public class SecondaryIndexLookupTable extends PrimaryKeyLookupTable {
while (incremental.hasNext()) {
InternalRow row = incremental.next();
primaryKey.replaceRow(row);
+
+ boolean previousFetched = false;
+ InternalRow previous = null;
+ if (sequenceFieldEnabled) {
+ previous = tableState.get(primaryKey);
+ previousFetched = true;
+ // skip the row if the stored value has a strictly higher sequence number.
+ if (previous != null
+ && previous.getLong(sequenceIndex) > row.getLong(sequenceIndex)) {
+ continue;
+ }
+ }
+
if (row.getRowKind() == RowKind.INSERT || row.getRowKind() == RowKind.UPDATE_AFTER) {
- InternalRow previous = tableState.get(primaryKey);
+ if (!previousFetched) {
+ previous = tableState.get(primaryKey);
+ }
if (previous != null) {
indexState.retract(secKeyRow.replaceRow(previous), primaryKey);
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/TableStreamingReader.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/TableStreamingReader.java
index c737458d7..d1befa6a6 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/TableStreamingReader.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/TableStreamingReader.java
@@ -19,7 +19,9 @@
package org.apache.paimon.flink.lookup;
import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.JoinedRow;
import org.apache.paimon.io.SplitsParallelReadUtil;
import org.apache.paimon.mergetree.compact.ConcatRecordReader;
import org.apache.paimon.options.Options;
@@ -28,11 +30,14 @@ import org.apache.paimon.predicate.PredicateFilter;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.EndOfScanException;
+import org.apache.paimon.table.source.KeyValueTableRead;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.StreamTableScan;
import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.table.source.TableScan;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.FunctionWithException;
import org.apache.paimon.utils.TypeUtils;
import org.apache.paimon.shade.guava30.com.google.common.primitives.Ints;
@@ -58,17 +63,21 @@ public class TableStreamingReader {
@Nullable private final PredicateFilter recordFilter;
private final StreamTableScan scan;
+ private final boolean sequenceFieldEnabled;
+
public TableStreamingReader(Table table, int[] projection, @Nullable Predicate predicate) {
this.table = table;
this.projection = projection;
- if (CoreOptions.fromMap(table.options()).startupMode()
- != CoreOptions.StartupMode.COMPACTED_FULL) {
+ CoreOptions options = CoreOptions.fromMap(table.options());
+ if (options.startupMode() != CoreOptions.StartupMode.COMPACTED_FULL) {
table =
table.copy(
Collections.singletonMap(
CoreOptions.SCAN_MODE.key(),
CoreOptions.StartupMode.LATEST_FULL.toString()));
}
+ this.sequenceFieldEnabled =
+ table.primaryKeys().size() > 0 && options.sequenceField().isPresent();
this.readBuilder = table.newReadBuilder().withProjection(projection).withFilter(predicate);
scan = readBuilder.newStreamScan();
@@ -110,21 +119,27 @@ public class TableStreamingReader {
private RecordReader<InternalRow> read(TableScan.Plan plan, boolean useParallelism)
throws IOException {
+ CoreOptions options = CoreOptions.fromMap(table.options());
+ FunctionWithException<Split, RecordReader<InternalRow>, IOException> readerSupplier =
+ sequenceFieldEnabled ? createKvReaderSupplier() : createReaderSupplier();
+
+ RowType rowType = TypeUtils.project(table.rowType(), projection);
+ // append sequence number at the last column.
+ rowType = PrimaryKeyLookupTable.appendSequenceNumber(rowType);
+
RecordReader<InternalRow> reader;
if (useParallelism) {
- CoreOptions options = CoreOptions.fromMap(table.options());
reader =
SplitsParallelReadUtil.parallelExecute(
- TypeUtils.project(table.rowType(), projection),
- readBuilder,
+ rowType,
+ readerSupplier,
plan.splits(),
options.pageSize(),
new Options(table.options()).get(LOOKUP_BOOTSTRAP_PARALLELISM));
} else {
- TableRead read = readBuilder.newRead();
List<ConcatRecordReader.ReaderSupplier<InternalRow>> readers = new ArrayList<>();
for (Split split : plan.splits()) {
- readers.add(() -> read.createReader(split));
+ readers.add(() -> readerSupplier.apply(split));
}
reader = ConcatRecordReader.create(readers);
}
@@ -134,4 +149,35 @@ public class TableStreamingReader {
}
return reader;
}
+
+ private FunctionWithException<Split, RecordReader<InternalRow>, IOException>
+ createKvReaderSupplier() {
+ KeyValueTableRead read = (KeyValueTableRead) readBuilder.newRead();
+ return split -> {
+ JoinedRow reused = new JoinedRow();
+ return read.kvReader(split)
+ .transform(
+ kv -> {
+ reused.replace(kv.value(), GenericRow.of(kv.sequenceNumber()));
+ reused.setRowKind(kv.valueKind());
+ return reused;
+ });
+ };
+ }
+
+ private FunctionWithException<Split, RecordReader<InternalRow>, IOException>
+ createReaderSupplier() {
+ TableRead read = readBuilder.newRead();
+
+ return split -> {
+ JoinedRow reused = new JoinedRow();
+ return read.createReader(split)
+ .transform(
+ row -> {
+ reused.replace(row, GenericRow.of(-1L));
+ reused.setRowKind(row.getRowKind());
+ return reused;
+ });
+ };
+ }
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
index 70db991e9..bd19605eb 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
@@ -601,4 +601,37 @@ public class LookupJoinITCase extends CatalogITCaseBase {
Row.of(4, null, null, null));
iterator.close();
}
+
+ @Test
+ public void testWithSequenceFieldTable() throws Exception {
+ sql(
+ "CREATE TABLE DIM_2 (i INT PRIMARY KEY NOT ENFORCED, j INT, k1 INT, k2 INT) WITH"
+ + " ('continuous.discovery-interval'='1 ms', 'sequence.field' = 'j')");
+ sql("INSERT INTO DIM_2 VALUES (1, 11, 111, 1111), (2, 22, 222, 2222)");
+
+ String query =
+ "SELECT T.i, D.j, D.k1, D.k2 FROM T LEFT JOIN DIM_2 for system_time as of T.proctime AS D ON T.i = D.i";
+ BlockingIterator<Row, Row> iterator = BlockingIterator.of(sEnv.executeSql(query).collect());
+
+ sql("INSERT INTO T VALUES (1), (2), (3)");
+ List<Row> result = iterator.collect(3);
+ assertThat(result)
+ .containsExactlyInAnyOrder(
+ Row.of(1, 11, 111, 1111),
+ Row.of(2, 22, 222, 2222),
+ Row.of(3, null, null, null));
+
+ sql("INSERT INTO DIM_2 VALUES (2, 11, 444, 4444), (3, 33, 333, 3333)");
+ Thread.sleep(2000); // wait refresh
+ sql("INSERT INTO T VALUES (1), (2), (3), (4)");
+ result = iterator.collect(4);
+ assertThat(result)
+ .containsExactlyInAnyOrder(
+ Row.of(1, 11, 111, 1111),
+ Row.of(2, 22, 222, 2222), // not change
+ Row.of(3, 33, 333, 3333),
+ Row.of(4, null, null, null));
+
+ iterator.close();
+ }
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java
index 3ba4432f4..219e41173 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.lookup;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.JoinedRow;
import org.apache.paimon.flink.lookup.LookupTable.TableBulkLoader;
import org.apache.paimon.lookup.BulkLoader;
import org.apache.paimon.lookup.RocksDBStateFactory;
@@ -82,7 +83,8 @@ public class LookupTableTest {
singletonList("f0"),
singletonList("f0"),
r -> true,
- ThreadLocalRandom.current().nextInt(2) * 10);
+ ThreadLocalRandom.current().nextInt(2) * 10,
+ false);
// test bulk load error
{
@@ -96,7 +98,7 @@ public class LookupTableTest {
List<Pair<byte[], byte[]>> records = new ArrayList<>();
for (int i = 1; i <= 100_000; i++) {
InternalRow row = row(i, 11 * i, 111 * i);
- records.add(Pair.of(table.toKeyBytes(row), table.toValueBytes(row)));
+ records.add(Pair.of(table.toKeyBytes(row), table.toValueBytes(sequence(row, -1L))));
}
records.sort((o1, o2) -> SortUtil.compareBinary(o1.getKey(), o2.getKey()));
TableBulkLoader bulkLoader = table.createBulkLoader();
@@ -112,19 +114,61 @@ public class LookupTableTest {
}
// test refresh to update
- table.refresh(singletonList(row(1, 22, 222)).iterator());
+ table.refresh(singletonList(sequence(row(1, 22, 222), -1L)).iterator());
List<InternalRow> result = table.get(row(1));
assertThat(result).hasSize(1);
assertRow(result.get(0), 1, 22, 222);
// test refresh to delete
- table.refresh(singletonList(row(RowKind.DELETE, 1, 11, 111)).iterator());
+ table.refresh(singletonList(sequence(row(RowKind.DELETE, 1, 11, 111), -1L)).iterator());
assertThat(table.get(row(1))).hasSize(0);
- table.refresh(singletonList(row(RowKind.DELETE, 3, 33, 333)).iterator());
+ table.refresh(singletonList(sequence(row(RowKind.DELETE, 3, 33, 333), -1L)).iterator());
assertThat(table.get(row(3))).hasSize(0);
}
+ @Test
+ public void testPkTableWithSequenceField() throws Exception {
+ LookupTable table =
+ LookupTable.create(
+ stateFactory,
+ rowType,
+ singletonList("f0"),
+ singletonList("f0"),
+ r -> true,
+ ThreadLocalRandom.current().nextInt(2) * 10,
+ true);
+
+ List<Pair<byte[], byte[]>> records = new ArrayList<>();
+ for (int i = 1; i <= 10; i++) {
+ InternalRow row = sequence(row(i, 11 * i, 111 * i), -1L);
+ records.add(Pair.of(table.toKeyBytes(row), table.toValueBytes(row)));
+ }
+ records.sort((o1, o2) -> SortUtil.compareBinary(o1.getKey(), o2.getKey()));
+ TableBulkLoader bulkLoader = table.createBulkLoader();
+ for (Pair<byte[], byte[]> kv : records) {
+ bulkLoader.write(kv.getKey(), kv.getValue());
+ }
+ bulkLoader.finish();
+
+ // test refresh to update
+ table.refresh(singletonList(sequence(row(1, 22, 222), 1L)).iterator());
+ List<InternalRow> result = table.get(row(1));
+ assertThat(result).hasSize(1);
+ assertRow(result.get(0), 1, 22, 222);
+
+ // refresh with old sequence
+ table.refresh(singletonList((sequence(row(1, 33, 333), 0L))).iterator());
+ result = table.get(row(1));
+ assertThat(result).hasSize(1);
+ assertRow(result.get(0), 1, 22, 222);
+
+ // test refresh delete data with old sequence
+ table.refresh(singletonList(sequence(row(RowKind.DELETE, 1, 11, 111), -1L)).iterator());
+ assertThat(table.get(row(1))).hasSize(1);
+ assertRow(result.get(0), 1, 22, 222);
+ }
+
@Test
public void testPkTablePkFilter() throws IOException {
LookupTable table =
@@ -134,22 +178,23 @@ public class LookupTableTest {
singletonList("f0"),
singletonList("f0"),
r -> r.getInt(0) < 3,
- ThreadLocalRandom.current().nextInt(2) * 10);
+ ThreadLocalRandom.current().nextInt(2) * 10,
+ false);
- table.refresh(singletonList(row(1, 11, 111)).iterator());
+ table.refresh(singletonList(sequence(row(1, 11, 111), -1L)).iterator());
List<InternalRow> result = table.get(row(1));
assertThat(result).hasSize(1);
assertRow(result.get(0), 1, 11, 111);
- table.refresh(singletonList(row(1, 22, 222)).iterator());
+ table.refresh(singletonList(sequence(row(1, 22, 222), -1L)).iterator());
result = table.get(row(1));
assertThat(result).hasSize(1);
assertRow(result.get(0), 1, 22, 222);
- table.refresh(singletonList(row(RowKind.DELETE, 1, 11, 111)).iterator());
+ table.refresh(singletonList(sequence(row(RowKind.DELETE, 1, 11, 111), -1L)).iterator());
assertThat(table.get(row(1))).hasSize(0);
- table.refresh(singletonList(row(3, 33, 333)).iterator());
+ table.refresh(singletonList(sequence(row(3, 33, 333), -1L)).iterator());
assertThat(table.get(row(3))).hasSize(0);
}
@@ -162,14 +207,15 @@ public class LookupTableTest {
singletonList("f0"),
singletonList("f0"),
r -> r.getInt(1) < 22,
- ThreadLocalRandom.current().nextInt(2) * 10);
+ ThreadLocalRandom.current().nextInt(2) * 10,
+ false);
- table.refresh(singletonList(row(1, 11, 111)).iterator());
+ table.refresh(singletonList(sequence(row(1, 11, 111), -1L)).iterator());
List<InternalRow> result = table.get(row(1));
assertThat(result).hasSize(1);
assertRow(result.get(0), 1, 11, 111);
- table.refresh(singletonList(row(1, 22, 222)).iterator());
+ table.refresh(singletonList(sequence(row(1, 22, 222), -1L)).iterator());
result = table.get(row(1));
assertThat(result).hasSize(0);
}
@@ -183,7 +229,8 @@ public class LookupTableTest {
singletonList("f0"),
singletonList("f1"),
r -> true,
- ThreadLocalRandom.current().nextInt(2) * 10);
+ ThreadLocalRandom.current().nextInt(2) * 10,
+ false);
// test bulk load 100_000 records
List<Pair<byte[], byte[]>> records = new ArrayList<>();
@@ -192,6 +239,46 @@ public class LookupTableTest {
for (int i = 1; i <= 100_000; i++) {
int secKey = rnd.nextInt(i);
InternalRow row = row(i, secKey, 111 * i);
+ records.add(Pair.of(table.toKeyBytes(row), table.toValueBytes(sequence(row, -1L))));
+ secKeyToPk.computeIfAbsent(secKey, k -> new HashSet<>()).add(i);
+ }
+ records.sort((o1, o2) -> SortUtil.compareBinary(o1.getKey(), o2.getKey()));
+ TableBulkLoader bulkLoader = table.createBulkLoader();
+ for (Pair<byte[], byte[]> kv : records) {
+ bulkLoader.write(kv.getKey(), kv.getValue());
+ }
+ bulkLoader.finish();
+
+ for (Map.Entry<Integer, Set<Integer>> entry : secKeyToPk.entrySet()) {
+ List<InternalRow> result = table.get(row(entry.getKey()));
+ assertThat(result.stream().map(row -> row.getInt(0)))
+ .containsExactlyInAnyOrderElementsOf(entry.getValue());
+ }
+
+ // add new sec key to pk
+ table.refresh(singletonList(sequence(row(1, 22, 222), -1L)).iterator());
+ List<InternalRow> result = table.get(row(22));
+ assertThat(result.stream().map(row -> row.getInt(0))).contains(1);
+ }
+
+ @Test
+ public void testSecKeyTableWithSequenceField() throws Exception {
+ LookupTable table =
+ LookupTable.create(
+ stateFactory,
+ rowType,
+ singletonList("f0"),
+ singletonList("f1"),
+ r -> true,
+ ThreadLocalRandom.current().nextInt(2) * 10,
+ true);
+
+ List<Pair<byte[], byte[]>> records = new ArrayList<>();
+ Random rnd = new Random();
+ Map<Integer, Set<Integer>> secKeyToPk = new HashMap<>();
+ for (int i = 1; i <= 10; i++) {
+ int secKey = rnd.nextInt(i);
+ InternalRow row = new JoinedRow(row(i, secKey, 111 * i), GenericRow.of(-1L));
records.add(Pair.of(table.toKeyBytes(row), table.toValueBytes(row)));
secKeyToPk.computeIfAbsent(secKey, k -> new HashSet<>()).add(i);
}
@@ -208,10 +295,20 @@ public class LookupTableTest {
.containsExactlyInAnyOrderElementsOf(entry.getValue());
}
+ JoinedRow joined = new JoinedRow();
// add new sec key to pk
- table.refresh(singletonList(row(1, 22, 222)).iterator());
+ table.refresh(
+ singletonList((InternalRow) joined.replace(row(1, 22, 222), GenericRow.of(1L)))
+ .iterator());
List<InternalRow> result = table.get(row(22));
assertThat(result.stream().map(row -> row.getInt(0))).contains(1);
+
+ // refresh with old value
+ table.refresh(
+ singletonList((InternalRow) joined.replace(row(1, 22, 333), GenericRow.of(0L)))
+ .iterator());
+ result = table.get(row(22));
+ assertThat(result.stream().map(row -> row.getInt(2))).doesNotContain(333);
}
@Test
@@ -223,31 +320,32 @@ public class LookupTableTest {
singletonList("f0"),
singletonList("f1"),
r -> r.getInt(0) < 3,
- ThreadLocalRandom.current().nextInt(2) * 10);
+ ThreadLocalRandom.current().nextInt(2) * 10,
+ false);
- table.refresh(singletonList(row(1, 11, 111)).iterator());
+ table.refresh(singletonList(sequence(row(1, 11, 111), -1L)).iterator());
List<InternalRow> result = table.get(row(11));
assertThat(result).hasSize(1);
assertRow(result.get(0), 1, 11, 111);
- table.refresh(singletonList(row(1, 22, 222)).iterator());
+ table.refresh(singletonList(sequence(row(1, 22, 222), -1L)).iterator());
assertThat(table.get(row(11))).hasSize(0);
result = table.get(row(22));
assertThat(result).hasSize(1);
assertRow(result.get(0), 1, 22, 222);
- table.refresh(singletonList(row(2, 22, 222)).iterator());
+ table.refresh(singletonList(sequence(row(2, 22, 222), -1L)).iterator());
result = table.get(row(22));
assertThat(result).hasSize(2);
assertRow(result.get(0), 1, 22, 222);
assertRow(result.get(1), 2, 22, 222);
- table.refresh(singletonList(row(RowKind.DELETE, 2, 22, 222)).iterator());
+ table.refresh(singletonList(sequence(row(RowKind.DELETE, 2, 22, 222), -1L)).iterator());
result = table.get(row(22));
assertThat(result).hasSize(1);
assertRow(result.get(0), 1, 22, 222);
- table.refresh(singletonList(row(3, 33, 333)).iterator());
+ table.refresh(singletonList(sequence(row(3, 33, 333), -1L)).iterator());
assertThat(table.get(row(33))).hasSize(0);
}
@@ -260,7 +358,8 @@ public class LookupTableTest {
Collections.emptyList(),
singletonList("f1"),
r -> true,
- ThreadLocalRandom.current().nextInt(2) * 10);
+ ThreadLocalRandom.current().nextInt(2) * 10,
+ false);
// test bulk load 100_000 records
List<Pair<byte[], byte[]>> records = new ArrayList<>();
@@ -300,7 +399,8 @@ public class LookupTableTest {
Collections.emptyList(),
singletonList("f1"),
r -> r.getInt(2) < 222,
- ThreadLocalRandom.current().nextInt(2) * 10);
+ ThreadLocalRandom.current().nextInt(2) * 10,
+ false);
table.refresh(singletonList(row(1, 11, 333)).iterator());
List<InternalRow> result = table.get(row(11));
@@ -332,6 +432,10 @@ public class LookupTableTest {
return row;
}
+ private static InternalRow sequence(InternalRow row, long sequenceNumber) {
+ return new JoinedRow(row.getRowKind(), row, GenericRow.of(sequenceNumber));
+ }
+
private static void assertRow(InternalRow resultRow, int... expected) {
int[] results = new int[expected.length];
for (int i = 0; i < results.length; i++) {