You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sd...@apache.org on 2022/12/26 07:54:25 UTC
[ignite-3] branch main updated: IGNITE-17467 Return RowIds in MvPartitionStorage#scan() results (#1475)
This is an automated email from the ASF dual-hosted git repository.
sdanilov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 197a3f9677 IGNITE-17467 Return RowIds in MvPartitionStorage#scan() results (#1475)
197a3f9677 is described below
commit 197a3f9677fcfa4adc891c1c51fe780aa8a0c571
Author: Roman Puchkovskiy <ro...@gmail.com>
AuthorDate: Mon Dec 26 11:54:20 2022 +0400
IGNITE-17467 Return RowIds in MvPartitionStorage#scan() results (#1475)
---
.../apache/ignite/internal/storage/ReadResult.java | 33 +++++++--
.../ignite/internal/storage/ReadResultTest.java | 13 +++-
.../storage/AbstractMvPartitionStorageTest.java | 75 +++++++++++--------
.../storage/BaseMvPartitionStorageTest.java | 1 +
.../internal/storage/BaseMvStoragesTest.java | 2 +-
.../storage/impl/TestMvPartitionStorage.java | 84 +++++++++++++---------
.../mv/AbstractPageMemoryMvPartitionStorage.java | 36 ++++++----
.../storage/rocksdb/RocksDbMvPartitionStorage.java | 56 ++++++++-------
.../ItAbstractInternalTableScanTest.java | 3 +-
.../OutgoingSnapshotMvDataStreamingTest.java | 49 +++++++------
10 files changed, 215 insertions(+), 137 deletions(-)
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ReadResult.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ReadResult.java
index 90f01bb21f..25e316b328 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ReadResult.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ReadResult.java
@@ -29,8 +29,8 @@ public class ReadResult {
/** Unset commit partition id value. */
public static final int UNDEFINED_COMMIT_PARTITION_ID = -1;
- /** Empty read result. */
- public static final ReadResult EMPTY = new ReadResult(null, null, null, null, null, UNDEFINED_COMMIT_PARTITION_ID);
+ /** ID of the row. */
+ private final RowId rowId;
/** Data. {@code null} iff the result is empty (i.e. no row exists or it is a tombstone). */
private final @Nullable BinaryRow binaryRow;
@@ -58,6 +58,7 @@ public class ReadResult {
private final @Nullable HybridTimestamp newestCommitTs;
private ReadResult(
+ RowId rowId,
@Nullable BinaryRow binaryRow,
@Nullable UUID transactionId,
@Nullable UUID commitTableId,
@@ -65,6 +66,7 @@ public class ReadResult {
@Nullable HybridTimestamp newestCommitTs,
int commitPartitionId
) {
+ this.rowId = rowId;
this.binaryRow = binaryRow;
// If transaction is not null, then commitTableId and commitPartitionId should be defined.
@@ -80,13 +82,32 @@ public class ReadResult {
this.commitPartitionId = commitPartitionId;
}
- public static ReadResult createFromWriteIntent(@Nullable BinaryRow binaryRow, UUID transactionId, UUID commitTableId,
+ /**
+ * Returns an empty read result (that is, a result for a missing row ID or for a tombstone) for the given row ID.
+ *
+ * @param rowId ID of the row for which to create a ReadResult.
+ * @return An empty read result.
+ */
+ public static ReadResult empty(RowId rowId) {
+ return new ReadResult(rowId, null, null, null, null, null, UNDEFINED_COMMIT_PARTITION_ID);
+ }
+
+ public static ReadResult createFromWriteIntent(RowId rowId, @Nullable BinaryRow binaryRow, UUID transactionId, UUID commitTableId,
int commitPartitionId, @Nullable HybridTimestamp lastCommittedTimestamp) {
- return new ReadResult(binaryRow, transactionId, commitTableId, null, lastCommittedTimestamp, commitPartitionId);
+ return new ReadResult(rowId, binaryRow, transactionId, commitTableId, null, lastCommittedTimestamp, commitPartitionId);
}
- public static ReadResult createFromCommitted(@Nullable BinaryRow binaryRow, HybridTimestamp commitTs) {
- return new ReadResult(binaryRow, null, null, commitTs, null, UNDEFINED_COMMIT_PARTITION_ID);
+ public static ReadResult createFromCommitted(RowId rowId, @Nullable BinaryRow binaryRow, HybridTimestamp commitTs) {
+ return new ReadResult(rowId, binaryRow, null, null, commitTs, null, UNDEFINED_COMMIT_PARTITION_ID);
+ }
+
+ /**
+ * Returns ID of the corresponding row.
+ *
+ * @return ID of the corresponding row.
+ */
+ public RowId rowId() {
+ return rowId;
}
/**
diff --git a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/ReadResultTest.java b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/ReadResultTest.java
index 9310392568..d192300d27 100644
--- a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/ReadResultTest.java
+++ b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/ReadResultTest.java
@@ -19,17 +19,24 @@ package org.apache.ignite.internal.storage;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.junit.jupiter.api.Test;
class ReadResultTest {
@Test
- void resultInEmptyConstantIsEmpty() {
- assertTrue(ReadResult.EMPTY.isEmpty());
+ void resultProducecByEmptyMethodIsEmpty() {
+ ReadResult emptyResult = ReadResult.empty(anyRowId());
+
+ assertTrue(emptyResult.isEmpty());
+ }
+
+ private static RowId anyRowId() {
+ return new RowId(0);
}
@Test
void resultWithNullRowIsEmpty() {
- ReadResult result = ReadResult.createFromCommitted(null, null);
+ ReadResult result = ReadResult.createFromCommitted(anyRowId(), null, HybridTimestamp.MAX_VALUE);
assertTrue(result.isEmpty());
}
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
index ac315cb10e..1c7bf67cdd 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
@@ -17,10 +17,11 @@
package org.apache.ignite.internal.storage;
+import static java.util.stream.Collectors.toList;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.hasItems;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.not;
@@ -40,7 +41,6 @@ import java.util.List;
import java.util.NoSuchElementException;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
-import java.util.stream.Collectors;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.schema.BinaryRow;
@@ -77,7 +77,7 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvPartitionStor
@ParameterizedTest
@EnumSource
- public void testScanOverEmpty(ScanTimestampProvider tsProvider) throws Exception {
+ public void testScanOverEmpty(ScanTimestampProvider tsProvider) {
assertEquals(List.of(), convert(scan(tsProvider.scanTimestamp(clock))));
}
@@ -192,7 +192,7 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvPartitionStor
* Tests basic invariants of {@link MvPartitionStorage#scan(HybridTimestamp)}.
*/
@Test
- public void testScan() throws Exception {
+ public void testScan() {
TestKey key1 = new TestKey(1, "1");
TestValue value1 = new TestValue(10, "xxx");
@@ -258,7 +258,7 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvPartitionStor
assertThrows(NoSuchElementException.class, () -> cursor.next());
- assertThat(res, hasItems(value1, value2));
+ assertThat(res, containsInAnyOrder(value1, value2));
}
}
@@ -345,27 +345,29 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvPartitionStor
return cursor.stream()
.map((ReadResult rs) -> BaseMvStoragesTest.value(rs.binaryRow()))
.sorted(Comparator.nullsFirst(Comparator.naturalOrder()))
- .collect(Collectors.toList());
+ .collect(toList());
}
}
@Test
- void readOfUncommittedRowWithCorrespondingTransactionIdReturnsTheRow() {
+ void readOfUncommittedRowReturnsTheRow() {
RowId rowId = insert(binaryRow, txId);
- BinaryRow foundRow = read(rowId, HybridTimestamp.MAX_VALUE);
+ ReadResult foundResult = storage.read(rowId, HybridTimestamp.MAX_VALUE);
- assertRowMatches(foundRow, binaryRow);
+ assertThat(foundResult.rowId(), is(rowId));
+ assertRowMatches(foundResult.binaryRow(), binaryRow);
}
@Test
- void readOfCommittedRowWithAnyTransactionIdReturnsTheRow() {
+ void readOfCommittedRowReturnsTheRow() {
RowId rowId = insert(binaryRow, txId);
commitWrite(rowId, clock.now());
- BinaryRow foundRow = read(rowId, HybridTimestamp.MAX_VALUE);
+ ReadResult foundResult = storage.read(rowId, HybridTimestamp.MAX_VALUE);
- assertRowMatches(foundRow, binaryRow);
+ assertThat(foundResult.rowId(), is(rowId));
+ assertRowMatches(foundResult.binaryRow(), binaryRow);
}
@Test
@@ -375,9 +377,10 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvPartitionStor
RowId rowId2 = insert(binaryRow2, txId);
- BinaryRow foundRow = read(rowId2, HybridTimestamp.MAX_VALUE);
+ ReadResult foundResult = storage.read(rowId2, HybridTimestamp.MAX_VALUE);
- assertRowMatches(foundRow, binaryRow2);
+ assertThat(foundResult.rowId(), is(rowId2));
+ assertRowMatches(foundResult.binaryRow(), binaryRow2);
}
@Test
@@ -388,9 +391,10 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvPartitionStor
RowId rowId2 = insert(binaryRow2, txId);
commitWrite(rowId2, clock.now());
- BinaryRow foundRow = read(rowId2, HybridTimestamp.MAX_VALUE);
+ ReadResult foundResult = storage.read(rowId2, HybridTimestamp.MAX_VALUE);
- assertRowMatches(foundRow, binaryRow2);
+ assertThat(foundResult.rowId(), is(rowId2));
+ assertRowMatches(foundResult.binaryRow(), binaryRow2);
}
@Test
@@ -685,10 +689,11 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvPartitionStor
void readByTimestampWorksCorrectlyAfterCommitAndAbortFollowedByUncommittedWrite() {
RowId rowId = commitAbortAndAddUncommitted();
- BinaryRow foundRow = storage.read(rowId, clock.now()).binaryRow();
+ ReadResult foundResult = storage.read(rowId, clock.now());
// We see the uncommitted row.
- assertRowMatches(foundRow, binaryRow3);
+ assertThat(foundResult.rowId(), is(rowId));
+ assertRowMatches(foundResult.binaryRow(), binaryRow3);
}
@Test
@@ -737,7 +742,6 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvPartitionStor
RowId rowId = new RowId(PARTITION_ID);
storage.addWrite(rowId, binaryRow, txId, UUID.randomUUID(), 0);
-
commitWrite(rowId, clock.now());
addWrite(rowId, binaryRow2, newTransactionId());
@@ -752,12 +756,13 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvPartitionStor
@ParameterizedTest
@EnumSource(ScanTimestampProvider.class)
void scanWorksCorrectlyAfterCommitAndAbortFollowedByUncommittedWrite(ScanTimestampProvider tsProvider) {
- commitAbortAndAddUncommitted();
+ RowId rowId = commitAbortAndAddUncommitted();
try (Cursor<ReadResult> cursor = storage.scan(tsProvider.scanTimestamp(clock))) {
- BinaryRow foundRow = cursor.next().binaryRow();
+ ReadResult result = cursor.next();
- assertRowMatches(foundRow, binaryRow3);
+ assertThat(result.rowId(), is(rowId));
+ assertRowMatches(result.binaryRow(), binaryRow3);
assertFalse(cursor.hasNext());
}
@@ -998,7 +1003,7 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvPartitionStor
// Reverse expected values to simplify comparison - they are returned in reversed order, newest to oldest.
Collections.reverse(values);
- List<IgniteBiTuple<TestKey, TestValue>> list = drainToList(storage.scanVersions(rowId));
+ List<IgniteBiTuple<TestKey, TestValue>> list = drainVerifyingRowIdMatch(storage.scanVersions(rowId), rowId);
assertEquals(values.size(), list.size());
@@ -1011,10 +1016,19 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvPartitionStor
}
}
+ private static List<IgniteBiTuple<TestKey, TestValue>> drainVerifyingRowIdMatch(Cursor<ReadResult> cursor, RowId rowId) {
+ try (Cursor<ReadResult> c = cursor) {
+ return cursor.stream()
+ .peek(result -> assertThat(result.rowId(), is(rowId)))
+ .map(BaseMvStoragesTest::unwrap)
+ .collect(toList());
+ }
+ }
+
@ParameterizedTest
@EnumSource(ScanTimestampProvider.class)
void testScanWithWriteIntent(ScanTimestampProvider tsProvider) {
- HybridTimestamp commitTs = addCommittedVersionAndWriteIntent();
+ IgniteBiTuple<RowId, HybridTimestamp> rowIdAndCommitTs = addCommittedVersionAndWriteIntent();
try (PartitionTimestampCursor cursor = storage.scan(tsProvider.scanTimestamp(clock))) {
assertTrue(cursor.hasNext());
@@ -1023,15 +1037,16 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvPartitionStor
assertTrue(next.isWriteIntent());
+ assertThat(next.rowId(), is(rowIdAndCommitTs.get1()));
assertRowMatches(next.binaryRow(), binaryRow2);
- BinaryRow committedRow = cursor.committed(commitTs);
+ BinaryRow committedRow = cursor.committed(rowIdAndCommitTs.get2());
assertRowMatches(committedRow, binaryRow);
}
}
- private HybridTimestamp addCommittedVersionAndWriteIntent() {
+ private IgniteBiTuple<RowId, HybridTimestamp> addCommittedVersionAndWriteIntent() {
RowId rowId = new RowId(PARTITION_ID);
HybridTimestamp commitTs = clock.now();
@@ -1046,7 +1061,7 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvPartitionStor
return null;
});
- return commitTs;
+ return new IgniteBiTuple<>(rowId, commitTs);
}
@Test
@@ -1069,7 +1084,7 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvPartitionStor
commitWrite(newRowId, clock.now());
});
- List<IgniteBiTuple<TestKey, TestValue>> list = drainToList(storage.scanVersions(rowId));
+ List<IgniteBiTuple<TestKey, TestValue>> list = drainVerifyingRowIdMatch(storage.scanVersions(rowId), rowId);
assertEquals(2, list.size());
@@ -1141,6 +1156,7 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvPartitionStor
try (Cursor<ReadResult> cursor = storage.scanVersions(rowId)) {
ReadResult result = cursor.next();
+ assertThat(result.rowId(), is(rowId));
assertTrue(result.isWriteIntent());
assertThat(result.commitPartitionId(), is(not(ReadResult.UNDEFINED_COMMIT_PARTITION_ID)));
assertThat(result.commitTableId(), is(notNullValue()));
@@ -1158,6 +1174,7 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvPartitionStor
try (Cursor<ReadResult> cursor = storage.scanVersions(rowId)) {
ReadResult result = cursor.next();
+ assertThat(result.rowId(), is(rowId));
assertFalse(result.isWriteIntent());
assertThat(result.commitPartitionId(), is(ReadResult.UNDEFINED_COMMIT_PARTITION_ID));
assertThat(result.commitTableId(), is(nullValue()));
@@ -1195,6 +1212,8 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvPartitionStor
assertTrue(cursor.hasNext());
ReadResult next = cursor.next();
+
+ assertThat(next.rowId(), is(rowId));
assertNull(next.binaryRow());
assertEquals(commitTs, next.newestCommitTimestamp());
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvPartitionStorageTest.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvPartitionStorageTest.java
index f59293ca1e..8bd06d80ce 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvPartitionStorageTest.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvPartitionStorageTest.java
@@ -97,6 +97,7 @@ public abstract class BaseMvPartitionStorageTest extends BaseMvStoragesTest {
/**
* Reads a row.
*/
+ @Nullable
protected BinaryRow read(RowId rowId, HybridTimestamp timestamp) {
ReadResult readResult = storage.read(rowId, timestamp);
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvStoragesTest.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvStoragesTest.java
index 767cb1a017..7389cfd183 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvStoragesTest.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvStoragesTest.java
@@ -168,7 +168,7 @@ public abstract class BaseMvStoragesTest {
}
}
- protected final void assertRowMatches(BinaryRow rowUnderQuestion, BinaryRow expectedRow) {
+ protected final void assertRowMatches(@Nullable BinaryRow rowUnderQuestion, BinaryRow expectedRow) {
assertThat(rowUnderQuestion, is(notNullValue()));
assertThat(rowUnderQuestion.bytes(), is(equalTo(expectedRow.bytes())));
}
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
index a1473b77e8..02e5251418 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
@@ -41,7 +41,6 @@ import org.apache.ignite.internal.storage.StorageClosedException;
import org.apache.ignite.internal.storage.StorageException;
import org.apache.ignite.internal.storage.TxIdMismatchException;
import org.apache.ignite.internal.util.Cursor;
-import org.apache.ignite.lang.IgniteBiTuple;
import org.jetbrains.annotations.Nullable;
/**
@@ -50,9 +49,9 @@ import org.jetbrains.annotations.Nullable;
public class TestMvPartitionStorage implements MvPartitionStorage {
private final ConcurrentNavigableMap<RowId, VersionChain> map = new ConcurrentSkipListMap<>();
- private final NavigableSet<IgniteBiTuple<VersionChain, RowId>> gcQueue = new ConcurrentSkipListSet<>(
- comparing((IgniteBiTuple<VersionChain, RowId> p) -> p.get1().ts)
- .thenComparing(IgniteBiTuple::get2)
+ private final NavigableSet<VersionChain> gcQueue = new ConcurrentSkipListSet<>(
+ comparing((VersionChain chain) -> chain.ts)
+ .thenComparing(chain -> chain.rowId)
);
private volatile long lastAppliedIndex;
@@ -71,6 +70,7 @@ public class TestMvPartitionStorage implements MvPartitionStorage {
}
private static class VersionChain {
+ final RowId rowId;
final @Nullable BinaryRow row;
final @Nullable HybridTimestamp ts;
final @Nullable UUID txId;
@@ -78,8 +78,9 @@ public class TestMvPartitionStorage implements MvPartitionStorage {
final int commitPartitionId;
volatile @Nullable VersionChain next;
- VersionChain(@Nullable BinaryRow row, @Nullable HybridTimestamp ts, @Nullable UUID txId, @Nullable UUID commitTableId,
+ VersionChain(RowId rowId, @Nullable BinaryRow row, @Nullable HybridTimestamp ts, @Nullable UUID txId, @Nullable UUID commitTableId,
int commitPartitionId, @Nullable VersionChain next) {
+ this.rowId = rowId;
this.row = row;
this.ts = ts;
this.txId = txId;
@@ -88,13 +89,13 @@ public class TestMvPartitionStorage implements MvPartitionStorage {
this.next = next;
}
- static VersionChain forWriteIntent(@Nullable BinaryRow row, @Nullable UUID txId, @Nullable UUID commitTableId,
+ static VersionChain forWriteIntent(RowId rowId, @Nullable BinaryRow row, @Nullable UUID txId, @Nullable UUID commitTableId,
int commitPartitionId, @Nullable VersionChain next) {
- return new VersionChain(row, null, txId, commitTableId, commitPartitionId, next);
+ return new VersionChain(rowId, row, null, txId, commitTableId, commitPartitionId, next);
}
- static VersionChain forCommitted(@Nullable HybridTimestamp timestamp, VersionChain uncommittedVersionChain) {
- return new VersionChain(uncommittedVersionChain.row, timestamp, null, null,
+ static VersionChain forCommitted(RowId rowId, @Nullable HybridTimestamp timestamp, VersionChain uncommittedVersionChain) {
+ return new VersionChain(rowId, uncommittedVersionChain.row, timestamp, null, null,
ReadResult.UNDEFINED_COMMIT_PARTITION_ID, uncommittedVersionChain.next);
}
@@ -179,10 +180,10 @@ public class TestMvPartitionStorage implements MvPartitionStorage {
res[0] = versionChain.row;
- return VersionChain.forWriteIntent(row, txId, commitTableId, commitPartitionId, versionChain.next);
+ return VersionChain.forWriteIntent(rowId, row, txId, commitTableId, commitPartitionId, versionChain.next);
}
- return VersionChain.forWriteIntent(row, txId, commitTableId, commitPartitionId, versionChain);
+ return VersionChain.forWriteIntent(rowId, row, txId, commitTableId, commitPartitionId, versionChain);
});
return res[0];
@@ -220,7 +221,7 @@ public class TestMvPartitionStorage implements MvPartitionStorage {
return versionChain;
}
- return resolveCommittedVersionChain(rowId, VersionChain.forCommitted(timestamp, versionChain));
+ return resolveCommittedVersionChain(VersionChain.forCommitted(rowId, timestamp, versionChain));
});
}
@@ -237,7 +238,8 @@ public class TestMvPartitionStorage implements MvPartitionStorage {
throw new StorageException("Write intent exists for " + rowId);
}
- return resolveCommittedVersionChain(rowId, new VersionChain(
+ return resolveCommittedVersionChain(new VersionChain(
+ rowId,
row,
commitTimestamp,
null,
@@ -248,17 +250,18 @@ public class TestMvPartitionStorage implements MvPartitionStorage {
});
}
- @Nullable
- private VersionChain resolveCommittedVersionChain(RowId rowId, VersionChain committedVersionChain) {
- if (committedVersionChain.next != null) {
+ private VersionChain resolveCommittedVersionChain(VersionChain committedVersionChain) {
+ VersionChain nextChain = committedVersionChain.next;
+
+ if (nextChain != null) {
// Avoid creating tombstones for tombstones.
- if (committedVersionChain.row == null && committedVersionChain.next.row == null) {
- return committedVersionChain.next;
+ if (committedVersionChain.row == null && nextChain.row == null) {
+ return nextChain;
}
// Calling it from the compute is fine. Concurrent writes of the same row are impossible, and if we call the compute closure
// several times, an equal version chain will be added to the GC queue (timestamp and rowId don't change in this case).
- gcQueue.add(new IgniteBiTuple<>(committedVersionChain, rowId));
+ gcQueue.add(committedVersionChain);
}
return committedVersionChain;
@@ -275,6 +278,10 @@ public class TestMvPartitionStorage implements MvPartitionStorage {
VersionChain versionChain = map.get(rowId);
+ if (versionChain == null) {
+ return ReadResult.empty(rowId);
+ }
+
return read(versionChain, timestamp, null);
}
@@ -293,10 +300,6 @@ public class TestMvPartitionStorage implements MvPartitionStorage {
) {
assert timestamp == null ^ txId == null;
- if (versionChain == null) {
- return ReadResult.EMPTY;
- }
-
if (timestamp == null) {
// Search by transaction id.
@@ -315,7 +318,7 @@ public class TestMvPartitionStorage implements MvPartitionStorage {
// We *only* have a write-intent, return it.
BinaryRow binaryRow = cur.row;
- return ReadResult.createFromWriteIntent(binaryRow, cur.txId, cur.commitTableId, cur.commitPartitionId, null);
+ return ReadResult.createFromWriteIntent(cur.rowId, binaryRow, cur.txId, cur.commitTableId, cur.commitPartitionId, null);
}
// Move to first commit.
@@ -327,15 +330,18 @@ public class TestMvPartitionStorage implements MvPartitionStorage {
private static ReadResult versionChainToReadResult(VersionChain versionChain, boolean fillLastCommittedTs) {
if (versionChain.isWriteIntent()) {
+ VersionChain next = versionChain.next;
+
return ReadResult.createFromWriteIntent(
+ versionChain.rowId,
versionChain.row,
versionChain.txId,
versionChain.commitTableId,
- versionChain.commitPartitionId, fillLastCommittedTs && versionChain.next != null ? versionChain.next.ts : null
+ versionChain.commitPartitionId, fillLastCommittedTs && next != null ? next.ts : null
);
}
- return ReadResult.createFromCommitted(versionChain.row, versionChain.ts);
+ return ReadResult.createFromCommitted(versionChain.rowId, versionChain.row, versionChain.ts);
}
/**
@@ -354,8 +360,14 @@ public class TestMvPartitionStorage implements MvPartitionStorage {
// So we just return write-intent.
BinaryRow binaryRow = chainHead.row;
- return ReadResult.createFromWriteIntent(binaryRow, chainHead.txId, chainHead.commitTableId, chainHead.commitPartitionId,
- firstCommit.ts);
+ return ReadResult.createFromWriteIntent(
+ chainHead.rowId,
+ binaryRow,
+ chainHead.txId,
+ chainHead.commitTableId,
+ chainHead.commitPartitionId,
+ firstCommit.ts
+ );
}
VersionChain cur = firstCommit;
@@ -367,13 +379,13 @@ public class TestMvPartitionStorage implements MvPartitionStorage {
// This commit has timestamp matching the query ts, meaning that commit is the one we are looking for.
BinaryRow binaryRow = cur.row;
- return ReadResult.createFromCommitted(binaryRow, cur.ts);
+ return ReadResult.createFromCommitted(cur.rowId, binaryRow, cur.ts);
}
cur = cur.next;
}
- return ReadResult.EMPTY;
+ return ReadResult.empty(chainHead.rowId);
}
@Override
@@ -455,6 +467,8 @@ public class TestMvPartitionStorage implements MvPartitionStorage {
ReadResult res = currentReadResult;
+ assert res != null;
+
currentReadResult = null;
return res;
@@ -471,22 +485,22 @@ public class TestMvPartitionStorage implements MvPartitionStorage {
@Override
public synchronized @Nullable BinaryRowAndRowId pollForVacuum(HybridTimestamp lowWatermark) {
- Iterator<IgniteBiTuple<VersionChain, RowId>> it = gcQueue.iterator();
+ Iterator<VersionChain> it = gcQueue.iterator();
if (!it.hasNext()) {
return null;
}
- IgniteBiTuple<VersionChain, RowId> next = it.next();
- VersionChain dequeuedVersionChain = next.get1();
+ VersionChain dequeuedVersionChain = it.next();
if (dequeuedVersionChain.ts.compareTo(lowWatermark) > 0) {
return null;
}
- RowId rowId = next.get2();
+ RowId rowId = dequeuedVersionChain.rowId;
VersionChain versionChainToRemove = dequeuedVersionChain.next;
+ assert versionChainToRemove != null;
assert versionChainToRemove.next == null;
dequeuedVersionChain.next = null;
@@ -503,7 +517,7 @@ public class TestMvPartitionStorage implements MvPartitionStorage {
if (cur.next == dequeuedVersionChain) {
cur.next = null;
- gcQueue.remove(new IgniteBiTuple<>(cur, rowId));
+ gcQueue.remove(cur);
}
}
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
index 86a2e278e1..cd24c83611 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
@@ -322,7 +322,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
VersionChain versionChain = findVersionChain(rowId);
if (versionChain == null) {
- return ReadResult.EMPTY;
+ return ReadResult.empty(rowId);
}
if (lookingForLatestVersion(timestamp)) {
@@ -364,7 +364,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
} else {
ByteBufferRow row = rowVersionToBinaryRow(rowVersion);
- return ReadResult.createFromCommitted(row, rowVersion.timestamp());
+ return ReadResult.createFromCommitted(versionChain.rowId(), row, rowVersion.timestamp());
}
}
@@ -424,22 +424,22 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
/**
* Walks version chain to find a row by timestamp. See {@link MvPartitionStorage#read(RowId, HybridTimestamp)} for details.
*
- * @param chainHead Version chain head.
+ * @param chain Version chain head.
* @param timestamp Timestamp.
* @return Read result.
*/
- private ReadResult walkVersionChain(VersionChain chainHead, HybridTimestamp timestamp) {
- assert chainHead.hasCommittedVersions();
+ private ReadResult walkVersionChain(VersionChain chain, HybridTimestamp timestamp) {
+ assert chain.hasCommittedVersions();
- boolean hasWriteIntent = chainHead.isUncommitted();
+ boolean hasWriteIntent = chain.isUncommitted();
RowVersion firstCommit;
if (hasWriteIntent) {
// First commit can only match if its timestamp matches query timestamp.
- firstCommit = readRowVersion(chainHead.nextLink(), rowTimestamp -> timestamp.compareTo(rowTimestamp) == 0);
+ firstCommit = readRowVersion(chain.nextLink(), rowTimestamp -> timestamp.compareTo(rowTimestamp) == 0);
} else {
- firstCommit = readRowVersion(chainHead.headLink(), rowTimestamp -> timestamp.compareTo(rowTimestamp) >= 0);
+ firstCommit = readRowVersion(chain.headLink(), rowTimestamp -> timestamp.compareTo(rowTimestamp) >= 0);
}
assert firstCommit.isCommitted();
@@ -448,9 +448,9 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
if (hasWriteIntent && timestamp.compareTo(firstCommit.timestamp()) > 0) {
// It's the latest commit in chain, query ts is greater than commit ts and there is a write-intent.
// So we just return write-intent.
- RowVersion rowVersion = readRowVersion(chainHead.headLink(), ALWAYS_LOAD_VALUE);
+ RowVersion rowVersion = readRowVersion(chain.headLink(), ALWAYS_LOAD_VALUE);
- return writeIntentToResult(chainHead, rowVersion, firstCommit.timestamp());
+ return writeIntentToResult(chain, rowVersion, firstCommit.timestamp());
}
RowVersion curCommit = firstCommit;
@@ -470,7 +470,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
row = new ByteBufferRow(curCommit.value());
}
- return ReadResult.createFromCommitted(row, curCommit.timestamp());
+ return ReadResult.createFromCommitted(chain.rowId(), row, curCommit.timestamp());
}
if (!curCommit.hasNextLink()) {
@@ -480,7 +480,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
}
} while (curCommit != null);
- return ReadResult.EMPTY;
+ return ReadResult.empty(chain.rowId());
}
private ReadResult writeIntentToResult(VersionChain chain, RowVersion rowVersion, @Nullable HybridTimestamp lastCommittedTimestamp) {
@@ -492,7 +492,14 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
BinaryRow row = rowVersionToBinaryRow(rowVersion);
- return ReadResult.createFromWriteIntent(row, transactionId, commitTableId, commitPartitionId, lastCommittedTimestamp);
+ return ReadResult.createFromWriteIntent(
+ chain.rowId(),
+ row,
+ transactionId,
+ commitTableId,
+ commitPartitionId,
+ lastCommittedTimestamp
+ );
}
private RowVersion insertRowVersion(@Nullable BinaryRow row, long nextPartitionlessLink) {
@@ -760,9 +767,10 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
ByteBufferRow row = new ByteBufferRow(rowVersion.value());
if (rowVersion.isCommitted()) {
- return ReadResult.createFromCommitted(row, rowVersion.timestamp());
+ return ReadResult.createFromCommitted(versionChain.rowId(), row, rowVersion.timestamp());
} else {
return ReadResult.createFromWriteIntent(
+ versionChain.rowId(),
row,
versionChain.transactionId(),
versionChain.commitTableId(),
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
index 3dd513a86a..a609aa3719 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
@@ -596,7 +596,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
if (invalid(seekIterator)) {
// No data at all.
- return ReadResult.EMPTY;
+ return ReadResult.empty(rowId);
}
ByteBuffer readKeyBuf = MV_KEY_BUFFER.get().position(0).limit(MAX_KEY_SIZE);
@@ -605,7 +605,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
if (!matches(rowId, readKeyBuf)) {
// It is already a different row, so no version exists for our rowId.
- return ReadResult.EMPTY;
+ return ReadResult.empty(rowId);
}
boolean isWriteIntent = keyLength == ROW_PREFIX_SIZE;
@@ -615,15 +615,17 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
return readResultFromKeyAndValue(isWriteIntent, readKeyBuf, valueBytes);
}
- private static ReadResult readResultFromKeyAndValue(boolean isWriteIntent, ByteBuffer keyBuf, byte[] valueBytes) {
+ private ReadResult readResultFromKeyAndValue(boolean isWriteIntent, ByteBuffer keyBuf, byte[] valueBytes) {
assert valueBytes != null;
+ RowId rowId = getRowId(keyBuf);
+
if (!isWriteIntent) {
// There is no write-intent, return latest committed row.
- return wrapCommittedValue(valueBytes, readTimestamp(keyBuf));
+ return wrapCommittedValue(rowId, valueBytes, readTimestamp(keyBuf));
}
- return wrapUncommittedValue(valueBytes, null);
+ return wrapUncommittedValue(rowId, valueBytes, null);
}
/**
@@ -657,7 +659,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
* @param keyBuf Buffer with a key in it: partition id + row id + timestamp.
* @return Read result.
*/
- private static ReadResult handleReadByTimestampIterator(RocksIterator seekIterator, RowId rowId, HybridTimestamp timestamp,
+ private ReadResult handleReadByTimestampIterator(RocksIterator seekIterator, RowId rowId, HybridTimestamp timestamp,
ByteBuffer keyBuf) {
// There's no guarantee that required key even exists. If it doesn't, then "seek" will point to a different key.
// To avoid returning its value, we have to check that actual key matches what we need.
@@ -678,7 +680,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
if (invalid(seekIterator)) {
// There are no writes with row id.
- return ReadResult.EMPTY;
+ return ReadResult.empty(rowId);
}
foundKeyBuf.position(0).limit(MAX_KEY_SIZE);
@@ -686,7 +688,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
if (!matches(rowId, foundKeyBuf)) {
// There are no writes with row id.
- return ReadResult.EMPTY;
+ return ReadResult.empty(rowId);
}
byte[] valueBytes = seekIterator.value();
@@ -699,7 +701,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
if (invalid(seekIterator)) {
// There are no committed writes, we can safely return write-intent.
- return wrapUncommittedValue(valueBytes, null);
+ return wrapUncommittedValue(rowId, valueBytes, null);
}
foundKeyBuf.position(0).limit(MAX_KEY_SIZE);
@@ -707,12 +709,12 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
if (!matches(rowId, foundKeyBuf)) {
// There are no committed writes, we can safely return write-intent.
- return wrapUncommittedValue(valueBytes, null);
+ return wrapUncommittedValue(rowId, valueBytes, null);
}
}
// There is a committed write, but it's more recent than our timestamp (because we didn't find it with first seek).
- return ReadResult.EMPTY;
+ return ReadResult.empty(rowId);
} else {
// Should not be write-intent, as we were seeking with the timestamp.
assert keyLength == MAX_KEY_SIZE;
@@ -723,7 +725,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
if (rowTimestamp.equals(timestamp)) {
// This is exactly the row we are looking for.
- return wrapCommittedValue(valueBytes, rowTimestamp);
+ return wrapCommittedValue(rowId, valueBytes, rowTimestamp);
}
// Let's check if there is more recent write. If it is a write-intent, then return write-intent.
@@ -732,7 +734,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
if (invalid(seekIterator)) {
// There is no more recent commits or write-intents.
- return wrapCommittedValue(valueBytes, rowTimestamp);
+ return wrapCommittedValue(rowId, valueBytes, rowTimestamp);
}
foundKeyBuf.position(0).limit(MAX_KEY_SIZE);
@@ -740,16 +742,16 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
if (!matches(rowId, foundKeyBuf)) {
// There is no more recent commits or write-intents under this row id.
- return wrapCommittedValue(valueBytes, rowTimestamp);
+ return wrapCommittedValue(rowId, valueBytes, rowTimestamp);
}
boolean isWriteIntent = keyLength == ROW_PREFIX_SIZE;
if (isWriteIntent) {
- return wrapUncommittedValue(seekIterator.value(), rowTimestamp);
+ return wrapUncommittedValue(rowId, seekIterator.value(), rowTimestamp);
}
- return wrapCommittedValue(valueBytes, readTimestamp(foundKeyBuf));
+ return wrapCommittedValue(rowId, valueBytes, readTimestamp(foundKeyBuf));
}
}
@@ -875,10 +877,10 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
buf.position(0);
}
- private RowId getRowId(ByteBuffer buffer) {
- buffer.position(ROW_ID_OFFSET);
+ private RowId getRowId(ByteBuffer keyBuffer) {
+ keyBuffer.position(ROW_ID_OFFSET);
- return new RowId(partitionId, normalize(buffer.getLong()), normalize(buffer.getLong()));
+ return new RowId(partitionId, normalize(keyBuffer.getLong()), normalize(keyBuffer.getLong()));
}
@Override
@@ -1037,11 +1039,12 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
* Converts raw byte array representation of the write-intent value into a read result adding newest commit timestamp if
* it is not {@code null}.
*
+ * @param rowId ID of the corresponding row.
* @param valueBytes Value bytes as read from the storage.
* @param newestCommitTs Commit timestamp of the most recent committed write of this value.
* @return Read result instance.
*/
- private static ReadResult wrapUncommittedValue(byte[] valueBytes, @Nullable HybridTimestamp newestCommitTs) {
+ private static ReadResult wrapUncommittedValue(RowId rowId, byte[] valueBytes, @Nullable HybridTimestamp newestCommitTs) {
UUID txId = bytesToUuid(valueBytes, TX_ID_OFFSET);
UUID commitTableId = bytesToUuid(valueBytes, TABLE_ID_OFFSET);
int commitPartitionId = GridUnsafe.getShort(valueBytes, GridUnsafe.BYTE_ARR_OFF + PARTITION_ID_OFFSET) & 0xFFFF;
@@ -1054,22 +1057,23 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
row = new ByteBufferRow(ByteBuffer.wrap(valueBytes).position(VALUE_OFFSET).slice().order(BINARY_ROW_BYTE_ORDER));
}
- return ReadResult.createFromWriteIntent(row, txId, commitTableId, commitPartitionId, newestCommitTs);
+ return ReadResult.createFromWriteIntent(rowId, row, txId, commitTableId, commitPartitionId, newestCommitTs);
}
/**
* Converts raw byte array representation of the value into a read result.
*
+ * @param rowId ID of the corresponding row.
* @param valueBytes Value bytes as read from the storage.
* @param rowCommitTimestamp Timestamp with which the row was committed.
* @return Read result instance or {@code null} if value is a tombstone.
*/
- private static ReadResult wrapCommittedValue(byte[] valueBytes, HybridTimestamp rowCommitTimestamp) {
+ private static ReadResult wrapCommittedValue(RowId rowId, byte[] valueBytes, HybridTimestamp rowCommitTimestamp) {
if (isTombstone(valueBytes, false)) {
- return ReadResult.EMPTY;
+ return ReadResult.empty(rowId);
}
- return ReadResult.createFromCommitted(new ByteBufferRow(valueBytes), rowCommitTimestamp);
+ return ReadResult.createFromCommitted(rowId, new ByteBufferRow(valueBytes), rowCommitTimestamp);
}
/**
@@ -1251,9 +1255,9 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
if (!isWriteIntent) {
// There is no write-intent, return latest committed row.
- readResult = wrapCommittedValue(valueBytes, readTimestamp(currentKeyBuffer));
+ readResult = wrapCommittedValue(rowId, valueBytes, readTimestamp(currentKeyBuffer));
} else {
- readResult = wrapUncommittedValue(valueBytes, nextCommitTimestamp);
+ readResult = wrapUncommittedValue(rowId, valueBytes, nextCommitTimestamp);
}
if (!readResult.isEmpty() || readResult.isWriteIntent()) {
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java
index f36e1bc0b2..6a3bd4572c 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java
@@ -53,6 +53,7 @@ import org.apache.ignite.internal.schema.row.RowAssembler;
import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.PartitionTimestampCursor;
import org.apache.ignite.internal.storage.ReadResult;
+import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.storage.StorageException;
import org.apache.ignite.internal.table.InternalTable;
import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
@@ -385,7 +386,7 @@ public abstract class ItAbstractInternalTableScanTest extends IgniteAbstractTest
when(cursor.hasNext()).thenAnswer(hnInvocation -> cursorTouchCnt.get() < submittedItems.size());
lenient().when(cursor.next()).thenAnswer(ninvocation ->
- ReadResult.createFromCommitted(submittedItems.get(cursorTouchCnt.getAndIncrement()), clock.now()));
+ ReadResult.createFromCommitted(new RowId(0), submittedItems.get(cursorTouchCnt.getAndIncrement()), clock.now()));
return cursor;
});
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotMvDataStreamingTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotMvDataStreamingTest.java
index 64d752b27b..849da6801b 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotMvDataStreamingTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotMvDataStreamingTest.java
@@ -102,8 +102,9 @@ class OutgoingSnapshotMvDataStreamingTest {
@Test
void sendsCommittedAndUncommittedVersionsFromStorage() {
- ReadResult version1 = ReadResult.createFromCommitted(new ByteBufferRow(new byte[]{1}), clock.now());
+ ReadResult version1 = ReadResult.createFromCommitted(rowId1, new ByteBufferRow(new byte[]{1}), clock.now());
ReadResult version2 = ReadResult.createFromWriteIntent(
+ rowId1,
new ByteBufferRow(new byte[]{2}),
transactionId,
commitTableId,
@@ -155,8 +156,8 @@ class OutgoingSnapshotMvDataStreamingTest {
@Test
void reversesOrderOfVersionsObtainedFromPartition() {
- ReadResult version1 = ReadResult.createFromCommitted(new ByteBufferRow(new byte[]{1}), clock.now());
- ReadResult version2 = ReadResult.createFromCommitted(new ByteBufferRow(new byte[]{2}), clock.now());
+ ReadResult version1 = ReadResult.createFromCommitted(rowId1, new ByteBufferRow(new byte[]{1}), clock.now());
+ ReadResult version2 = ReadResult.createFromCommitted(rowId1, new ByteBufferRow(new byte[]{2}), clock.now());
configureStorageToHaveExactlyOneRowWith(List.of(version1, version2));
@@ -174,8 +175,8 @@ class OutgoingSnapshotMvDataStreamingTest {
@Test
void iteratesRowsInPartition() {
- ReadResult version1 = ReadResult.createFromCommitted(new ByteBufferRow(new byte[]{1}), clock.now());
- ReadResult version2 = ReadResult.createFromCommitted(new ByteBufferRow(new byte[]{2}), clock.now());
+ ReadResult version1 = ReadResult.createFromCommitted(rowId1, new ByteBufferRow(new byte[]{1}), clock.now());
+ ReadResult version2 = ReadResult.createFromCommitted(rowId2, new ByteBufferRow(new byte[]{2}), clock.now());
when(mvPartitionStorage.closestRowId(lowestRowId)).thenReturn(rowId1);
when(mvPartitionStorage.scanVersions(rowId1)).thenReturn(Cursor.fromIterable(List.of(version1)));
@@ -213,8 +214,9 @@ class OutgoingSnapshotMvDataStreamingTest {
@Test
void sendsCommittedAndUncommittedVersionsFromQueue() {
- ReadResult version1 = ReadResult.createFromCommitted(new ByteBufferRow(new byte[]{1}), clock.now());
+ ReadResult version1 = ReadResult.createFromCommitted(rowIdOutOfOrder, new ByteBufferRow(new byte[]{1}), clock.now());
ReadResult version2 = ReadResult.createFromWriteIntent(
+ rowIdOutOfOrder,
new ByteBufferRow(new byte[]{2}),
transactionId,
commitTableId,
@@ -257,8 +259,8 @@ class OutgoingSnapshotMvDataStreamingTest {
@Test
void sendsOutOfOrderRowsWithHighestPriority() {
- ReadResult version1 = ReadResult.createFromCommitted(new ByteBufferRow(new byte[]{1}), clock.now());
- ReadResult version2 = ReadResult.createFromCommitted(new ByteBufferRow(new byte[]{2}), clock.now());
+ ReadResult version1 = ReadResult.createFromCommitted(rowIdOutOfOrder, new ByteBufferRow(new byte[]{1}), clock.now());
+ ReadResult version2 = ReadResult.createFromCommitted(rowId1, new ByteBufferRow(new byte[]{2}), clock.now());
when(mvPartitionStorage.scanVersions(rowIdOutOfOrder)).thenReturn(Cursor.fromIterable(List.of(version1)));
@@ -298,7 +300,7 @@ class OutgoingSnapshotMvDataStreamingTest {
}
@Test
- void closesVersionsCursor() throws Exception {
+ void closesVersionsCursor() {
@SuppressWarnings("unchecked")
Cursor<ReadResult> cursor = mock(Cursor.class);
@@ -319,7 +321,7 @@ class OutgoingSnapshotMvDataStreamingTest {
@Test
void sendsTombstonesWithNullBuffers() {
- ReadResult version = ReadResult.createFromCommitted(null, clock.now());
+ ReadResult version = ReadResult.createFromCommitted(rowId1, null, clock.now());
configureStorageToHaveExactlyOneRowWith(List.of(version));
@@ -331,6 +333,7 @@ class OutgoingSnapshotMvDataStreamingTest {
@Test
void doesNotSendWriteIntentTimestamp() {
ReadResult version = ReadResult.createFromWriteIntent(
+ rowId1,
new ByteBufferRow(new byte[]{1}),
transactionId,
commitTableId,
@@ -356,8 +359,8 @@ class OutgoingSnapshotMvDataStreamingTest {
@Test
void mvDataHandlingRespectsBatchSizeHintForMessagesFromPartition() {
- ReadResult version1 = ReadResult.createFromCommitted(new ByteBufferRow(new byte[]{1}), clock.now());
- ReadResult version2 = ReadResult.createFromCommitted(new ByteBufferRow(new byte[]{2}), clock.now());
+ ReadResult version1 = ReadResult.createFromCommitted(rowId1, new ByteBufferRow(new byte[]{1}), clock.now());
+ ReadResult version2 = ReadResult.createFromCommitted(rowId2, new ByteBufferRow(new byte[]{2}), clock.now());
when(mvPartitionStorage.closestRowId(lowestRowId)).thenReturn(rowId1);
when(mvPartitionStorage.scanVersions(rowId1)).thenReturn(Cursor.fromIterable(List.of(version1)));
@@ -371,8 +374,8 @@ class OutgoingSnapshotMvDataStreamingTest {
@Test
void mvDataHandlingRespectsBatchSizeHintForOutOfOrderMessages() {
- ReadResult version1 = ReadResult.createFromCommitted(new ByteBufferRow(new byte[]{1}), clock.now());
- ReadResult version2 = ReadResult.createFromCommitted(new ByteBufferRow(new byte[]{2}), clock.now());
+ ReadResult version1 = ReadResult.createFromCommitted(rowIdOutOfOrder, new ByteBufferRow(new byte[]{1}), clock.now());
+ ReadResult version2 = ReadResult.createFromCommitted(rowId1, new ByteBufferRow(new byte[]{2}), clock.now());
when(mvPartitionStorage.scanVersions(rowIdOutOfOrder)).thenReturn(Cursor.fromIterable(List.of(version1)));
@@ -395,8 +398,8 @@ class OutgoingSnapshotMvDataStreamingTest {
@Test
void mvDataResponseThatIsNotLastHasFinishFalse() {
- ReadResult version1 = ReadResult.createFromCommitted(new ByteBufferRow(new byte[]{1}), clock.now());
- ReadResult version2 = ReadResult.createFromCommitted(new ByteBufferRow(new byte[]{2}), clock.now());
+ ReadResult version1 = ReadResult.createFromCommitted(rowId1, new ByteBufferRow(new byte[]{1}), clock.now());
+ ReadResult version2 = ReadResult.createFromCommitted(rowId1, new ByteBufferRow(new byte[]{2}), clock.now());
when(mvPartitionStorage.closestRowId(lowestRowId)).thenReturn(rowId1);
when(mvPartitionStorage.scanVersions(rowId1)).thenReturn(Cursor.fromIterable(List.of(version1, version2)));
@@ -409,7 +412,7 @@ class OutgoingSnapshotMvDataStreamingTest {
@Test
void sendsRowsFromPartitionBiggerThanHint() {
- ReadResult version = ReadResult.createFromCommitted(new ByteBufferRow(new byte[1000]), clock.now());
+ ReadResult version = ReadResult.createFromCommitted(rowId1, new ByteBufferRow(new byte[1000]), clock.now());
configureStorageToHaveExactlyOneRowWith(List.of(version));
@@ -422,7 +425,7 @@ class OutgoingSnapshotMvDataStreamingTest {
@Test
void sendsRowsFromOutOfOrderQueueBiggerThanHint() {
- ReadResult version = ReadResult.createFromCommitted(new ByteBufferRow(new byte[1000]), clock.now());
+ ReadResult version = ReadResult.createFromCommitted(rowIdOutOfOrder, new ByteBufferRow(new byte[1000]), clock.now());
when(mvPartitionStorage.scanVersions(rowIdOutOfOrder)).thenReturn(Cursor.fromIterable(List.of(version)));
@@ -456,8 +459,8 @@ class OutgoingSnapshotMvDataStreamingTest {
@Test
void lastSentRowIdIsPassed() {
- ReadResult version1 = ReadResult.createFromCommitted(new ByteBufferRow(new byte[]{1}), clock.now());
- ReadResult version2 = ReadResult.createFromCommitted(new ByteBufferRow(new byte[]{2}), clock.now());
+ ReadResult version1 = ReadResult.createFromCommitted(rowId1, new ByteBufferRow(new byte[]{1}), clock.now());
+ ReadResult version2 = ReadResult.createFromCommitted(rowId1, new ByteBufferRow(new byte[]{2}), clock.now());
when(mvPartitionStorage.closestRowId(lowestRowId)).thenReturn(rowId1);
when(mvPartitionStorage.scanVersions(rowId1)).thenReturn(Cursor.fromIterable(List.of(version1, version2)));
@@ -476,8 +479,8 @@ class OutgoingSnapshotMvDataStreamingTest {
@Test
void notYetSentRowIdIsNotPassed() {
- ReadResult version1 = ReadResult.createFromCommitted(new ByteBufferRow(new byte[]{1}), clock.now());
- ReadResult version2 = ReadResult.createFromCommitted(new ByteBufferRow(new byte[]{2}), clock.now());
+ ReadResult version1 = ReadResult.createFromCommitted(rowId1, new ByteBufferRow(new byte[]{1}), clock.now());
+ ReadResult version2 = ReadResult.createFromCommitted(rowId1, new ByteBufferRow(new byte[]{2}), clock.now());
when(mvPartitionStorage.closestRowId(lowestRowId)).thenReturn(rowId1);
when(mvPartitionStorage.scanVersions(rowId1)).thenReturn(Cursor.fromIterable(List.of(version1, version2)));
@@ -496,7 +499,7 @@ class OutgoingSnapshotMvDataStreamingTest {
@Test
void anyRowIdIsPassedForFinishedSnapshot() {
- ReadResult version = ReadResult.createFromCommitted(new ByteBufferRow(new byte[]{1}), clock.now());
+ ReadResult version = ReadResult.createFromCommitted(rowId1, new ByteBufferRow(new byte[]{1}), clock.now());
configureStorageToHaveExactlyOneRowWith(List.of(version));