You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sd...@apache.org on 2022/10/13 13:31:12 UTC
[ignite-3] branch main updated: IGNITE-17254 Storage API for RAFT snapshot streaming (#1194)
This is an automated email from the ASF dual-hosted git repository.
sdanilov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 3fb52c5b8e IGNITE-17254 Storage API for RAFT snapshot streaming (#1194)
3fb52c5b8e is described below
commit 3fb52c5b8e47895d9df93b25e22c049797baf09a
Author: Roman Puchkovskiy <ro...@gmail.com>
AuthorDate: Thu Oct 13 17:31:06 2022 +0400
IGNITE-17254 Storage API for RAFT snapshot streaming (#1194)
---
.../internal/storage/MvPartitionStorage.java | 23 ++++-
.../apache/ignite/internal/storage/ReadResult.java | 42 ++++++--
.../storage/AbstractMvPartitionStorageTest.java | 106 ++++++++++++++++++++-
.../internal/storage/BaseMvStoragesTest.java | 8 +-
.../storage/impl/TestMvPartitionStorage.java | 63 +++++++-----
.../mv/AbstractPageMemoryMvPartitionStorage.java | 71 ++++++++++++--
.../internal/storage/pagememory/mv/RowVersion.java | 7 ++
.../storage/pagememory/mv/VersionChain.java | 3 +-
.../storage/rocksdb/RocksDbMvPartitionStorage.java | 72 ++++++++++----
.../rocksdb/RocksDbMvPartitionStorageTest.java | 8 ++
.../distributed/ItInternalTableScanTest.java | 5 +-
11 files changed, 332 insertions(+), 76 deletions(-)
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
index 0c0ee96dee..2abd01450a 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
@@ -98,6 +98,9 @@ public interface MvPartitionStorage extends AutoCloseable {
* <li>There are commits but they're all newer than timestamp - return nothing.</li>
* </ol>
*
+ * <p>{@link ReadResult#newestCommitTimestamp()} is filled by this method for intents having preceding committed
+ * versions.
+ *
* @param rowId Row id.
* @param timestamp Timestamp.
* @return Read result that corresponds to the key.
@@ -142,12 +145,30 @@ public interface MvPartitionStorage extends AutoCloseable {
*/
void commitWrite(RowId rowId, HybridTimestamp timestamp) throws StorageException;
+ /**
+ * Creates a committed version.
+ * In detail:
+ * - if there is no uncommitted version, a new committed version is added
+ * - if there is an uncommitted version, this method may fail with a system exception (this method should not be called if there
+ * is already something uncommitted for the given row).
+ *
+ * @param rowId Row id.
+ * @param row Binary row to update. Key only row means value removal.
+ * @param commitTimestamp Timestamp to associate with committed value.
+ * @throws StorageException If failed to write data to the storage.
+ */
+ void addWriteCommitted(RowId rowId, BinaryRow row, HybridTimestamp commitTimestamp) throws StorageException;
+
/**
* Scans all versions of a single row.
*
+ * <p>{@link ReadResult#newestCommitTimestamp()} is NOT filled by this method for intents having preceding committed
+ * versions.
+ *
* @param rowId Row id.
+ * @return Cursor of results including both rows data and transaction-related context. The versions are ordered from newest to oldest.
*/
- Cursor<BinaryRow> scanVersions(RowId rowId) throws StorageException;
+ Cursor<ReadResult> scanVersions(RowId rowId) throws StorageException;
/**
* Scans the partition and returns a cursor of values at the given timestamp.
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ReadResult.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ReadResult.java
index a1d912c6b2..41819efe8e 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ReadResult.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ReadResult.java
@@ -30,7 +30,7 @@ public class ReadResult {
public static final int UNDEFINED_COMMIT_PARTITION_ID = -1;
/** Empty read result. */
- public static final ReadResult EMPTY = new ReadResult(null, null, null, null, UNDEFINED_COMMIT_PARTITION_ID);
+ public static final ReadResult EMPTY = new ReadResult(null, null, null, null, null, UNDEFINED_COMMIT_PARTITION_ID);
/** Data. */
private final @Nullable BinaryRow binaryRow;
@@ -45,13 +45,26 @@ public class ReadResult {
private final int commitPartitionId;
/**
- * Timestamp of the newest commit of the data. Not {@code null} iff committed version exists, this is a
+ * Commit timestamp of this version (if exists). Non-null for committed versions, {@code null} for write intents.
+ */
+ private final @Nullable HybridTimestamp commitTs;
+
+ /**
+ * Timestamp of the newest commit of the data. Not {@code null} if committed version exists, this is a
* write-intent and read was made with a timestamp.
+ * Might be {@code null} for {@link MvPartitionStorage#scanVersions(RowId)} even for write intents having
+ * a preceding committed version.
*/
private final @Nullable HybridTimestamp newestCommitTs;
- private ReadResult(BinaryRow binaryRow, @Nullable UUID transactionId, @Nullable UUID commitTableId,
- @Nullable HybridTimestamp newestCommitTs, int commitPartitionId) {
+ private ReadResult(
+ @Nullable BinaryRow binaryRow,
+ @Nullable UUID transactionId,
+ @Nullable UUID commitTableId,
+ @Nullable HybridTimestamp commitTs,
+ @Nullable HybridTimestamp newestCommitTs,
+ int commitPartitionId
+ ) {
this.binaryRow = binaryRow;
// If transaction is not null, then commitTableId and commitPartitionId should be defined.
@@ -62,17 +75,18 @@ public class ReadResult {
this.transactionId = transactionId;
this.commitTableId = commitTableId;
+ this.commitTs = commitTs;
this.newestCommitTs = newestCommitTs;
this.commitPartitionId = commitPartitionId;
}
public static ReadResult createFromWriteIntent(BinaryRow binaryRow, UUID transactionId, UUID commitTableId,
- @Nullable HybridTimestamp lastCommittedTimestamp, int commitPartitionId) {
- return new ReadResult(binaryRow, transactionId, commitTableId, lastCommittedTimestamp, commitPartitionId);
+ int commitPartitionId, @Nullable HybridTimestamp lastCommittedTimestamp) {
+ return new ReadResult(binaryRow, transactionId, commitTableId, null, lastCommittedTimestamp, commitPartitionId);
}
- public static ReadResult createFromCommitted(BinaryRow binaryRow) {
- return new ReadResult(binaryRow, null, null, null, UNDEFINED_COMMIT_PARTITION_ID);
+ public static ReadResult createFromCommitted(BinaryRow binaryRow, HybridTimestamp commitTs) {
+ return new ReadResult(binaryRow, null, null, commitTs, null, UNDEFINED_COMMIT_PARTITION_ID);
}
/**
@@ -107,7 +121,17 @@ public class ReadResult {
}
/**
- * Returns timestamp of the most recent commit of the row.
+ * Returns commit timestamp of this version (if exists). Non-null for committed versions, {@code null} for write intents.
+ *
+ * @return Commit timestamp of this version (if exists). Non-null for committed versions, {@code null} for write intents.
+ */
+ public @Nullable HybridTimestamp commitTimestamp() {
+ return commitTs;
+ }
+
+ /**
+ * Returns timestamp of the most recent commit of the row. Might be {@code null} for {@link MvPartitionStorage#scanVersions(RowId)}
+ * even for write intents having a preceding committed version.
*
* @return Timestamp of the most recent commit of the row.
*/
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
index c687068f24..dccd12b700 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
@@ -23,6 +23,7 @@ import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasItems;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -73,19 +74,19 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest
private final BinaryRow binaryRow3 = binaryRow(key, new TestValue(22, "bar3"));
/**
- * Reads a row inside of consistency closure.
+ * Reads a row.
*/
protected BinaryRow read(RowId rowId, HybridTimestamp timestamp) {
- ReadResult readResult = storage.runConsistently(() -> storage.read(rowId, timestamp));
+ ReadResult readResult = storage.read(rowId, timestamp);
return readResult.binaryRow();
}
/**
- * Scans partition inside of consistency closure.
+ * Scans partition.
*/
protected PartitionTimestampCursor scan(Predicate<BinaryRow> filter, HybridTimestamp timestamp) {
- return storage.runConsistently(() -> storage.scan(filter, timestamp));
+ return storage.scan(filter, timestamp);
}
/**
@@ -117,6 +118,18 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest
});
}
+ /**
+ * Writes a row to storage as if it had first been added using {@link MvPartitionStorage#addWrite(RowId, BinaryRow, UUID, UUID, int)}
+ * and then immediately committed with {@link MvPartitionStorage#commitWrite(RowId, HybridTimestamp)}.
+ */
+ protected void addWriteCommitted(RowId rowId, BinaryRow row, HybridTimestamp commitTimestamp) {
+ storage.runConsistently(() -> {
+ storage.addWriteCommitted(rowId, row, commitTimestamp);
+
+ return null;
+ });
+ }
+
/**
* Aborts write-intent inside of consistency closure.
*/
@@ -602,6 +615,15 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest
assertThat(returnedRow, is(nullValue()));
}
+ @Test
+ void addWriteCreatesUncommittedVersion() {
+ RowId rowId = insert(binaryRow, txId);
+
+ ReadResult readResult = storage.read(rowId, clock.now());
+
+ assertTrue(readResult.isWriteIntent());
+ }
+
@Test
void afterRemovalReadWithTxIdFindsNothing() {
RowId rowId = insert(binaryRow, newTransactionId());
@@ -660,6 +682,16 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest
assertThat(rowFromRemoval, is(nullValue()));
}
+ @Test
+ void commitWriteCommitsWriteIntentVersion() {
+ RowId rowId = insert(binaryRow, txId);
+ commitWrite(rowId, clock.now());
+
+ ReadResult readResult = storage.read(rowId, clock.now());
+
+ assertFalse(readResult.isWriteIntent());
+ }
+
@Test
void commitWriteMakesVersionAvailableToReadByTimestamp() {
RowId rowId = insert(binaryRow, txId);
@@ -1138,6 +1170,72 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest
assertNull(storage.closestRowId(rowId2.increment()));
}
+ @Test
+ public void addWriteCommittedAddsCommittedVersion() {
+ RowId rowId = new RowId(PARTITION_ID);
+
+ addWriteCommitted(rowId, binaryRow, clock.now());
+
+ // Read with timestamp returns the committed version (no write-intent was ever created).
+ assertRowMatches(storage.read(rowId, clock.now()).binaryRow(), binaryRow);
+ }
+
+ @Test
+ public void addWriteCommittedLeavesExistingCommittedVersionsUntouched() {
+ RowId rowId = new RowId(PARTITION_ID);
+
+ HybridTimestamp ts1 = clock.now();
+
+ addWriteCommitted(rowId, binaryRow, ts1);
+ addWriteCommitted(rowId, binaryRow2, clock.now());
+
+ assertRowMatches(storage.read(rowId, clock.now()).binaryRow(), binaryRow2);
+ assertRowMatches(storage.read(rowId, ts1).binaryRow(), binaryRow);
+ }
+
+ @Test
+ public void addWriteCommittedThrowsIfUncommittedVersionExists() {
+ RowId rowId = insert(binaryRow, txId);
+
+ StorageException ex = assertThrows(StorageException.class, () -> addWriteCommitted(rowId, binaryRow2, clock.now()));
+ assertThat(ex.getMessage(), is("Write intent exists for " + rowId));
+ }
+
+ @Test
+ public void scanVersionsReturnsUncommittedVersionsAsUncommitted() throws Exception {
+ RowId rowId = insert(binaryRow, txId);
+ commitWrite(rowId, clock.now());
+ addWrite(rowId, binaryRow2, newTransactionId());
+
+ try (Cursor<ReadResult> cursor = storage.scanVersions(rowId)) {
+ ReadResult result = cursor.next();
+
+ assertTrue(result.isWriteIntent());
+ assertThat(result.commitPartitionId(), is(not(ReadResult.UNDEFINED_COMMIT_PARTITION_ID)));
+ assertThat(result.commitTableId(), is(notNullValue()));
+ assertThat(result.transactionId(), is(notNullValue()));
+ assertThat(result.commitTimestamp(), is(nullValue()));
+ assertThat(result.newestCommitTimestamp(), is(nullValue()));
+ }
+ }
+
+ @Test
+ public void scanVersionsReturnsCommittedVersionsAsCommitted() throws Exception {
+ RowId rowId = insert(binaryRow, txId);
+ commitWrite(rowId, clock.now());
+
+ try (Cursor<ReadResult> cursor = storage.scanVersions(rowId)) {
+ ReadResult result = cursor.next();
+
+ assertFalse(result.isWriteIntent());
+ assertThat(result.commitPartitionId(), is(ReadResult.UNDEFINED_COMMIT_PARTITION_ID));
+ assertThat(result.commitTableId(), is(nullValue()));
+ assertThat(result.transactionId(), is(nullValue()));
+ assertThat(result.commitTimestamp(), is(notNullValue()));
+ assertThat(result.newestCommitTimestamp(), is(nullValue()));
+ }
+ }
+
/**
* Returns row id that is lexicographically smaller (by the value of one) than the argument.
*
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvStoragesTest.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvStoragesTest.java
index 6319179fe9..4c44aff251 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvStoragesTest.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvStoragesTest.java
@@ -129,13 +129,7 @@ public abstract class BaseMvStoragesTest {
return new IgniteBiTuple<>(key(binaryRow), value(binaryRow));
}
- protected static List<IgniteBiTuple<TestKey, TestValue>> toList(Cursor<BinaryRow> cursor) throws Exception {
- try (cursor) {
- return cursor.stream().map(BaseMvStoragesTest::unwrap).collect(Collectors.toList());
- }
- }
-
- protected static List<IgniteBiTuple<TestKey, TestValue>> toList(PartitionTimestampCursor cursor) throws Exception {
+ protected static List<IgniteBiTuple<TestKey, TestValue>> toList(Cursor<ReadResult> cursor) throws Exception {
try (cursor) {
return cursor.stream().map(BaseMvStoragesTest::unwrap).collect(Collectors.toList());
}
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
index ff64394254..24a18b3466 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
@@ -175,6 +175,18 @@ public class TestMvPartitionStorage implements MvPartitionStorage {
});
}
+ /** {@inheritDoc} */
+ @Override
+ public void addWriteCommitted(RowId rowId, BinaryRow row, HybridTimestamp commitTimestamp) throws StorageException {
+ map.compute(rowId, (ignored, versionChain) -> {
+ if (versionChain != null && versionChain.isWriteIntent()) {
+ throw new StorageException("Write intent exists for " + rowId);
+ }
+
+ return new VersionChain(row, commitTimestamp, null, null, ReadResult.UNDEFINED_COMMIT_PARTITION_ID, versionChain);
+ });
+ }
+
/** {@inheritDoc} */
@Override
public ReadResult read(RowId rowId, @Nullable HybridTimestamp timestamp) {
@@ -211,9 +223,8 @@ public class TestMvPartitionStorage implements MvPartitionStorage {
if (timestamp == null) {
// Search by transaction id.
- BinaryRow binaryRow = versionChain.row;
- if (filter != null && !filter.test(binaryRow)) {
+ if (filter != null && !filter.test(versionChain.row)) {
return ReadResult.EMPTY;
}
@@ -221,24 +232,12 @@ public class TestMvPartitionStorage implements MvPartitionStorage {
throw new TxIdMismatchException(txId, versionChain.txId);
}
- boolean isWriteIntent = versionChain.ts == null;
-
- if (isWriteIntent) {
- return ReadResult.createFromWriteIntent(
- binaryRow,
- versionChain.txId,
- versionChain.commitTableId,
- versionChain.next != null ? versionChain.next.ts : null,
- versionChain.commitPartitionId
- );
- }
-
- return ReadResult.createFromCommitted(binaryRow);
+ return versionChainToReadResult(versionChain, true);
}
VersionChain cur = versionChain;
- if (cur.ts == null) {
+ if (cur.isWriteIntent()) {
// We have a write-intent.
if (cur.next == null) {
// We *only* have a write-intent, return it.
@@ -248,8 +247,8 @@ public class TestMvPartitionStorage implements MvPartitionStorage {
return ReadResult.EMPTY;
}
- return ReadResult.createFromWriteIntent(binaryRow, cur.txId, cur.commitTableId, null,
- cur.commitPartitionId);
+ return ReadResult.createFromWriteIntent(binaryRow, cur.txId, cur.commitTableId, cur.commitPartitionId, null
+ );
}
// Move to first commit.
@@ -259,6 +258,19 @@ public class TestMvPartitionStorage implements MvPartitionStorage {
return walkVersionChain(versionChain, timestamp, filter, cur);
}
+ private static ReadResult versionChainToReadResult(VersionChain versionChain, boolean fillLastCommittedTs) {
+ if (versionChain.isWriteIntent()) {
+ return ReadResult.createFromWriteIntent(
+ versionChain.row,
+ versionChain.txId,
+ versionChain.commitTableId,
+ versionChain.commitPartitionId, fillLastCommittedTs && versionChain.next != null ? versionChain.next.ts : null
+ );
+ }
+
+ return ReadResult.createFromCommitted(versionChain.row, versionChain.ts);
+ }
+
/**
* Walks version chain to find a row by timestamp. See {@link MvPartitionStorage#read(RowId, HybridTimestamp)} for details.
*
@@ -281,8 +293,9 @@ public class TestMvPartitionStorage implements MvPartitionStorage {
return ReadResult.EMPTY;
}
- return ReadResult.createFromWriteIntent(binaryRow, chainHead.txId, chainHead.commitTableId, firstCommit.ts,
- chainHead.commitPartitionId);
+ return ReadResult.createFromWriteIntent(binaryRow, chainHead.txId, chainHead.commitTableId, chainHead.commitPartitionId,
+ firstCommit.ts
+ );
}
VersionChain cur = firstCommit;
@@ -298,7 +311,7 @@ public class TestMvPartitionStorage implements MvPartitionStorage {
return ReadResult.EMPTY;
}
- return ReadResult.createFromCommitted(binaryRow);
+ return ReadResult.createFromCommitted(binaryRow, cur.ts);
}
cur = cur.next;
@@ -308,8 +321,12 @@ public class TestMvPartitionStorage implements MvPartitionStorage {
}
@Override
- public Cursor<BinaryRow> scanVersions(RowId rowId) throws StorageException {
- return Cursor.fromIterator(Stream.iterate(map.get(rowId), Objects::nonNull, vc -> vc.next).map(vc -> vc.row).iterator());
+ public Cursor<ReadResult> scanVersions(RowId rowId) throws StorageException {
+ return Cursor.fromIterator(
+ Stream.iterate(map.get(rowId), Objects::nonNull, vc -> vc.next)
+ .map((VersionChain versionChain) -> versionChainToReadResult(versionChain, false))
+ .iterator()
+ );
}
/** {@inheritDoc} */
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
index 0e8aba6174..f9da31bd26 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
@@ -141,8 +141,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
* Starts a partition by initializing its internal structures.
*/
public void start() {
- try {
- Cursor<IndexMeta> cursor = indexMetaTree.find(null, null);
+ try (Cursor<IndexMeta> cursor = indexMetaTree.find(null, null)) {
NamedListView<TableIndexView> indexesCfgView = tablesConfiguration.indexes().value();
@@ -161,7 +160,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
//TODO IGNITE-17626 Drop the index synchronously.
}
}
- } catch (IgniteInternalCheckedException e) {
+ } catch (Exception e) {
throw new StorageException("Failed to process SQL indexes during the partition start", e);
}
}
@@ -393,7 +392,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
row = new ByteBufferRow(rowVersion.value());
}
- return ReadResult.createFromWriteIntent(row, transactionId, commitTableId, null, commitPartitionId);
+ return ReadResult.createFromWriteIntent(row, transactionId, commitTableId, commitPartitionId, null);
}
}
@@ -443,7 +442,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
row = new ByteBufferRow(rowVersion.value());
}
- return ReadResult.createFromWriteIntent(row, transactionId, commitTableId, firstCommit.timestamp(), commitPartitionId);
+ return ReadResult.createFromWriteIntent(row, transactionId, commitTableId, commitPartitionId, firstCommit.timestamp());
}
RowVersion curCommit = firstCommit;
@@ -463,7 +462,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
row = new ByteBufferRow(curCommit.value());
}
- return ReadResult.createFromCommitted(row);
+ return ReadResult.createFromCommitted(row, curCommit.timestamp());
}
if (!curCommit.hasNextLink()) {
@@ -477,8 +476,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
}
private RowVersion insertRowVersion(@Nullable BinaryRow row, long nextPartitionlessLink) {
- // TODO IGNITE-16913 Add proper way to write row bytes into array without allocations.
- byte[] rowBytes = row == null ? TOMBSTONE_PAYLOAD : row.bytes();
+ byte[] rowBytes = rowBytes(row);
RowVersion rowVersion = new RowVersion(partitionId, nextPartitionlessLink, ByteBuffer.wrap(rowBytes));
@@ -495,6 +493,11 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
}
}
+ private static byte[] rowBytes(@Nullable BinaryRow row) {
+ // TODO IGNITE-16913 Add proper way to write row bytes into array without allocations.
+ return row == null ? TOMBSTONE_PAYLOAD : row.bytes();
+ }
+
@Override
public @Nullable BinaryRow addWrite(RowId rowId, @Nullable BinaryRow row, UUID txId, UUID commitTableId, int commitPartitionId)
throws TxIdMismatchException, StorageException {
@@ -627,7 +630,37 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
}
@Override
- public Cursor<BinaryRow> scanVersions(RowId rowId) throws StorageException {
+ public void addWriteCommitted(RowId rowId, BinaryRow row, HybridTimestamp commitTimestamp) throws StorageException {
+ assert rowId.partitionId() == partitionId : rowId;
+
+ VersionChain currentChain = findVersionChain(rowId);
+
+ if (currentChain != null && currentChain.isUncommitted()) {
+ // This means that there is a bug in our code as the caller must make sure that no write intent exists
+ // below this write.
+ throw new StorageException("Write intent exists for " + rowId);
+ }
+
+ long nextLink = currentChain == null ? NULL_LINK : currentChain.newestCommittedLink();
+ RowVersion newVersion = insertCommittedRowVersion(row, commitTimestamp, nextLink);
+
+ VersionChain chainReplacement = VersionChain.createCommitted(rowId, newVersion.link(), newVersion.nextLink());
+
+ updateVersionChain(chainReplacement);
+ }
+
+ private RowVersion insertCommittedRowVersion(BinaryRow row, HybridTimestamp commitTimestamp, long nextPartitionlessLink) {
+ byte[] rowBytes = rowBytes(row);
+
+ RowVersion rowVersion = new RowVersion(partitionId, commitTimestamp, nextPartitionlessLink, ByteBuffer.wrap(rowBytes));
+
+ insertRowVersion(rowVersion);
+
+ return rowVersion;
+ }
+
+ @Override
+ public Cursor<ReadResult> scanVersions(RowId rowId) throws StorageException {
try {
VersionChain versionChain = versionChainTree.findOne(new VersionChainKey(rowId));
@@ -641,12 +674,30 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
rowVersion.nextLink() == 0 ? null : readRowVersion(rowVersion.nextLink(), ALWAYS_LOAD_VALUE)
);
- return Cursor.fromIterator(stream.map(rowVersion -> new ByteBufferRow(rowVersion.value())).iterator());
+ return Cursor.fromIterator(
+ stream.map(rowVersion -> rowVersionToResultNotFillingLastCommittedTs(versionChain, rowVersion))
+ .iterator()
+ );
} catch (IgniteInternalCheckedException e) {
throw new RuntimeException(e);
}
}
+ private static ReadResult rowVersionToResultNotFillingLastCommittedTs(VersionChain versionChain, RowVersion rowVersion) {
+ ByteBufferRow row = new ByteBufferRow(rowVersion.value());
+
+ if (rowVersion.isCommitted()) {
+ return ReadResult.createFromCommitted(row, rowVersion.timestamp());
+ } else {
+ return ReadResult.createFromWriteIntent(
+ row,
+ versionChain.transactionId(),
+ versionChain.commitTableId(),
+ versionChain.commitPartitionId(), null
+ );
+ }
+ }
+
/**
* Scans the partition and returns a cursor of values. All filtered values must either be uncommitted in the current transaction
* or already committed in a different transaction.
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/RowVersion.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/RowVersion.java
index 501e89f319..c51d8b888b 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/RowVersion.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/RowVersion.java
@@ -64,6 +64,13 @@ public final class RowVersion implements Storable {
this(partitionId, 0, null, nextLink, value);
}
+ /**
+ * Constructor.
+ */
+ public RowVersion(int partitionId, HybridTimestamp commitTimestamp, long nextLink, ByteBuffer value) {
+ this(partitionId, 0, commitTimestamp, nextLink, value);
+ }
+
/**
* Constructor.
*/
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VersionChain.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VersionChain.java
index ea02198564..b1b2a399e3 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VersionChain.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VersionChain.java
@@ -21,6 +21,7 @@ import static org.apache.ignite.internal.pagememory.util.PageIdUtils.NULL_LINK;
import java.util.UUID;
import org.apache.ignite.internal.pagememory.util.PageIdUtils;
+import org.apache.ignite.internal.storage.ReadResult;
import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.tostring.S;
import org.jetbrains.annotations.Nullable;
@@ -62,7 +63,7 @@ public class VersionChain extends VersionChainKey {
}
public static VersionChain createCommitted(RowId rowId, long headLink, long nextLink) {
- return new VersionChain(rowId, null, null, -1, headLink, nextLink);
+ return new VersionChain(rowId, null, null, ReadResult.UNDEFINED_COMMIT_PARTITION_ID, headLink, nextLink);
}
public static VersionChain createUncommitted(RowId rowId, UUID transactionId, UUID commitTableId, int commitPartitionId, long headLink,
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
index 4352a306aa..aba18f52b1 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
@@ -318,7 +318,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
@Override
public @Nullable BinaryRow addWrite(RowId rowId, @Nullable BinaryRow row, UUID txId, UUID commitTableId, int commitPartitionId)
throws TxIdMismatchException, StorageException {
- WriteBatchWithIndex writeBatch = requireWriteBatch();
+ @SuppressWarnings("resource") WriteBatchWithIndex writeBatch = requireWriteBatch();
ByteBuffer keyBuf = prepareHeapKeyBuf(rowId);
@@ -373,10 +373,9 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
*/
private void writeUnversioned(byte[] keyArray, BinaryRow row, UUID txId, UUID commitTableId, int commitPartitionId)
throws RocksDBException {
- WriteBatchWithIndex writeBatch = requireWriteBatch();
+ @SuppressWarnings("resource") WriteBatchWithIndex writeBatch = requireWriteBatch();
- //TODO IGNITE-16913 Add proper way to write row bytes into array without allocations.
- byte[] rowBytes = row.bytes();
+ byte[] rowBytes = rowBytes(row);
ByteBuffer value = ByteBuffer.allocate(rowBytes.length + VALUE_HEADER_SIZE);
byte[] array = value.array();
@@ -391,6 +390,11 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
writeBatch.put(cf, copyOf(keyArray, ROW_PREFIX_SIZE), value.array());
}
+ private static byte[] rowBytes(BinaryRow row) {
+ //TODO IGNITE-16913 Add proper way to write row bytes into array without allocations.
+ return row.bytes();
+ }
+
/** {@inheritDoc} */
@Override
public @Nullable BinaryRow abortWrite(RowId rowId) throws StorageException {
@@ -447,6 +451,23 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
}
}
+ @Override
+ public void addWriteCommitted(RowId rowId, BinaryRow row, HybridTimestamp commitTimestamp) throws StorageException {
+ @SuppressWarnings("resource") WriteBatchWithIndex writeBatch = requireWriteBatch();
+
+ ByteBuffer keyBuf = prepareHeapKeyBuf(rowId);
+ putTimestamp(keyBuf, commitTimestamp);
+
+ //TODO IGNITE-16913 Add proper way to write row bytes into array without allocations.
+ byte[] rowBytes = rowBytes(row);
+
+ try {
+ writeBatch.put(cf, copyOf(keyBuf.array(), MAX_KEY_SIZE), rowBytes);
+ } catch (RocksDBException e) {
+ throw new StorageException("Failed to update a row in storage", e);
+ }
+ }
+
/**
* Reads either the committed value from the storage or the uncommitted value belonging to given transaction.
*
@@ -519,7 +540,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
if (!isWriteIntent) {
// There is no write-intent, return latest committed row.
- return wrapCommittedValue(valueBytes);
+ return wrapCommittedValue(valueBytes, readTimestamp(readKeyBuf));
}
assert valueBytes != null;
@@ -624,13 +645,13 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
// Should not be write-intent, as we were seeking with the timestamp.
assert keyLength == MAX_KEY_SIZE;
- HybridTimestamp rowTimestamp = readTimestamp(foundKeyBuf, ROW_PREFIX_SIZE);
+ HybridTimestamp rowTimestamp = readTimestamp(foundKeyBuf);
byte[] valueBytes = seekIterator.value();
if (rowTimestamp.equals(timestamp)) {
// This is exactly the row we are looking for.
- return wrapCommittedValue(valueBytes);
+ return wrapCommittedValue(valueBytes, rowTimestamp);
}
// Let's check if there is more recent write. If it is a write-intent, then return write-intent.
@@ -639,7 +660,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
if (invalid(seekIterator)) {
// There is no more recent commits or write-intents.
- return wrapCommittedValue(valueBytes);
+ return wrapCommittedValue(valueBytes, rowTimestamp);
}
foundKeyBuf.position(0).limit(MAX_KEY_SIZE);
@@ -647,7 +668,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
if (!matches(rowId, foundKeyBuf)) {
// There are no more recent commits or write-intents under this row id.
- return wrapCommittedValue(valueBytes);
+ return wrapCommittedValue(valueBytes, rowTimestamp);
}
boolean isWriteIntent = keyLength == ROW_PREFIX_SIZE;
@@ -656,7 +677,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
return wrapUncommittedValue(seekIterator.value(), rowTimestamp);
}
- return wrapCommittedValue(valueBytes);
+ return wrapCommittedValue(valueBytes, readTimestamp(foundKeyBuf));
}
}
@@ -675,7 +696,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
}
@Override
- public Cursor<BinaryRow> scanVersions(RowId rowId) throws StorageException {
+ public Cursor<ReadResult> scanVersions(RowId rowId) throws StorageException {
ByteBuffer keyBuf = prepareHeapKeyBuf(rowId);
byte[] lowerBound = copyOf(keyBuf.array(), ROW_PREFIX_SIZE);
@@ -692,8 +713,18 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
return new RocksIteratorAdapter<>(it) {
@Override
- protected BinaryRow decodeEntry(byte[] key, byte[] value) {
- return wrapValueIntoBinaryRow(value, key.length == ROW_PREFIX_SIZE);
+ protected ReadResult decodeEntry(byte[] key, byte[] value) {
+ int keyLength = key.length;
+
+ boolean isWriteIntent = keyLength == ROW_PREFIX_SIZE;
+
+ if (!isWriteIntent) {
+ return wrapCommittedValue(value, readTimestamp(ByteBuffer.wrap(key).order(KEY_BYTE_ORDER)));
+ }
+
+ assert value != null;
+
+ return wrapUncommittedValue(value, null);
}
@Override
@@ -1147,11 +1178,11 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
buf.putInt(~ts.getLogical());
}
- private static HybridTimestamp readTimestamp(ByteBuffer buf, int off) {
- assert buf.order() == KEY_BYTE_ORDER;
+ private static HybridTimestamp readTimestamp(ByteBuffer keyBuf) {
+ assert keyBuf.order() == KEY_BYTE_ORDER;
- long physical = ~buf.getLong(off);
- int logical = ~buf.getInt(off + Long.BYTES);
+ long physical = ~keyBuf.getLong(ROW_PREFIX_SIZE);
+ int logical = ~keyBuf.getInt(ROW_PREFIX_SIZE + Long.BYTES);
return new HybridTimestamp(physical, logical);
}
@@ -1226,21 +1257,22 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
row = new ByteBufferRow(ByteBuffer.wrap(valueBytes).position(VALUE_OFFSET).slice().order(BINARY_ROW_BYTE_ORDER));
}
- return ReadResult.createFromWriteIntent(row, txId, commitTableId, newestCommitTs, commitPartitionId);
+ return ReadResult.createFromWriteIntent(row, txId, commitTableId, commitPartitionId, newestCommitTs);
}
/**
* Converts raw byte array representation of a committed value into a read result.
*
* @param valueBytes Value bytes as read from the storage.
+ * @param rowCommitTimestamp Timestamp with which the row was committed.
* @return Read result instance, or {@link ReadResult#EMPTY} if the value is a tombstone.
*/
- private static ReadResult wrapCommittedValue(byte[] valueBytes) {
+ private static ReadResult wrapCommittedValue(byte[] valueBytes, HybridTimestamp rowCommitTimestamp) {
if (isTombstone(valueBytes, false)) {
return ReadResult.EMPTY;
}
- return ReadResult.createFromCommitted(new ByteBufferRow(valueBytes));
+ return ReadResult.createFromCommitted(new ByteBufferRow(valueBytes), rowCommitTimestamp);
}
/**
diff --git a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageTest.java b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageTest.java
index 2d9addd3b1..eb7675d0fb 100644
--- a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageTest.java
+++ b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageTest.java
@@ -78,4 +78,12 @@ public class RocksDbMvPartitionStorageTest extends AbstractMvPartitionStorageTes
engine == null ? null : engine::stop
);
}
+
+ @Override
+ public void addWriteCommittedThrowsIfUncommittedVersionExists() {
+ // Intentionally empty: this override disables the inherited test because RocksDbMvPartitionStorage does
+ // not throw. The exception only eases debugging, as the caller must make sure that no write intent exists
+ // before calling addWriteCommitted(). For RocksDbMvPartitionStorage, it is not that cheap to check whether
+ // there is a write intent in the storage, so we do not require it to throw this optional exception.
+ }
}
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableScanTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableScanTest.java
index d360289abd..3c9ca940e4 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableScanTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableScanTest.java
@@ -41,6 +41,7 @@ import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
+import org.apache.ignite.hlc.HybridClock;
import org.apache.ignite.hlc.HybridTimestamp;
import org.apache.ignite.internal.replicator.ReplicaService;
import org.apache.ignite.internal.schema.BinaryRow;
@@ -74,6 +75,8 @@ public class ItInternalTableScanTest {
/** Internal table to test. */
private InternalTable internalTbl;
+ private final HybridClock clock = new HybridClock();
+
/**
* Prepare test environment using DummyInternalTableImpl and Mocked storage.
*/
@@ -379,7 +382,7 @@ public class ItInternalTableScanTest {
when(cursor.hasNext()).thenAnswer(hnInvocation -> cursorTouchCnt.get() < submittedItems.size());
when(cursor.next()).thenAnswer(ninvocation ->
- ReadResult.createFromCommitted(submittedItems.get(cursorTouchCnt.getAndIncrement())));
+ ReadResult.createFromCommitted(submittedItems.get(cursorTouchCnt.getAndIncrement()), clock.now()));
return cursor;
});