You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ib...@apache.org on 2022/06/01 15:34:09 UTC
[ignite-3] branch main updated: IGNITE-16933 PageMemory-based MV storage implementation (#814)
This is an automated email from the ASF dual-hosted git repository.
ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 1d76a6ee4 IGNITE-16933 PageMemory-based MV storage implementation (#814)
1d76a6ee4 is described below
commit 1d76a6ee44fa37304b2c38cc1de677be966a63b0
Author: Roman Puchkovskiy <ro...@gmail.com>
AuthorDate: Wed Jun 1 19:34:05 2022 +0400
IGNITE-16933 PageMemory-based MV storage implementation (#814)
---
.../internal/pagememory/PageMemoryDataRegion.java | 5 +-
.../pagememory/datapage/DataPageReader.java | 97 ++++
.../datapage/NonFragmentableDataPageReader.java | 134 +++++
.../pagememory/datapage/PageMemoryTraversal.java | 50 ++
.../datapage/ReadPageMemoryRowValue.java | 137 ++++++
.../internal/pagememory/freelist/FreeList.java | 6 +-
.../internal/pagememory/io/AbstractDataPageIo.java | 92 +++-
.../internal/pagememory/io/DataPagePayload.java | 9 +
.../ignite/internal/schema/ByteBufferRow.java | 10 +-
.../internal/storage/MvPartitionStorage.java | 13 +-
.../storage/NoUncommittedVersionException.java} | 18 +-
.../storage/AbstractMvPartitionStorageTest.java | 542 ++++++++++++++++-----
.../internal/storage/BaseMvStoragesTest.java | 1 -
.../storage/basic/TestMvPartitionStorage.java | 8 +-
.../storage/basic/TestMvPartitionStorageTest.java | 16 +-
.../pagememory/AbstractPageMemoryDataRegion.java | 6 +-
.../pagememory/PageMemoryStorageEngine.java | 3 +-
.../storage/pagememory/PageMemoryTableStorage.java | 17 +-
.../internal/storage/pagememory/TableTree.java | 4 +-
.../pagememory/VolatilePageMemoryDataRegion.java | 114 ++++-
.../pagememory/VolatilePageMemoryTableStorage.java | 2 +-
.../internal/storage/pagememory/mv/LinkRowId.java} | 35 +-
.../mv/PageMemoryMvPartitionStorage.java | 521 ++++++++++++++++++++
.../pagememory/mv/PageMemoryMvStorageIoModule.java | 45 ++
.../storage/pagememory/mv/PartitionlessLinks.java | 134 +++++
.../pagememory/mv/ReadLatestRowVersion.java | 95 ++++
.../pagememory/mv/ReadRowVersionValue.java} | 25 +-
.../RowIdIsInvalidForModificationsException.java} | 19 +-
.../internal/storage/pagememory/mv/RowVersion.java | 180 +++++++
.../storage/pagememory/mv/RowVersionFreeList.java | 153 ++++++
.../pagememory/mv/ScanVersionChainByTimestamp.java | 120 +++++
.../internal/storage/pagememory/mv/Timestamps.java | 70 +++
.../storage/pagememory/mv/TransactionIds.java | 57 +++
.../storage/pagememory/mv/VersionChain.java | 107 ++++
.../pagememory/mv/VersionChainDataPageReader.java | 84 ++++
.../pagememory/mv/VersionChainFreeList.java | 164 +++++++
.../storage/pagememory/mv/VersionChainLink.java} | 34 +-
.../storage/pagememory/mv/VersionChainTree.java | 114 +++++
.../storage/pagememory/mv/io/RowVersionDataIo.java | 106 ++++
.../pagememory/mv/io/VersionChainDataIo.java | 95 ++++
.../pagememory/mv/io/VersionChainInnerIo.java | 86 ++++
.../storage/pagememory/mv/io/VersionChainIo.java} | 21 +-
.../pagememory/mv/io/VersionChainLeafIo.java | 86 ++++
.../pagememory/mv/io/VersionChainMetaIo.java} | 28 +-
...ache.ignite.internal.pagememory.io.PageIoModule | 1 +
.../mv/PageMemoryMvPartitionStorageTest.java | 144 ++++++
.../pagememory/mv/PartitionlessLinksTest.java | 68 +++
.../storage/rocksdb/RocksDbMvPartitionStorage.java | 4 +
.../rocksdb/RocksDbMvPartitionStorageTest.java | 9 +-
.../org/apache/ignite/internal/tx/Timestamp.java | 12 +-
50 files changed, 3623 insertions(+), 278 deletions(-)
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/PageMemoryDataRegion.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/PageMemoryDataRegion.java
index dbc6e3c2f..46032b6b6 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/PageMemoryDataRegion.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/PageMemoryDataRegion.java
@@ -17,8 +17,6 @@
package org.apache.ignite.internal.pagememory;
-import org.jetbrains.annotations.Nullable;
-
/**
* Data region based on {@link PageMemory}.
*/
@@ -29,8 +27,7 @@ public interface PageMemoryDataRegion {
boolean persistent();
/**
- * Returns page memory, {@code null} if not started.
+ * Returns page memory or throws an exception if not started.
*/
- @Nullable
PageMemory pageMemory();
}
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/datapage/DataPageReader.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/datapage/DataPageReader.java
new file mode 100644
index 000000000..9100edfa6
--- /dev/null
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/datapage/DataPageReader.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.datapage;
+
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.itemId;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageId;
+
+import org.apache.ignite.internal.pagememory.PageMemory;
+import org.apache.ignite.internal.pagememory.io.AbstractDataPageIo;
+import org.apache.ignite.internal.pagememory.io.DataPagePayload;
+import org.apache.ignite.internal.pagememory.metric.IoStatisticsHolder;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Contains logic for reading values from page memory data pages (this includes handling multi-page values).
+ */
+public class DataPageReader {
+    private final PageMemory pageMemory;
+    private final int groupId;
+    private final IoStatisticsHolder statisticsHolder;
+
+    /**
+     * Constructs a new instance.
+     *
+     * @param pageMemory Page memory that will be used to lock and access memory.
+     * @param groupId ID of the cache group with which the reader works (all pages must belong to this group)
+     * @param statisticsHolder used to track statistics about operations
+     */
+    public DataPageReader(PageMemory pageMemory, int groupId, IoStatisticsHolder statisticsHolder) {
+        this.pageMemory = pageMemory;
+        this.groupId = groupId;
+        this.statisticsHolder = statisticsHolder;
+    }
+
+    /**
+     * Traverses page memory starting at the given link. At each step, reads the current data payload and feeds it to the given
+     * {@link PageMemoryTraversal} object which updates itself (usually) and returns next link to continue traversal
+     * (or {@link PageMemoryTraversal#STOP_TRAVERSAL} to stop); once it stops, {@link PageMemoryTraversal#finish()} is called.
+     *
+     * @param link Link to the first data payload to read (must not be 0).
+     * @param traversal Object consuming payloads (each one while its page is read-locked) and controlling the traversal.
+     * @param argument Argument that is passed to the traversal on every step (may be {@code null}).
+     * @throws IgniteInternalCheckedException If failed
+     * @see org.apache.ignite.internal.pagememory.util.PageIdUtils#link(long, int)
+     * @see PageMemoryTraversal
+     */
+    public <T> void traverse(final long link, PageMemoryTraversal<T> traversal, @Nullable T argument)
+            throws IgniteInternalCheckedException {
+        assert link != 0;
+
+        int pageSize = pageMemory.realPageSize(groupId);
+
+        long currentLink = link;
+
+        do {
+            final long pageId = pageId(currentLink);
+            final long page = pageMemory.acquirePage(groupId, pageId, statisticsHolder);
+
+            try {
+                long pageAddr = pageMemory.readLock(groupId, pageId, page);
+                assert pageAddr != 0L : currentLink;
+
+                try {
+                    AbstractDataPageIo<?> dataIo = pageMemory.ioRegistry().resolve(pageAddr);
+
+                    int itemId = itemId(currentLink);
+
+                    DataPagePayload data = dataIo.readPayload(pageAddr, itemId, pageSize);
+
+                    currentLink = traversal.consumePagePayload(currentLink, pageAddr, data, argument);
+                } finally {
+                    pageMemory.readUnlock(groupId, pageId, page);
+                }
+            } finally {
+                pageMemory.releasePage(groupId, pageId, page);
+            }
+        } while (currentLink != PageMemoryTraversal.STOP_TRAVERSAL);
+
+        traversal.finish();
+    }
+}
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/datapage/NonFragmentableDataPageReader.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/datapage/NonFragmentableDataPageReader.java
new file mode 100644
index 000000000..3bf834136
--- /dev/null
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/datapage/NonFragmentableDataPageReader.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.datapage;
+
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.itemId;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageId;
+
+import org.apache.ignite.internal.pagememory.PageMemory;
+import org.apache.ignite.internal.pagememory.io.AbstractDataPageIo;
+import org.apache.ignite.internal.pagememory.io.DataPagePayload;
+import org.apache.ignite.internal.pagememory.metric.IoStatisticsHolder;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Contains logic for reading values (that cannot be fragmented, that is, cannot occupy more than one page) from page memory data pages.
+ */
+public abstract class NonFragmentableDataPageReader<T> {
+    private final PageMemory pageMemory;
+    private final int groupId;
+    private final IoStatisticsHolder statisticsHolder;
+
+    /**
+     * Constructs a new instance.
+     *
+     * @param pageMemory page memory that will be used to lock and access memory
+     * @param groupId ID of the cache group with which the reader works (all pages must belong to this group)
+     * @param statisticsHolder used to track statistics about operations
+     */
+    public NonFragmentableDataPageReader(PageMemory pageMemory, int groupId, IoStatisticsHolder statisticsHolder) {
+        this.pageMemory = pageMemory;
+        this.groupId = groupId;
+        this.statisticsHolder = statisticsHolder;
+    }
+
+    /**
+     * Returns a row by link. If it turns out the value was fragmented, an {@link IllegalStateException} is thrown,
+     * as this implementation cannot handle fragmented values.
+     *
+     * @param link Row link
+     * @return row object assembled from the row bytes
+     * @throws IgniteInternalCheckedException If failed
+     * @see org.apache.ignite.internal.pagememory.util.PageIdUtils#link(long, int)
+     */
+    @Nullable
+    public T getRowByLink(final long link) throws IgniteInternalCheckedException {
+        assert link != 0;
+
+        int pageSize = pageMemory.realPageSize(groupId);
+
+        final long pageId = pageId(link);
+
+        final long page = pageMemory.acquirePage(groupId, pageId, statisticsHolder);
+
+        try {
+            long pageAddr = pageMemory.readLock(groupId, pageId, page);
+
+            assert pageAddr != 0L : link;
+
+            try {
+                AbstractDataPageIo<?> dataIo = pageMemory.ioRegistry().resolve(pageAddr);
+
+                int itemId = itemId(link);
+
+                if (handleNonExistentItemsGracefully() && !dataIo.itemExists(pageAddr, itemId, pageSize)) {
+                    return rowForNonExistingItem(pageId, itemId);
+                }
+
+                DataPagePayload data = dataIo.readPayload(pageAddr, itemId, pageSize);
+
+                if (data.hasMoreFragments()) {
+                    throw new IllegalStateException("Value for link " + link + " is fragmented, which is not supported");
+                }
+
+                return readRowFromAddress(link, pageAddr + data.offset());
+
+            } finally {
+                pageMemory.readUnlock(groupId, pageId, page);
+            }
+        } finally {
+            pageMemory.releasePage(groupId, pageId, page);
+        }
+    }
+
+    /**
+     * Reads row object from a contiguous region of memory represented by its starting address. The memory region contains exactly
+     * the bytes representing the row. Called while the page containing the row is read-locked.
+     *
+     * @param link row link
+     * @param pageAddr address at which the row bytes start (page address plus payload offset, not the page start)
+     * @return row object
+     * @see org.apache.ignite.internal.pagememory.util.PageIdUtils#link(long, int)
+     */
+    protected abstract T readRowFromAddress(long link, long pageAddr);
+
+    /**
+     * Returns {@code true} if graceful handling of non-existent items should be enabled.
+     * If it is enabled and a link with a non-existent item ID is provided, {@link #getRowByLink(long)} will return
+     * the result of {@link #rowForNonExistingItem(long, long)} invocation.
+     * If it is disabled (the default), no existence check is made and the payload is read right away.
+     *
+     * @return {@code true} if graceful handling of non-existent items should be enabled
+     */
+    protected boolean handleNonExistentItemsGracefully() {
+        return false;
+    }
+
+    /**
+     * Returns a special row value that marks the given itemId as non-existent; the default implementation throws {@link IllegalStateException}.
+     *
+     * @param pageId ID of the page
+     * @param itemId ID of the item
+     * @return special row value
+     */
+    @Nullable
+    protected T rowForNonExistingItem(long pageId, long itemId) {
+        throw new IllegalStateException("Item is invalid for pageId=" + pageId + ", itemId=" + itemId);
+    }
+}
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/datapage/PageMemoryTraversal.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/datapage/PageMemoryTraversal.java
new file mode 100644
index 000000000..53fcedd01
--- /dev/null
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/datapage/PageMemoryTraversal.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.datapage;
+
+import org.apache.ignite.internal.pagememory.io.DataPagePayload;
+
+/**
+ * Controls page memory traversal.
+ *
+ * @see DataPageReader#traverse(long, PageMemoryTraversal, Object)
+ */
+public interface PageMemoryTraversal<T> {
+    /**
+     * Returned to signal that the traversal has to stop.
+     */
+    long STOP_TRAVERSAL = 0;
+
+    /**
+     * Consumes the currently traversed data payload and decides how to proceed.
+     *
+     * @param link link to the current data payload
+     * @param pageAddr address of the current page
+     * @param payload the current payload (a whole row or one fragment of it)
+     * @param arg argument passed to the traversal (may be {@code null})
+     * @return next row link or {@link #STOP_TRAVERSAL} to stop the traversal
+     */
+    long consumePagePayload(long link, long pageAddr, DataPagePayload payload, T arg);
+
+    /**
+     * Called when the traversal has finished successfully, i.e. after {@link #consumePagePayload} returned {@link #STOP_TRAVERSAL}.
+     */
+    default void finish() {
+        // no-op
+    }
+}
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/datapage/ReadPageMemoryRowValue.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/datapage/ReadPageMemoryRowValue.java
new file mode 100644
index 000000000..98aad6a8a
--- /dev/null
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/datapage/ReadPageMemoryRowValue.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.datapage;
+
+import java.util.Objects;
+import org.apache.ignite.internal.pagememory.Storable;
+import org.apache.ignite.internal.pagememory.io.DataPagePayload;
+import org.apache.ignite.internal.pagememory.util.PageUtils;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Reads full data payload value (as byte array) from page memory. Supports fragmented values occupying more than one slot.
+ *
+ * <p>This works for the cases when the following conditions are satisfied:
+ * <ol>
+ *     <li>Row data starts with a fixed-length header, which is followed by the 'value'; the 'value' ends the row data.</li>
+ *     <li>Row data header contains 'value' size as an int at a fixed offset known beforehand.</li>
+ *     <li>The whole header (everything preceding the 'value', including the 'value' size) is always stored in the first
+ *     slot, i.e. {@link Storable#headerSize()} must cover at least {@link #valueOffsetInFirstSlot()} bytes.</li>
+ * </ol>
+ *
+ * <p>Instances are stateful: after the traversal has finished, {@link #result()} returns the value; call {@link #reset()}
+ * before reusing an instance for another traversal.
+ */
+public abstract class ReadPageMemoryRowValue implements PageMemoryTraversal<Void> {
+    /**
+     * {@code true} until the first slot (the one holding the row header) has been consumed; {@code false} afterwards,
+     * while the remaining fragments of a fragmented value are being read.
+     */
+    private boolean readingFirstSlot = true;
+
+    /** Size of the 'value' in bytes, as read from the row header in the first slot. */
+    private int valueSize;
+
+    /** Used to collect all the bytes of the 'value'. */
+    private byte @Nullable [] allValueBytes;
+
+    /** Number of 'value' bytes written to {@link #allValueBytes} so far. */
+    private int transferredBytes = 0;
+
+    @Override
+    public long consumePagePayload(long link, long pageAddr, DataPagePayload payload, Void ignoredArg) {
+        if (readingFirstSlot) {
+            readingFirstSlot = false;
+            return readFullyOrStartReadingFragmented(pageAddr, payload);
+        } else {
+            // We are continuing reading a fragmented row.
+            return readNextFragment(pageAddr, payload);
+        }
+    }
+
+    private long readFullyOrStartReadingFragmented(long pageAddr, DataPagePayload payload) {
+        valueSize = readValueSize(pageAddr, payload);
+
+        if (!payload.hasMoreFragments()) {
+            return readFully(pageAddr, payload);
+        } else {
+            allValueBytes = new byte[valueSize];
+            transferredBytes = 0;
+
+            readValueFragmentToArray(pageAddr, payload, valueOffsetInFirstSlot());
+
+            return payload.nextLink();
+        }
+    }
+
+    private int readValueSize(long pageAddr, DataPagePayload payload) {
+        return PageUtils.getInt(pageAddr, payload.offset() + valueSizeOffsetInFirstSlot());
+    }
+
+    private long readFully(long pageAddr, DataPagePayload payload) {
+        allValueBytes = PageUtils.getBytes(pageAddr, payload.offset() + valueOffsetInFirstSlot(), valueSize);
+
+        return STOP_TRAVERSAL;
+    }
+
+    /**
+     * Copies the 'value' bytes contained in the given fragment to {@link #allValueBytes}.
+     *
+     * <p>{@link DataPagePayload#payloadSize()} is the size of the whole fragment, including the bytes preceding
+     * {@code offsetToValue} (the row header in the first slot), so only the rest of the fragment belongs to the 'value'.
+     *
+     * @param pageAddr address of the page containing the fragment
+     * @param payload fragment payload
+     * @param offsetToValue offset from the payload start at which the 'value' bytes of this fragment begin
+     */
+    private void readValueFragmentToArray(long pageAddr, DataPagePayload payload, int offsetToValue) {
+        assert allValueBytes != null;
+
+        int valueBytesInFragment = payload.payloadSize() - offsetToValue;
+
+        assert valueBytesInFragment >= 0 && transferredBytes + valueBytesInFragment <= valueSize
+                : "fragment=" + valueBytesInFragment + ", transferred=" + transferredBytes + ", valueSize=" + valueSize;
+
+        PageUtils.getBytes(pageAddr, payload.offset() + offsetToValue, allValueBytes, transferredBytes, valueBytesInFragment);
+        transferredBytes += valueBytesInFragment;
+    }
+
+    private long readNextFragment(long pageAddr, DataPagePayload payload) {
+        assert allValueBytes != null;
+
+        readValueFragmentToArray(pageAddr, payload, 0);
+
+        if (payload.hasMoreFragments()) {
+            return payload.nextLink();
+        }
+
+        // The last fragment has been read: the whole 'value' must have been collected by now.
+        assert transferredBytes == valueSize : "transferred=" + transferredBytes + ", valueSize=" + valueSize;
+
+        return STOP_TRAVERSAL;
+    }
+
+    /**
+     * Returns a byte array containing the 'value' bytes of the page memory row in question. Should only be called after
+     * the traversal has stopped.
+     *
+     * @return a byte array containing the 'value' bytes of the page memory row in question
+     */
+    public byte[] result() {
+        return Objects.requireNonNull(allValueBytes);
+    }
+
+    /**
+     * Memory offset into first slot at which the 'value' size is stored (as int).
+     *
+     * @return offset into first slot at which the 'value' size is stored (as int)
+     */
+    protected abstract int valueSizeOffsetInFirstSlot();
+
+    /**
+     * Memory offset into first slot at which the 'value' starts.
+     *
+     * @return offset into first slot at which the 'value' starts
+     */
+    protected abstract int valueOffsetInFirstSlot();
+
+    /**
+     * Resets the object to make it ready for use.
+     */
+    public void reset() {
+        readingFirstSlot = true;
+        valueSize = 0;
+        allValueBytes = null;
+        transferredBytes = 0;
+    }
+}
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/freelist/FreeList.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/freelist/FreeList.java
index de6664222..5c84fa367 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/freelist/FreeList.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/freelist/FreeList.java
@@ -47,7 +47,11 @@ public interface FreeList<T extends Storable> {
void insertDataRows(Collection<T> rows, IoStatisticsHolder statHolder) throws IgniteInternalCheckedException;
/**
- * Updates a row by link.
+ * Makes an in-place update of a row identified by the link.
+ * This has a couple of restrictions:
+ * 1. The size of the payload must not change, otherwise the page will be broken (and next insertion will fail due
+ * to assertion failure).
+ * 2. The row cannot be fragmented. If it is, this will return {@code false} without doing anything.
*
* @param link Row link.
* @param row New row data.
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/io/AbstractDataPageIo.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/io/AbstractDataPageIo.java
index 1b20779d5..c63fac03e 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/io/AbstractDataPageIo.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/io/AbstractDataPageIo.java
@@ -189,6 +189,11 @@ public abstract class AbstractDataPageIo<T extends Storable> extends PageIo {
/** Minimum page overhead in bytes. */
public static final int MIN_DATA_PAGE_OVERHEAD = ITEMS_OFF + ITEM_SIZE + PAYLOAD_LEN_SIZE + LINK_SIZE;
+ /** Special index value that signals that the given itemId does not exist. */
+ private static final int NON_EXISTENT_ITEM_INDEX = -1;
+ /** Special offset value that signals that the given itemId does not exist. */
+ private static final int NON_EXISTENT_ITEM_OFFSET = -1;
+
/**
* Constructor.
*
@@ -345,7 +350,7 @@ public abstract class AbstractDataPageIo<T extends Storable> extends PageIo {
// possibly a link to the next row fragment.
freeSpace -= ITEM_SIZE + PAYLOAD_LEN_SIZE + LINK_SIZE;
- return freeSpace < 0 ? 0 : freeSpace;
+ return Math.max(freeSpace, 0);
}
/**
@@ -483,6 +488,16 @@ public abstract class AbstractDataPageIo<T extends Storable> extends PageIo {
* @return Found index of indirect item.
*/
private int findIndirectItemIndex(long pageAddr, int itemId, int directCnt, int indirectCnt) {
+ int maybeOffset = findIndirectItemIndexOrNotFoundMarker(pageAddr, itemId, directCnt, indirectCnt);
+
+ if (maybeOffset == NON_EXISTENT_ITEM_INDEX) {
+ throw new IllegalStateException("Item not found: " + itemId);
+ }
+
+ return maybeOffset;
+ }
+
+ private int findIndirectItemIndexOrNotFoundMarker(long pageAddr, int itemId, int directCnt, int indirectCnt) {
int low = directCnt;
int high = directCnt + indirectCnt - 1;
@@ -500,7 +515,7 @@ public abstract class AbstractDataPageIo<T extends Storable> extends PageIo {
}
}
- throw new IllegalStateException("Item not found: " + itemId);
+ return NON_EXISTENT_ITEM_INDEX;
}
/**
@@ -611,6 +626,7 @@ public abstract class AbstractDataPageIo<T extends Storable> extends PageIo {
assert directCnt > 0 : "itemId=" + itemId + ", directCnt=" + directCnt + ", page=" + printPageLayout(pageAddr, pageSize);
+ final int directItemId;
if (itemId >= directCnt) { // Need to do indirect lookup.
int indirectCnt = getIndirectCount(pageAddr);
@@ -618,17 +634,65 @@ public abstract class AbstractDataPageIo<T extends Storable> extends PageIo {
assert indirectCnt > 0 : "itemId=" + itemId + ", directCnt=" + directCnt + ", indirectCnt=" + indirectCnt
+ ", page=" + printPageLayout(pageAddr, pageSize);
- int indirectItemIdx = findIndirectItemIndex(pageAddr, itemId, directCnt, indirectCnt);
+ directItemId = resolveDirectItemIdFromIndirectItemId(pageAddr, itemId, directCnt, indirectCnt);
+ } else {
+ directItemId = itemId;
+ }
+
+ return directItemToOffset(getItem(pageAddr, directItemId));
+ }
- assert indirectItemIdx >= directCnt : indirectItemIdx + " " + directCnt;
- assert indirectItemIdx < directCnt + indirectCnt : indirectItemIdx + " " + directCnt + " " + indirectCnt;
+ private int resolveDirectItemIdFromIndirectItemId(long pageAddr, int itemId, int directCnt, int indirectCnt) {
+ int indirectItemIdx = findIndirectItemIndex(pageAddr, itemId, directCnt, indirectCnt);
- itemId = directItemIndex(getItem(pageAddr, indirectItemIdx));
+ assert indirectItemIdx >= directCnt : indirectItemIdx + " " + directCnt;
+ assert indirectItemIdx < directCnt + indirectCnt : indirectItemIdx + " " + directCnt + " " + indirectCnt;
- assert itemId >= 0 && itemId < directCnt : itemId + " " + directCnt + " " + indirectCnt; // Direct item.
+ int directItemId = directItemIndex(getItem(pageAddr, indirectItemIdx));
+
+ assert directItemId >= 0 && directItemId < directCnt : directItemId + " " + directCnt + " " + indirectCnt; // Direct item.
+ return directItemId;
+ }
+
+ protected int getDataOffsetOrNotFoundMarker(long pageAddr, int itemId, int pageSize) {
+ assert checkIndex(itemId) : itemId;
+
+ int directCnt = getDirectCount(pageAddr);
+
+ if (directCnt <= 0) {
+ return NON_EXISTENT_ITEM_OFFSET;
}
- return directItemToOffset(getItem(pageAddr, itemId));
+ int indirectCnt = getIndirectCount(pageAddr);
+
+ final int directItemId;
+ if (itemId >= directCnt) { // Need to do indirect lookup.
+ if (indirectCnt <= 0) {
+ return NON_EXISTENT_ITEM_OFFSET;
+ }
+
+ int indirectItemIdx = findIndirectItemIndexOrNotFoundMarker(pageAddr, itemId, directCnt, indirectCnt);
+ if (indirectItemIdx == NON_EXISTENT_ITEM_INDEX) {
+ return NON_EXISTENT_ITEM_OFFSET;
+ }
+
+ directItemId = resolveDirectItemIdFromIndirectItemId(pageAddr, itemId, directCnt, indirectCnt);
+ } else {
+ // it is a direct item
+
+ if (anyIndirectItemPointsAtDirectItem(pageAddr, itemId, directCnt, indirectCnt)) {
+ return NON_EXISTENT_ITEM_OFFSET;
+ }
+
+ directItemId = itemId;
+ }
+
+ return directItemToOffset(getItem(pageAddr, directItemId));
+ }
+
+ private boolean anyIndirectItemPointsAtDirectItem(long pageAddr, int itemId, int directCnt, int indirectCnt) {
+ int maybeIndirectItemIdx = findIndirectItemIndexOrNotFoundMarker(pageAddr, itemId, directCnt, indirectCnt);
+ return maybeIndirectItemIdx != NON_EXISTENT_ITEM_INDEX;
}
/**
@@ -673,6 +737,18 @@ public abstract class AbstractDataPageIo<T extends Storable> extends PageIo {
nextLink);
}
+    /**
+     * Returns {@code true} iff an item with given ID exists in a page at the given address ({@code false} rather than an exception if missing).
+     *
+     * @param pageAddr address where page data begins in memory
+     * @param itemId item ID to check
+     * @param pageSize size of the page in bytes
+     * @return {@code true} iff an item with given ID exists in a page at the given address
+     */
+    public boolean itemExists(long pageAddr, int itemId, final int pageSize) {
+        return getDataOffsetOrNotFoundMarker(pageAddr, itemId, pageSize) != NON_EXISTENT_ITEM_OFFSET;
+    }
+
/**
* Returns Offset to start of actual fragment data.
*
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/io/DataPagePayload.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/io/DataPagePayload.java
index e43ce8bcf..b5b66dd2d 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/io/DataPagePayload.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/io/DataPagePayload.java
@@ -67,6 +67,15 @@ public class DataPagePayload {
return nextLink;
}
+    /**
+     * Returns {@code true} if this payload links to the next fragment of the same row, i.e. {@link #nextLink()} is not 0.
+     *
+     * @return {@code true} if this payload links to next fragment
+     */
+    public boolean hasMoreFragments() {
+        return nextLink != 0;
+    }
+
/**
* Returns payload bytes.
*
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/ByteBufferRow.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/ByteBufferRow.java
index 0d6ab8c78..7352ccbc3 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/ByteBufferRow.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/ByteBufferRow.java
@@ -28,6 +28,8 @@ import java.nio.channels.WritableByteChannel;
* Heap byte buffer-based row.
*/
public class ByteBufferRow implements BinaryRow {
+    /** Byte order of the buffers backing rows (the constructors apply or assert it; returned slices use it too). */
+    public static final ByteOrder ORDER = ByteOrder.LITTLE_ENDIAN;
/** Row buffer. */
private final ByteBuffer buf;
@@ -37,7 +39,7 @@ public class ByteBufferRow implements BinaryRow {
* @param data Array representation of the row.
*/
public ByteBufferRow(byte[] data) {
- this(ByteBuffer.wrap(data).order(ByteOrder.LITTLE_ENDIAN));
+ this(ByteBuffer.wrap(data).order(ORDER));
}
/**
@@ -46,7 +48,7 @@ public class ByteBufferRow implements BinaryRow {
* @param buf Buffer representing the row.
*/
public ByteBufferRow(ByteBuffer buf) {
- assert buf.order() == ByteOrder.LITTLE_ENDIAN;
+ assert buf.order() == ORDER;
assert buf.position() == 0;
this.buf = buf;
@@ -90,7 +92,7 @@ public class ByteBufferRow implements BinaryRow {
int limit = buf.limit();
try {
- return buf.limit(off + len).position(off).slice().order(ByteOrder.LITTLE_ENDIAN);
+ return buf.limit(off + len).position(off).slice().order(ORDER);
} finally {
buf.position(0); // Reset bounds.
buf.limit(limit);
@@ -105,7 +107,7 @@ public class ByteBufferRow implements BinaryRow {
int limit = buf.limit();
try {
- return buf.limit(off + len).position(off).slice().order(ByteOrder.LITTLE_ENDIAN);
+ return buf.limit(off + len).position(off).slice().order(ORDER);
} finally {
buf.position(0); // Reset bounds.
buf.limit(limit);
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
index a007c4c72..31c9c578f 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
@@ -26,6 +26,8 @@ import org.jetbrains.annotations.Nullable;
/**
* Multi-versioned partition storage.
+ *
+ * <p>Each MvPartitionStorage instance represents exactly one partition.
*/
public interface MvPartitionStorage extends AutoCloseable {
/**
@@ -61,12 +63,17 @@ public interface MvPartitionStorage extends AutoCloseable {
RowId insert(BinaryRow binaryRow, UUID txId) throws StorageException;
/**
- * Creates an uncommitted version, assigned to the given transaction id.
+ * Creates (or replaces) an uncommitted (aka pending) version, assigned to the given transaction id.
+ * In detail:
+ * - if there is no uncommitted version, a new uncommitted version is added;
+ * - if there is an uncommitted version belonging to the same transaction, it is replaced by the given version;
+ * - if there is an uncommitted version belonging to a different transaction, {@link TxIdMismatchException} is thrown.
*
* @param rowId Row id.
* @param row Binary row to update. Key only row means value removal.
* @param txId Transaction id.
- * @return Previour row associated with the row id.
+ * @return Previous uncommitted row version associated with the row id, or {@code null} if no uncommitted version
+ * existed before this call.
* @throws TxIdMismatchException If there's another pending update associated with different transaction id.
* @throws StorageException If failed to write data to the storage.
*/
@@ -76,7 +83,7 @@ public interface MvPartitionStorage extends AutoCloseable {
* Aborts a pending update of the ongoing uncommitted transaction. Invoked during rollback.
*
* @param rowId Row id.
- * @return Previour row associated with the row id.
+ * @return The uncommitted row version removed by this call ({@link NoUncommittedVersionException} is thrown if there is none).
* @throws StorageException If failed to write data to the storage.
*/
@Nullable BinaryRow abortWrite(RowId rowId) throws StorageException;
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/PageMemoryDataRegion.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/NoUncommittedVersionException.java
similarity index 67%
copy from modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/PageMemoryDataRegion.java
copy to modules/storage-api/src/main/java/org/apache/ignite/internal/storage/NoUncommittedVersionException.java
index dbc6e3c2f..33ceb3447 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/PageMemoryDataRegion.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/NoUncommittedVersionException.java
@@ -15,22 +15,12 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.pagememory;
+package org.apache.ignite.internal.storage;
-import org.jetbrains.annotations.Nullable;
+import org.apache.ignite.lang.IgniteInternalException;
/**
- * Data region based on {@link PageMemory}.
+ * Thrown when an operation that requires an uncommitted row version is invoked, but no such version exists.
*/
-public interface PageMemoryDataRegion {
- /**
- * Returns {@link true} if the date region is persistent.
- */
- boolean persistent();
-
- /**
- * Returns page memory, {@code null} if not started.
- */
- @Nullable
- PageMemory pageMemory();
+public class NoUncommittedVersionException extends IgniteInternalException {
}
diff --git a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
index dbb6afa7d..7418438a5 100644
--- a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
+++ b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
@@ -19,7 +19,11 @@ package org.apache.ignite.internal.storage;
import static java.util.stream.Collectors.toList;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasItems;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
@@ -31,246 +35,238 @@ import java.util.Comparator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.UUID;
+import java.util.function.Predicate;
import java.util.stream.StreamSupport;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.tx.Timestamp;
import org.apache.ignite.internal.util.Cursor;
+import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.Test;
/**
* Base test for MV partition storages.
*/
-public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest {
- /**
- * Creates a storage instance for testing.
- */
- protected abstract MvPartitionStorage partitionStorage();
+public abstract class AbstractMvPartitionStorageTest<S extends MvPartitionStorage> extends BaseMvStoragesTest {
+ protected S storage;
+
+ protected final UUID txId = newTransactionId();
+
+ private final TestKey key = new TestKey(10, "foo");
+ private final TestValue value = new TestValue(20, "bar");
+ protected final BinaryRow binaryRow = binaryRow(key, value);
+ private final TestValue value2 = new TestValue(21, "bar2");
+ protected final BinaryRow binaryRow2 = binaryRow(key, value2);
/**
- * Tests that reads and scan from empty storage return empty results.
+ * Tests that reads of a row whose insert was aborted return empty results.
*/
@Test
- public void testEmpty() throws Exception {
- MvPartitionStorage pk = partitionStorage();
+ public void testReadsFromEmpty() {
+ RowId rowId = insertAndAbortWrite();
- RowId rowId = pk.insert(binaryRow(new TestKey(1, "1"), new TestValue(1, "1")), UUID.randomUUID());
+ assertEquals(partitionId(), rowId.partitionId());
- pk.abortWrite(rowId);
+ assertNull(storage.read(rowId, newTransactionId()));
+ assertNull(storage.read(rowId, Timestamp.nextVersion()));
+ }
- assertEquals(0, rowId.partitionId());
+ private RowId insertAndAbortWrite() {
+ RowId rowId = storage.insert(binaryRow, txId);
- // Read.
- assertNull(pk.read(rowId, UUID.randomUUID()));
- assertNull(pk.read(rowId, Timestamp.nextVersion()));
+ storage.abortWrite(rowId);
- // Scan.
- assertEquals(List.of(), convert(pk.scan(row -> true, UUID.randomUUID())));
- assertEquals(List.of(), convert(pk.scan(row -> true, Timestamp.nextVersion())));
+ return rowId;
}
- /**
- * Tests basic invariants of {@link MvPartitionStorage#addWrite(BinaryRow, UUID)}.
- */
@Test
- public void testAddWrite() throws Exception {
- MvPartitionStorage pk = partitionStorage();
+ public void testScanOverEmpty() throws Exception {
+ insertAndAbortWrite();
- TestKey key = new TestKey(10, "foo");
- TestValue value = new TestValue(20, "bar");
+ assertEquals(List.of(), convert(storage.scan(row -> true, newTransactionId())));
+ assertEquals(List.of(), convert(storage.scan(row -> true, Timestamp.nextVersion())));
+ }
- BinaryRow binaryRow = binaryRow(key, value);
+ protected int partitionId() {
+ return 0;
+ }
- UUID txId = UUID.randomUUID();
+ protected UUID newTransactionId() {
+ return UUID.randomUUID();
+ }
- RowId rowId = pk.insert(binaryRow, txId);
+ /**
+ * Tests basic invariants of {@link MvPartitionStorage#addWrite(RowId, BinaryRow, UUID)}.
+ */
+ @Test
+ public void testAddWrite() {
+ RowId rowId = storage.insert(binaryRow, txId);
// Attempt to write from another transaction.
- assertThrows(TxIdMismatchException.class, () -> pk.addWrite(rowId, binaryRow, UUID.randomUUID()));
+ assertThrows(TxIdMismatchException.class, () -> storage.addWrite(rowId, binaryRow, newTransactionId()));
// Write from the same transaction.
- pk.addWrite(rowId, binaryRow, txId);
+ storage.addWrite(rowId, binaryRow, txId);
// Read without timestamp returns uncommited row.
- assertEquals(value, value(pk.read(rowId, txId)));
+ assertRowMatches(storage.read(rowId, txId), binaryRow);
// Read with wrong transaction id should throw exception.
- assertThrows(TxIdMismatchException.class, () -> pk.read(rowId, UUID.randomUUID()));
+ assertThrows(TxIdMismatchException.class, () -> storage.read(rowId, newTransactionId()));
// Read with timestamp returns null.
- assertNull(pk.read(rowId, Timestamp.nextVersion()));
+ assertNull(storage.read(rowId, Timestamp.nextVersion()));
}
/**
- * Tests basic invariants of {@link MvPartitionStorage#abortWrite(BinaryRow)}.
+ * Tests basic invariants of {@link MvPartitionStorage#abortWrite(RowId)}.
*/
@Test
- public void testAbortWrite() throws Exception {
- MvPartitionStorage pk = partitionStorage();
-
- TestKey key = new TestKey(10, "foo");
- TestValue value = new TestValue(20, "bar");
-
- UUID txId = UUID.randomUUID();
+ public void testAbortWrite() {
+ RowId rowId = storage.insert(binaryRow(key, value), txId);
- RowId rowId = pk.insert(binaryRow(key, value), txId);
-
- pk.abortWrite(rowId);
+ storage.abortWrite(rowId);
// Aborted row can't be read.
- assertNull(pk.read(rowId, txId));
+ assertNull(storage.read(rowId, txId));
}
/**
- * Tests basic invariants of {@link MvPartitionStorage#commitWrite(BinaryRow, Timestamp)}.
+ * Tests basic invariants of {@link MvPartitionStorage#commitWrite(RowId, Timestamp)}.
*/
@Test
- public void testCommitWrite() throws Exception {
- MvPartitionStorage pk = partitionStorage();
-
- TestKey key = new TestKey(10, "foo");
- TestValue value = new TestValue(20, "bar");
-
- BinaryRow binaryRow = binaryRow(key, value);
-
- UUID txId = UUID.randomUUID();
-
- RowId rowId = pk.insert(binaryRow, txId);
+ public void testCommitWrite() {
+ RowId rowId = storage.insert(binaryRow, txId);
Timestamp tsBefore = Timestamp.nextVersion();
Timestamp tsExact = Timestamp.nextVersion();
- pk.commitWrite(rowId, tsExact);
+ storage.commitWrite(rowId, tsExact);
Timestamp tsAfter = Timestamp.nextVersion();
// Row is invisible at the time before writing.
- assertNull(pk.read(rowId, tsBefore));
+ assertNull(storage.read(rowId, tsBefore));
// Row is valid at the time during and after writing.
- assertEquals(value, value(pk.read(rowId, txId)));
- assertEquals(value, value(pk.read(rowId, tsExact)));
- assertEquals(value, value(pk.read(rowId, tsAfter)));
+ assertRowMatches(storage.read(rowId, tsExact), binaryRow);
+ assertRowMatches(storage.read(rowId, tsAfter), binaryRow);
TestValue newValue = new TestValue(30, "duh");
- UUID newTxId = UUID.randomUUID();
+ UUID newTxId = newTransactionId();
- pk.addWrite(rowId, binaryRow(key, newValue), newTxId);
+ BinaryRow newRow = binaryRow(key, newValue);
+ storage.addWrite(rowId, newRow, newTxId);
// Same checks, but now there are two different versions.
- assertNull(pk.read(rowId, tsBefore));
+ assertNull(storage.read(rowId, tsBefore));
- assertEquals(newValue, value(pk.read(rowId, newTxId)));
+ assertRowMatches(storage.read(rowId, newTxId), newRow);
- assertEquals(value, value(pk.read(rowId, tsExact)));
- assertEquals(value, value(pk.read(rowId, tsAfter)));
- assertEquals(value, value(pk.read(rowId, Timestamp.nextVersion())));
+ assertRowMatches(storage.read(rowId, tsExact), binaryRow);
+ assertRowMatches(storage.read(rowId, tsAfter), binaryRow);
+ assertRowMatches(storage.read(rowId, Timestamp.nextVersion()), binaryRow);
// Only latest time behavior changes after commit.
- pk.commitWrite(rowId, Timestamp.nextVersion());
+ storage.commitWrite(rowId, Timestamp.nextVersion());
- assertEquals(newValue, value(pk.read(rowId, newTxId)));
+ assertRowMatches(storage.read(rowId, newTxId), newRow);
- assertEquals(value, value(pk.read(rowId, tsExact)));
- assertEquals(value, value(pk.read(rowId, tsAfter)));
+ assertRowMatches(storage.read(rowId, tsExact), binaryRow);
+ assertRowMatches(storage.read(rowId, tsAfter), binaryRow);
- assertEquals(newValue, value(pk.read(rowId, Timestamp.nextVersion())));
+ assertRowMatches(storage.read(rowId, Timestamp.nextVersion()), newRow);
// Remove.
- UUID removeTxId = UUID.randomUUID();
+ UUID removeTxId = newTransactionId();
- pk.addWrite(rowId, null, removeTxId);
+ storage.addWrite(rowId, null, removeTxId);
- assertNull(pk.read(rowId, tsBefore));
+ assertNull(storage.read(rowId, tsBefore));
- assertNull(pk.read(rowId, removeTxId));
+ assertNull(storage.read(rowId, removeTxId));
- assertEquals(value, value(pk.read(rowId, tsExact)));
- assertEquals(value, value(pk.read(rowId, tsAfter)));
+ assertRowMatches(storage.read(rowId, tsExact), binaryRow);
+ assertRowMatches(storage.read(rowId, tsAfter), binaryRow);
- assertEquals(newValue, value(pk.read(rowId, Timestamp.nextVersion())));
+ assertRowMatches(storage.read(rowId, Timestamp.nextVersion()), newRow);
// Commit remove.
Timestamp removeTs = Timestamp.nextVersion();
- pk.commitWrite(rowId, removeTs);
+ storage.commitWrite(rowId, removeTs);
- assertNull(pk.read(rowId, tsBefore));
+ assertNull(storage.read(rowId, tsBefore));
- assertNull(pk.read(rowId, removeTxId));
- assertNull(pk.read(rowId, removeTs));
- assertNull(pk.read(rowId, Timestamp.nextVersion()));
+ assertNull(storage.read(rowId, removeTxId));
+ assertNull(storage.read(rowId, removeTs));
+ assertNull(storage.read(rowId, Timestamp.nextVersion()));
- assertEquals(value, value(pk.read(rowId, tsExact)));
- assertEquals(value, value(pk.read(rowId, tsAfter)));
+ assertRowMatches(storage.read(rowId, tsExact), binaryRow);
+ assertRowMatches(storage.read(rowId, tsAfter), binaryRow);
}
/**
- * Tests basic invariants of {@link MvPartitionStorage#scan(Timestamp)}.
+ * Tests basic invariants of {@link MvPartitionStorage#scan(Predicate, Timestamp)}.
*/
@Test
public void testScan() throws Exception {
- MvPartitionStorage pk = partitionStorage();
-
TestKey key1 = new TestKey(1, "1");
TestValue value1 = new TestValue(10, "xxx");
TestKey key2 = new TestKey(2, "2");
TestValue value2 = new TestValue(20, "yyy");
- UUID txId1 = UUID.randomUUID();
- RowId rowId1 = pk.insert(binaryRow(key1, value1), txId1);
+ UUID txId1 = newTransactionId();
+ RowId rowId1 = storage.insert(binaryRow(key1, value1), txId1);
- UUID txId2 = UUID.randomUUID();
- RowId rowId2 = pk.insert(binaryRow(key2, value2), txId2);
+ UUID txId2 = newTransactionId();
+ RowId rowId2 = storage.insert(binaryRow(key2, value2), txId2);
// Scan with and without filters.
- assertThrows(TxIdMismatchException.class, () -> convert(pk.scan(row -> true, txId1)));
- assertThrows(TxIdMismatchException.class, () -> convert(pk.scan(row -> true, txId2)));
+ assertThrows(TxIdMismatchException.class, () -> convert(storage.scan(row -> true, txId1)));
+ assertThrows(TxIdMismatchException.class, () -> convert(storage.scan(row -> true, txId2)));
- assertEquals(List.of(value1), convert(pk.scan(row -> key(row).intKey == 1, txId1)));
- assertEquals(List.of(value2), convert(pk.scan(row -> key(row).intKey == 2, txId2)));
+ assertEquals(List.of(value1), convert(storage.scan(row -> key(row).intKey == 1, txId1)));
+ assertEquals(List.of(value2), convert(storage.scan(row -> key(row).intKey == 2, txId2)));
Timestamp ts1 = Timestamp.nextVersion();
Timestamp ts2 = Timestamp.nextVersion();
- pk.commitWrite(rowId1, ts2);
+ storage.commitWrite(rowId1, ts2);
Timestamp ts3 = Timestamp.nextVersion();
Timestamp ts4 = Timestamp.nextVersion();
- pk.commitWrite(rowId2, ts4);
+ storage.commitWrite(rowId2, ts4);
Timestamp ts5 = Timestamp.nextVersion();
// Full scan with various timestamp values.
- assertEquals(List.of(), convert(pk.scan(row -> true, ts1)));
+ assertEquals(List.of(), convert(storage.scan(row -> true, ts1)));
- assertEquals(List.of(value1), convert(pk.scan(row -> true, ts2)));
- assertEquals(List.of(value1), convert(pk.scan(row -> true, ts3)));
+ assertEquals(List.of(value1), convert(storage.scan(row -> true, ts2)));
+ assertEquals(List.of(value1), convert(storage.scan(row -> true, ts3)));
- assertEquals(List.of(value1, value2), convert(pk.scan(row -> true, ts4)));
- assertEquals(List.of(value1, value2), convert(pk.scan(row -> true, ts5)));
+ assertEquals(List.of(value1, value2), convert(storage.scan(row -> true, ts4)));
+ assertEquals(List.of(value1, value2), convert(storage.scan(row -> true, ts5)));
}
@Test
public void testScanCursorInvariants() {
- MvPartitionStorage pk = partitionStorage();
-
TestValue value1 = new TestValue(10, "xxx");
TestValue value2 = new TestValue(20, "yyy");
- UUID txId = UUID.randomUUID();
-
- RowId rowId1 = pk.insert(binaryRow(new TestKey(1, "1"), value1), txId);
- pk.commitWrite(rowId1, Timestamp.nextVersion());
+ RowId rowId1 = storage.insert(binaryRow(new TestKey(1, "1"), value1), txId);
+ storage.commitWrite(rowId1, Timestamp.nextVersion());
- RowId rowId2 = pk.insert(binaryRow(new TestKey(2, "2"), value2), txId);
- pk.commitWrite(rowId2, Timestamp.nextVersion());
+ RowId rowId2 = storage.insert(binaryRow(new TestKey(2, "2"), value2), txId);
+ storage.commitWrite(rowId2, Timestamp.nextVersion());
- Cursor<BinaryRow> cursor = pk.scan(row -> true, txId);
+ Cursor<BinaryRow> cursor = storage.scan(row -> true, txId);
assertTrue(cursor.hasNext());
+ //noinspection ConstantConditions
assertTrue(cursor.hasNext());
List<TestValue> res = new ArrayList<>();
@@ -278,11 +274,13 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest
res.add(value(cursor.next()));
assertTrue(cursor.hasNext());
+ //noinspection ConstantConditions
assertTrue(cursor.hasNext());
res.add(value(cursor.next()));
assertFalse(cursor.hasNext());
+ //noinspection ConstantConditions
assertFalse(cursor.hasNext());
assertThrows(NoSuchElementException.class, () -> cursor.next());
@@ -298,4 +296,320 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest
.collect(toList());
}
}
+
+ @Test
+ void readOfUncommittedRowWithCorrespondingTransactionIdReturnsTheRow() {
+ RowId rowId = storage.insert(binaryRow, txId);
+
+ BinaryRow foundRow = storage.read(rowId, txId);
+
+ assertRowMatches(foundRow, binaryRow);
+ }
+
+ private void assertRowMatches(@Nullable BinaryRow rowUnderQuestion, BinaryRow expectedRow) {
+ assertThat(rowUnderQuestion, is(notNullValue()));
+ assertThat(rowUnderQuestion.bytes(), is(equalTo(expectedRow.bytes())));
+ }
+
+ @Test
+ void readOfUncommittedRowWithDifferentTransactionIdThrows() {
+ RowId rowId = storage.insert(binaryRow, txId);
+
+ assertThrows(TxIdMismatchException.class, () -> storage.read(rowId, newTransactionId()));
+ }
+
+ @Test
+ void readOfCommittedRowWithAnyTransactionIdReturnsTheRow() {
+ RowId rowId = storage.insert(binaryRow, txId);
+ storage.commitWrite(rowId, Timestamp.nextVersion());
+
+ BinaryRow foundRow = storage.read(rowId, newTransactionId());
+
+ assertRowMatches(foundRow, binaryRow);
+ }
+
+ @Test
+ void readsUncommittedVersionEvenWhenThereIsCommittedVersionBeforeIt() {
+ RowId rowId1 = storage.insert(binaryRow, txId);
+ storage.commitWrite(rowId1, Timestamp.nextVersion());
+
+ RowId rowId2 = storage.insert(binaryRow2, txId);
+
+ BinaryRow foundRow = storage.read(rowId2, txId);
+
+ assertRowMatches(foundRow, binaryRow2);
+ }
+
+ @Test
+ void readsCommittedVersionEvenWhenThereIsCommittedVersionBeforeIt() {
+ RowId rowId1 = storage.insert(binaryRow, txId);
+ storage.commitWrite(rowId1, Timestamp.nextVersion());
+
+ RowId rowId2 = storage.insert(binaryRow2, txId);
+ storage.commitWrite(rowId2, Timestamp.nextVersion());
+
+ BinaryRow foundRow = storage.read(rowId2, txId);
+
+ assertRowMatches(foundRow, binaryRow2);
+ }
+
+ @Test
+ void readByExactlyCommitTimestampFindsRow() {
+ RowId rowId = storage.insert(binaryRow, txId);
+ Timestamp commitTimestamp = Timestamp.nextVersion();
+ storage.commitWrite(rowId, commitTimestamp);
+
+ BinaryRow foundRow = storage.read(rowId, commitTimestamp);
+
+ assertRowMatches(foundRow, binaryRow);
+ }
+
+ @Test
+ void readByTimestampAfterCommitTimestampFindsRow() {
+ RowId rowId = storage.insert(binaryRow, txId);
+ Timestamp commitTimestamp = Timestamp.nextVersion();
+ storage.commitWrite(rowId, commitTimestamp);
+
+ Timestamp afterCommit = Timestamp.nextVersion();
+ BinaryRow foundRow = storage.read(rowId, afterCommit);
+
+ assertRowMatches(foundRow, binaryRow);
+ }
+
+ @Test
+ void readByTimestampBeforeFirstVersionCommitTimestampFindsNothing() {
+ Timestamp beforeCommit = Timestamp.nextVersion();
+
+ RowId rowId = storage.insert(binaryRow, txId);
+ Timestamp commitTimestamp = Timestamp.nextVersion();
+ storage.commitWrite(rowId, commitTimestamp);
+
+ BinaryRow foundRow = storage.read(rowId, beforeCommit);
+
+ assertThat(foundRow, is(nullValue()));
+ }
+
+ @Test
+ void readByTimestampOfLastVersionFindsLastVersion() {
+ RowId rowId = storage.insert(binaryRow, txId);
+ Timestamp firstVersionTs = Timestamp.nextVersion();
+ storage.commitWrite(rowId, firstVersionTs);
+
+ storage.addWrite(rowId, binaryRow2, newTransactionId());
+ Timestamp secondVersionTs = Timestamp.nextVersion();
+ storage.commitWrite(rowId, secondVersionTs);
+
+ BinaryRow foundRow = storage.read(rowId, secondVersionTs);
+
+ assertRowMatches(foundRow, binaryRow2);
+ }
+
+ @Test
+ void readByTimestampOfPreviousVersionFindsPreviousVersion() {
+ RowId rowId = storage.insert(binaryRow, txId);
+ Timestamp firstVersionTs = Timestamp.nextVersion();
+ storage.commitWrite(rowId, firstVersionTs);
+
+ storage.addWrite(rowId, binaryRow2, newTransactionId());
+ storage.commitWrite(rowId, Timestamp.nextVersion());
+
+ BinaryRow foundRow = storage.read(rowId, firstVersionTs);
+
+ assertRowMatches(foundRow, binaryRow);
+ }
+
+ @Test
+ void readByTimestampBetweenVersionsFindsPreviousVersion() {
+ RowId rowId = storage.insert(binaryRow, txId);
+ Timestamp firstVersionTs = Timestamp.nextVersion();
+ storage.commitWrite(rowId, firstVersionTs);
+
+ Timestamp tsInBetween = Timestamp.nextVersion();
+
+ storage.addWrite(rowId, binaryRow2, newTransactionId());
+ storage.commitWrite(rowId, Timestamp.nextVersion());
+
+ BinaryRow foundRow = storage.read(rowId, tsInBetween);
+
+ assertRowMatches(foundRow, binaryRow);
+ }
+
+ @Test
+ void readByTimestampIgnoresUncommittedVersion() {
+ RowId rowId = storage.insert(binaryRow, newTransactionId());
+ storage.commitWrite(rowId, Timestamp.nextVersion());
+
+ storage.addWrite(rowId, binaryRow2, newTransactionId());
+
+ Timestamp latestTs = Timestamp.nextVersion();
+ BinaryRow foundRow = storage.read(rowId, latestTs);
+
+ assertRowMatches(foundRow, binaryRow);
+ }
+
+ @Test
+ void addWriteWithDifferentTxIdThrows() {
+ RowId rowId = storage.insert(binaryRow, txId);
+
+ assertThrows(TxIdMismatchException.class, () -> storage.addWrite(rowId, binaryRow2, newTransactionId()));
+ }
+
+ @Test
+ void secondUncommittedWriteWithSameTxIdReplacesExistingUncommittedWrite() {
+ RowId rowId = storage.insert(binaryRow, txId);
+
+ storage.addWrite(rowId, binaryRow2, txId);
+
+ BinaryRow foundRow = storage.read(rowId, txId);
+
+ assertRowMatches(foundRow, binaryRow2);
+ }
+
+ @Test
+ void addWriteReturnsUncommittedVersionIfItExists() {
+ RowId rowId = storage.insert(binaryRow, txId);
+
+ BinaryRow returnedRow = storage.addWrite(rowId, binaryRow2, txId);
+
+ assertRowMatches(returnedRow, binaryRow);
+ }
+
+ @Test
+ void addWriteReturnsNullIfNoUncommittedVersionExists() {
+ RowId rowId = storage.insert(binaryRow, newTransactionId());
+ storage.commitWrite(rowId, Timestamp.nextVersion());
+
+ BinaryRow returnedRow = storage.addWrite(rowId, binaryRow2, txId);
+
+ assertThat(returnedRow, is(nullValue()));
+ }
+
+ @Test
+ void afterRemovalReadWithTxIdFindsNothing() {
+ RowId rowId = storage.insert(binaryRow, newTransactionId());
+ storage.commitWrite(rowId, Timestamp.nextVersion());
+
+ storage.addWrite(rowId, null, txId);
+
+ BinaryRow foundRow = storage.read(rowId, txId);
+
+ assertThat(foundRow, is(nullValue()));
+ }
+
+ @Test
+ void afterRemovalReadByLatestTimestampFindsNothing() {
+ RowId rowId = storage.insert(binaryRow, newTransactionId());
+ storage.commitWrite(rowId, Timestamp.nextVersion());
+
+ storage.addWrite(rowId, null, newTransactionId());
+ storage.commitWrite(rowId, Timestamp.nextVersion());
+
+ BinaryRow foundRow = storage.read(rowId, Timestamp.nextVersion());
+
+ assertThat(foundRow, is(nullValue()));
+ }
+
+ @Test
+ void afterRemovalPreviousVersionRemainsAccessibleByTimestamp() {
+ RowId rowId = storage.insert(binaryRow, newTransactionId());
+ Timestamp firstTimestamp = Timestamp.nextVersion();
+ storage.commitWrite(rowId, firstTimestamp);
+
+ storage.addWrite(rowId, null, newTransactionId());
+ storage.commitWrite(rowId, Timestamp.nextVersion());
+
+ BinaryRow foundRow = storage.read(rowId, firstTimestamp);
+
+ assertRowMatches(foundRow, binaryRow);
+ }
+
+ @Test
+ void removalReturnsUncommittedRowVersionIfItExists() {
+ RowId rowId = storage.insert(binaryRow, txId);
+
+ BinaryRow rowFromRemoval = storage.addWrite(rowId, null, txId);
+
+ assertRowMatches(rowFromRemoval, binaryRow);
+ }
+
+ @Test
+ void removalReturnsNullIfNoUncommittedVersionExists() {
+ RowId rowId = storage.insert(binaryRow, newTransactionId());
+ storage.commitWrite(rowId, Timestamp.nextVersion());
+
+ BinaryRow rowFromRemoval = storage.addWrite(rowId, null, newTransactionId());
+
+ assertThat(rowFromRemoval, is(nullValue()));
+ }
+
+ @Test
+ void commitWriteMakesVersionAvailableToReadByTimestamp() {
+ RowId rowId = storage.insert(binaryRow, txId);
+
+ storage.commitWrite(rowId, Timestamp.nextVersion());
+
+ BinaryRow foundRow = storage.read(rowId, Timestamp.nextVersion());
+
+ assertRowMatches(foundRow, binaryRow);
+ }
+
+ @Test
+ void abortWriteFailsIfNoUncommittedVersionExists() {
+ RowId rowId = storage.insert(binaryRow, newTransactionId());
+ storage.commitWrite(rowId, Timestamp.nextVersion());
+
+ assertThrows(NoUncommittedVersionException.class, () -> storage.abortWrite(rowId));
+ }
+
+ @Test
+ void abortWriteRemovesUncommittedVersion() {
+ RowId rowId = storage.insert(binaryRow, newTransactionId());
+ storage.commitWrite(rowId, Timestamp.nextVersion());
+
+ storage.addWrite(rowId, binaryRow2, txId);
+
+ storage.abortWrite(rowId);
+
+ BinaryRow foundRow = storage.read(rowId, txId);
+
+ assertRowMatches(foundRow, binaryRow);
+ }
+
+ @Test
+ void abortOfInsertMakesRowNonExistentForReadByTimestamp() {
+ RowId rowId = storage.insert(binaryRow, newTransactionId());
+
+ storage.abortWrite(rowId);
+
+ BinaryRow foundRow = storage.read(rowId, Timestamp.nextVersion());
+
+ assertThat(foundRow, is(nullValue()));
+ }
+
+ @Test
+ void abortOfInsertMakesRowNonExistentForReadWithTxId() {
+ RowId rowId = insertAndAbortWrite();
+
+ BinaryRow foundRow = storage.read(rowId, txId);
+
+ assertThat(foundRow, is(nullValue()));
+ }
+
+ @Test
+ void abortWriteReturnsTheRemovedVersion() {
+ RowId rowId = storage.insert(binaryRow, txId);
+
+ BinaryRow returnedRow = storage.abortWrite(rowId);
+
+ assertRowMatches(returnedRow, binaryRow);
+ }
+
+ @Test
+ void scanWithTxIdThrowsWhenOtherTransactionHasUncommittedChanges() {
+ storage.insert(binaryRow, txId);
+
+ Cursor<BinaryRow> cursor = storage.scan(k -> true, newTransactionId());
+
+ assertThrows(TxIdMismatchException.class, cursor::next);
+ }
}
diff --git a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/BaseMvStoragesTest.java b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/BaseMvStoragesTest.java
index 57b5fd8c3..461c85687 100644
--- a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/BaseMvStoragesTest.java
+++ b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/BaseMvStoragesTest.java
@@ -86,7 +86,6 @@ public abstract class BaseMvStoragesTest {
}
}
- @Nullable
protected static TestKey key(BinaryRow binaryRow) {
try {
return kvMarshaller.unmarshalKey(new Row(schemaDescriptor, binaryRow));
diff --git a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/basic/TestMvPartitionStorage.java b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/basic/TestMvPartitionStorage.java
index d66a2c261..933886d01 100644
--- a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/basic/TestMvPartitionStorage.java
+++ b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/basic/TestMvPartitionStorage.java
@@ -26,6 +26,7 @@ import java.util.concurrent.ConcurrentMap;
import java.util.function.Predicate;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.NoUncommittedVersionException;
import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.storage.StorageException;
import org.apache.ignite.internal.storage.TxIdMismatchException;
@@ -116,8 +117,11 @@ public class TestMvPartitionStorage implements MvPartitionStorage {
BinaryRow[] res = {null};
map.computeIfPresent(rowId, (ignored, versionChain) -> {
- assert versionChain != null;
- assert versionChain.begin == null && versionChain.txId != null;
+ if (versionChain.txId == null) {
+ throw new NoUncommittedVersionException();
+ }
+
+ assert versionChain.begin == null;
cleanupIndexesForAbortedRow(versionChain, rowId);
diff --git a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/basic/TestMvPartitionStorageTest.java b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/basic/TestMvPartitionStorageTest.java
index 86fc7825f..ba32f5d24 100644
--- a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/basic/TestMvPartitionStorageTest.java
+++ b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/basic/TestMvPartitionStorageTest.java
@@ -19,18 +19,16 @@ package org.apache.ignite.internal.storage.basic;
import java.util.List;
import org.apache.ignite.internal.storage.AbstractMvPartitionStorageTest;
-import org.apache.ignite.internal.storage.MvPartitionStorage;
/**
* MV partition storage test implementation for {@link TestMvPartitionStorage} class.
*/
-public class TestMvPartitionStorageTest extends AbstractMvPartitionStorageTest {
- /** Test partition storage instance. */
- private final TestMvPartitionStorage storage = new TestMvPartitionStorage(List.of(), 0);
-
- /** {@inheritDoc} */
- @Override
- protected MvPartitionStorage partitionStorage() {
- return storage;
+public class TestMvPartitionStorageTest extends AbstractMvPartitionStorageTest<TestMvPartitionStorage> {
+ /**
+ * Creates new instance.
+ */
+ public TestMvPartitionStorageTest() {
+ storage = new TestMvPartitionStorage(List.of(), 0);
}
+
}
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryDataRegion.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryDataRegion.java
index 8d535ac84..4e5863076 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryDataRegion.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryDataRegion.java
@@ -17,12 +17,12 @@
package org.apache.ignite.internal.storage.pagememory;
+import java.util.Objects;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.pagememory.PageMemory;
import org.apache.ignite.internal.pagememory.PageMemoryDataRegion;
import org.apache.ignite.internal.pagememory.configuration.schema.PageMemoryDataRegionConfiguration;
import org.apache.ignite.internal.pagememory.io.PageIoRegistry;
-import org.jetbrains.annotations.Nullable;
/**
* Abstract data region for {@link PageMemoryStorageEngine}. Based on a {@link PageMemory}.
@@ -68,7 +68,7 @@ abstract class AbstractPageMemoryDataRegion implements PageMemoryDataRegion, Ign
/**
* Returns page memory, {@code null} if not {@link #start started}.
*/
- public @Nullable PageMemory pageMemory() {
- return pageMemory;
+ public PageMemory pageMemory() {
+ return Objects.requireNonNull(pageMemory);
}
}
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PageMemoryStorageEngine.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PageMemoryStorageEngine.java
index 746761a12..3270b5941 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PageMemoryStorageEngine.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PageMemoryStorageEngine.java
@@ -27,7 +27,6 @@ import org.apache.ignite.internal.pagememory.PageMemory;
import org.apache.ignite.internal.pagememory.io.PageIoRegistry;
import org.apache.ignite.internal.storage.StorageException;
import org.apache.ignite.internal.storage.engine.StorageEngine;
-import org.apache.ignite.internal.storage.engine.TableStorage;
import org.apache.ignite.internal.storage.pagememory.configuration.schema.PageMemoryDataStorageView;
import org.apache.ignite.internal.storage.pagememory.configuration.schema.PageMemoryStorageEngineConfiguration;
import org.apache.ignite.internal.util.IgniteUtils;
@@ -98,7 +97,7 @@ public class PageMemoryStorageEngine implements StorageEngine {
/** {@inheritDoc} */
@Override
- public TableStorage createTable(TableConfiguration tableCfg) {
+ public PageMemoryTableStorage createTable(TableConfiguration tableCfg) {
TableView tableView = tableCfg.value();
assert tableView.dataStorage().name().equals(ENGINE_NAME) : tableView.dataStorage().name();
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PageMemoryTableStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PageMemoryTableStorage.java
index a109c5524..80d1489da 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PageMemoryTableStorage.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PageMemoryTableStorage.java
@@ -29,16 +29,18 @@ import org.apache.ignite.internal.storage.PartitionStorage;
import org.apache.ignite.internal.storage.StorageException;
import org.apache.ignite.internal.storage.engine.TableStorage;
import org.apache.ignite.internal.storage.index.SortedIndexStorage;
+import org.apache.ignite.internal.storage.pagememory.mv.PageMemoryMvPartitionStorage;
import org.apache.ignite.internal.tostring.S;
import org.apache.ignite.internal.util.IgniteUtils;
import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.TestOnly;
/**
* Abstract table storage implementation based on {@link PageMemory}.
*/
// TODO: IGNITE-16641 Add support for persistent case.
// TODO: IGNITE-16642 Support indexes.
-abstract class PageMemoryTableStorage implements TableStorage {
+public abstract class PageMemoryTableStorage implements TableStorage {
protected final AbstractPageMemoryDataRegion dataRegion;
protected final TableConfiguration tableCfg;
@@ -178,4 +180,17 @@ abstract class PageMemoryTableStorage implements TableStorage {
* @throws StorageException If there is an error while creating the partition storage.
*/
protected abstract PageMemoryPartitionStorage createPartitionStorage(int partId) throws StorageException;
+
+ /**
+ * Creates a multi-version storage for the given partition. The MV API is not production-ready yet; this method
+ * exists so that MV storages can be tested. Assumes the data region is a {@link VolatilePageMemoryDataRegion}.
+ */
+ @TestOnly
+ public PageMemoryMvPartitionStorage createMvPartitionStorage(int partitionId) {
+ VolatilePageMemoryDataRegion volatileRegion = (VolatilePageMemoryDataRegion) dataRegion;
+
+ return new PageMemoryMvPartitionStorage(partitionId, tableCfg.value(), dataRegion,
+ volatileRegion.versionChainFreeList(), volatileRegion.rowVersionFreeList());
+ }
}
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/TableTree.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/TableTree.java
index 11303caf8..cdcab829a 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/TableTree.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/TableTree.java
@@ -154,7 +154,7 @@ public class TableTree extends BplusTree<TableSearchRow, TableDataRow> {
DataPagePayload data = dataIo.readPayload(pageAddr, itemId, pageSize);
- if (data.nextLink() == 0 && nextLink == link) {
+ if (!data.hasMoreFragments() && nextLink == link) {
// Good luck: we can read the row without fragments.
return readFullRow(link, hash, rowData, pageAddr + data.offset());
}
@@ -251,7 +251,7 @@ public class TableTree extends BplusTree<TableSearchRow, TableDataRow> {
DataPagePayload data = dataIo.readPayload(pageAddr, itemId, pageSize);
- if (data.nextLink() == 0 && nextLink == link) {
+ if (!data.hasMoreFragments() && nextLink == link) {
// Good luck: we can compare the rows without fragments.
return compareRowsFull(pageAddr + data.offset(), row);
}
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryDataRegion.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryDataRegion.java
index 36af324e7..6e609357c 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryDataRegion.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryDataRegion.java
@@ -26,15 +26,23 @@ import org.apache.ignite.internal.pagememory.evict.PageEvictionTrackerNoOp;
import org.apache.ignite.internal.pagememory.impl.PageMemoryNoStoreImpl;
import org.apache.ignite.internal.pagememory.io.PageIoRegistry;
import org.apache.ignite.internal.pagememory.metric.IoStatisticsHolderNoOp;
+import org.apache.ignite.internal.pagememory.reuse.ReuseList;
import org.apache.ignite.internal.pagememory.util.PageLockListenerNoOp;
import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.pagememory.mv.RowVersionFreeList;
+import org.apache.ignite.internal.storage.pagememory.mv.VersionChainFreeList;
import org.apache.ignite.lang.IgniteInternalCheckedException;
/**
* Implementation of {@link AbstractPageMemoryDataRegion} for in-memory case.
*/
-class VolatilePageMemoryDataRegion extends AbstractPageMemoryDataRegion {
- private TableFreeList freeList;
+public class VolatilePageMemoryDataRegion extends AbstractPageMemoryDataRegion {
+ private static final int FREE_LIST_GROUP_ID = 0;
+
+ private TableFreeList tableFreeList;
+
+ private VersionChainFreeList versionChainFreeList;
+ private RowVersionFreeList rowVersionFreeList;
/**
* Constructor.
@@ -68,23 +76,71 @@ class VolatilePageMemoryDataRegion extends AbstractPageMemoryDataRegion {
this.pageMemory = pageMemory;
try {
- int grpId = 0;
-
- long metaPageId = pageMemory.allocatePage(grpId, INDEX_PARTITION, FLAG_AUX);
-
- this.freeList = new TableFreeList(
- grpId,
- pageMemory,
- PageLockListenerNoOp.INSTANCE,
- metaPageId,
- true,
- null,
- PageEvictionTrackerNoOp.INSTANCE,
- IoStatisticsHolderNoOp.INSTANCE
- );
+ this.tableFreeList = createTableFreeList(pageMemory);
+ } catch (IgniteInternalCheckedException e) {
+ throw new StorageException("Error creating a TableFreeList", e);
+ }
+
+ try {
+ versionChainFreeList = createVersionChainFreeList(pageMemory, null);
} catch (IgniteInternalCheckedException e) {
- throw new StorageException("Error creating a freeList", e);
+ throw new StorageException("Error creating a VersionChainFreeList", e);
}
+
+ try {
+ rowVersionFreeList = createRowVersionFreeList(pageMemory, tableFreeList);
+ } catch (IgniteInternalCheckedException e) {
+ throw new StorageException("Error creating a RowVersionFreeList", e);
+ }
+ }
+
+ /**
+ * Allocates a meta page in group {@code FREE_LIST_GROUP_ID} and creates a {@link TableFreeList} on top of it.
+ *
+ * @param pageMemory Page memory to allocate pages from.
+ * @return New table free list.
+ * @throws IgniteInternalCheckedException If page allocation or free list initialization fails.
+ */
+ private static TableFreeList createTableFreeList(PageMemory pageMemory) throws IgniteInternalCheckedException {
+ long metaPageId = pageMemory.allocatePage(FREE_LIST_GROUP_ID, INDEX_PARTITION, FLAG_AUX);
+
+ return new TableFreeList(
+ FREE_LIST_GROUP_ID,
+ pageMemory,
+ PageLockListenerNoOp.INSTANCE,
+ metaPageId,
+ true,
+ null,
+ PageEvictionTrackerNoOp.INSTANCE,
+ IoStatisticsHolderNoOp.INSTANCE
+ );
+ }
+
+ /**
+ * Creates the free list that stores version chains, backed by a freshly allocated meta page.
+ *
+ * @param pageMemory Page memory to allocate pages from.
+ * @param reuseList Reuse list handed to the free list; may be {@code null}.
+ * @return New version chain free list.
+ * @throws IgniteInternalCheckedException If page allocation or free list initialization fails.
+ */
+ private static VersionChainFreeList createVersionChainFreeList(PageMemory pageMemory, ReuseList reuseList)
+ throws IgniteInternalCheckedException {
+ int grpId = FREE_LIST_GROUP_ID;
+ long freeListMetaPageId = pageMemory.allocatePage(grpId, INDEX_PARTITION, FLAG_AUX);
+
+ return new VersionChainFreeList(grpId, pageMemory, reuseList, PageLockListenerNoOp.INSTANCE, freeListMetaPageId,
+ true, null, PageEvictionTrackerNoOp.INSTANCE, IoStatisticsHolderNoOp.INSTANCE);
+ }
+
+ /**
+ * Creates the free list that stores row versions, backed by a freshly allocated meta page.
+ *
+ * @param pageMemory Page memory to allocate pages from.
+ * @param reuseList Reuse list handed to the free list.
+ * @return New row version free list.
+ * @throws IgniteInternalCheckedException If page allocation or free list initialization fails.
+ */
+ private static RowVersionFreeList createRowVersionFreeList(PageMemory pageMemory, ReuseList reuseList)
+ throws IgniteInternalCheckedException {
+ int grpId = FREE_LIST_GROUP_ID;
+ long freeListMetaPageId = pageMemory.allocatePage(grpId, INDEX_PARTITION, FLAG_AUX);
+
+ return new RowVersionFreeList(grpId, pageMemory, reuseList, PageLockListenerNoOp.INSTANCE, freeListMetaPageId,
+ true, null, PageEvictionTrackerNoOp.INSTANCE, IoStatisticsHolderNoOp.INSTANCE);
}
/** {@inheritDoc} */
@@ -92,17 +148,31 @@ class VolatilePageMemoryDataRegion extends AbstractPageMemoryDataRegion {
public void stop() {
super.stop();
- if (freeList != null) {
- freeList.close();
+ // NOTE(review): versionChainFreeList and rowVersionFreeList are not closed here; currently
+ // PageMemoryMvPartitionStorage#close() closes them instead. Confirm which side owns them.
+ if (tableFreeList != null) {
+ tableFreeList.close();
}
}
/**
- * Returns free list.
+ * Returns table free list.
*
* <p>NOTE: Free list must be one for the in-memory data region.
*/
- public TableFreeList freeList() {
- return freeList;
+ public TableFreeList tableFreeList() {
+ return tableFreeList;
+ }
+
+ /**
+ * Returns version chain free list, {@code null} until the region is started.
+ */
+ public VersionChainFreeList versionChainFreeList() {
+ return versionChainFreeList;
+ }
+
+ /**
+ * Returns row version free list, {@code null} until the region is started.
+ */
+ public RowVersionFreeList rowVersionFreeList() {
+ return rowVersionFreeList;
}
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryTableStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryTableStorage.java
index 8eaf09d32..fab23050a 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryTableStorage.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryTableStorage.java
@@ -37,6 +37,6 @@ class VolatilePageMemoryTableStorage extends PageMemoryTableStorage {
/** {@inheritDoc} */
@Override
protected PageMemoryPartitionStorage createPartitionStorage(int partId) throws StorageException {
- return new PageMemoryPartitionStorage(partId, tableCfg, dataRegion, ((VolatilePageMemoryDataRegion) dataRegion).freeList());
+ TableFreeList tableFreeList = ((VolatilePageMemoryDataRegion) dataRegion).tableFreeList();
+
+ return new PageMemoryPartitionStorage(partId, tableCfg, dataRegion, tableFreeList);
}
}
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/PageMemoryDataRegion.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/LinkRowId.java
similarity index 54%
copy from modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/PageMemoryDataRegion.java
copy to modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/LinkRowId.java
index dbc6e3c2f..415bedbb2 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/PageMemoryDataRegion.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/LinkRowId.java
@@ -15,22 +15,31 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.pagememory;
+package org.apache.ignite.internal.storage.pagememory.mv;
-import org.jetbrains.annotations.Nullable;
+import org.apache.ignite.internal.pagememory.util.PageIdUtils;
+import org.apache.ignite.internal.storage.RowId;
/**
- * Data region based on {@link PageMemory}.
+ * {@link RowId} implementation which identifies the row data using row link in page-memory.
+ *
+ * <p>Instances have value semantics: two instances are equal if and only if they hold the same link.
+ *
+ * @see RowId
+ * @see PageIdUtils#link(long, int)
*/
-public interface PageMemoryDataRegion {
- /**
- * Returns {@link true} if the date region is persistent.
- */
- boolean persistent();
+public class LinkRowId implements RowId {
+ /** Full (partition-aware) link to the version chain of the row. */
+ private final long rowLink;
+
+ /**
+ * Constructor.
+ *
+ * @param rowLink Page memory link to the version chain of the row.
+ */
+ public LinkRowId(long rowLink) {
+ this.rowLink = rowLink;
+ }
+
+ @Override
+ public int partitionId() {
+ long pageId = PageIdUtils.pageId(rowLink);
+ return PageIdUtils.partitionId(pageId);
+ }
- /**
- * Returns page memory, {@code null} if not started.
- */
- @Nullable
- PageMemory pageMemory();
+ /**
+ * Returns the link to the version chain of the row.
+ */
+ long versionChainLink() {
+ return rowLink;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ return rowLink == ((LinkRowId) o).rowLink;
+ }
+
+ @Override
+ public int hashCode() {
+ return Long.hashCode(rowLink);
+ }
+
+ @Override
+ public String toString() {
+ return "LinkRowId [rowLink=" + rowLink + ']';
+ }
}
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PageMemoryMvPartitionStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PageMemoryMvPartitionStorage.java
new file mode 100644
index 000000000..32bfa270d
--- /dev/null
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PageMemoryMvPartitionStorage.java
@@ -0,0 +1,521 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory.mv;
+
+import static org.apache.ignite.internal.pagememory.PageIdAllocator.FLAG_AUX;
+
+import java.nio.ByteBuffer;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Predicate;
+import org.apache.ignite.configuration.schemas.table.TableView;
+import org.apache.ignite.internal.pagememory.PageMemoryDataRegion;
+import org.apache.ignite.internal.pagememory.datapage.DataPageReader;
+import org.apache.ignite.internal.pagememory.metric.IoStatisticsHolderNoOp;
+import org.apache.ignite.internal.pagememory.util.PageLockListenerNoOp;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.ByteBufferRow;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.NoUncommittedVersionException;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.StorageUtils;
+import org.apache.ignite.internal.storage.TxIdMismatchException;
+import org.apache.ignite.internal.tx.Timestamp;
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.internal.util.IgniteCursor;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Implementation of {@link MvPartitionStorage} using Page Memory.
+ *
+ * <p>Each row is represented by a {@link VersionChain} (stored in a {@link VersionChainFreeList} and indexed by a
+ * {@link VersionChainTree}) whose head link points to the newest {@link RowVersion}; row versions are stored in a
+ * {@link RowVersionFreeList} and link to the next, older version of the same row.
+ *
+ * @see MvPartitionStorage
+ */
+public class PageMemoryMvPartitionStorage implements MvPartitionStorage {
+ /** Payload written for a row version that represents a removed row (tombstone). */
+ private static final byte[] TOMBSTONE_PAYLOAD = new byte[0];
+
+ private static final Predicate<BinaryRow> MATCH_ALL = row -> true;
+
+ private static final Predicate<Timestamp> ALWAYS_LOAD_VALUE = timestamp -> true;
+ private static final Predicate<Timestamp> LOAD_VALUE_WHEN_UNCOMMITTED = RowVersion::isUncommitted;
+
+ private final int partitionId;
+ private final int groupId;
+
+ private final VersionChainFreeList versionChainFreeList;
+ private final VersionChainTree versionChainTree;
+ private final VersionChainDataPageReader versionChainDataPageReader;
+ private final RowVersionFreeList rowVersionFreeList;
+ private final DataPageReader rowVersionDataPageReader;
+
+ /** Per-thread traversal objects; they are reset before each use so they can be reused across calls. */
+ private final ThreadLocal<ReadLatestRowVersion> readLatestRowVersionCache = ThreadLocal.withInitial(ReadLatestRowVersion::new);
+ private final ThreadLocal<ScanVersionChainByTimestamp> scanVersionChainByTimestampCache = ThreadLocal.withInitial(
+ ScanVersionChainByTimestamp::new
+ );
+
+ /**
+ * Constructor.
+ *
+ * @param partitionId Partition ID.
+ * @param tableConfig Table configuration.
+ * @param dataRegion Data region whose page memory is used.
+ * @param versionChainFreeList Free list for version chains.
+ * @param rowVersionFreeList Free list for row versions.
+ * @throws StorageException If the version chain tree cannot be created.
+ */
+ public PageMemoryMvPartitionStorage(
+ int partitionId,
+ TableView tableConfig,
+ PageMemoryDataRegion dataRegion,
+ VersionChainFreeList versionChainFreeList,
+ RowVersionFreeList rowVersionFreeList
+ ) {
+ this.partitionId = partitionId;
+
+ this.versionChainFreeList = versionChainFreeList;
+ this.rowVersionFreeList = rowVersionFreeList;
+
+ groupId = StorageUtils.groupId(tableConfig);
+
+ try {
+ versionChainTree = createVersionChainTree(partitionId, tableConfig, dataRegion, versionChainFreeList);
+ } catch (IgniteInternalCheckedException e) {
+ throw new StorageException("Error occurred while creating the partition storage", e);
+ }
+
+ versionChainDataPageReader = new VersionChainDataPageReader(dataRegion.pageMemory(), groupId, IoStatisticsHolderNoOp.INSTANCE);
+ rowVersionDataPageReader = new DataPageReader(dataRegion.pageMemory(), groupId, IoStatisticsHolderNoOp.INSTANCE);
+ }
+
+ private VersionChainTree createVersionChainTree(
+ int partitionId,
+ TableView tableConfig,
+ PageMemoryDataRegion dataRegion,
+ VersionChainFreeList chainFreeList
+ ) throws IgniteInternalCheckedException {
+ // TODO: IGNITE-16641 It is necessary to do getting the tree root for the persistent case.
+ long metaPageId = dataRegion.pageMemory().allocatePage(groupId, partitionId, FLAG_AUX);
+
+ // TODO: IGNITE-16641 It is necessary to take into account the persistent case.
+ boolean initNew = true;
+
+ return new VersionChainTree(
+ groupId,
+ tableConfig.name(),
+ dataRegion.pageMemory(),
+ PageLockListenerNoOp.INSTANCE,
+ new AtomicLong(),
+ metaPageId,
+ chainFreeList,
+ partitionId,
+ initNew
+ );
+ }
+
+ @Override
+ public @Nullable BinaryRow read(RowId rowId, UUID txId) throws TxIdMismatchException, StorageException {
+ VersionChain versionChain = findVersionChain(rowId);
+ if (versionChain == null) {
+ return null;
+ }
+
+ return findLatestRowVersion(versionChain, txId, MATCH_ALL);
+ }
+
+ @Override
+ public @Nullable BinaryRow read(RowId rowId, Timestamp timestamp) throws StorageException {
+ VersionChain versionChain = findVersionChain(rowId);
+ if (versionChain == null) {
+ return null;
+ }
+
+ return findRowVersionByTimestamp(versionChain, timestamp);
+ }
+
+ @Nullable
+ private VersionChain findVersionChain(RowId rowId) {
+ try {
+ return versionChainDataPageReader.getRowByLink(versionChainLinkFrom(rowId));
+ } catch (IgniteInternalCheckedException e) {
+ throw new StorageException("Version chain lookup failed", e);
+ }
+ }
+
+ /**
+ * Extracts the version chain link from a row ID issued by this storage.
+ *
+ * @throws IllegalArgumentException If the row ID belongs to another partition or is not a {@link LinkRowId}.
+ */
+ private long versionChainLinkFrom(RowId rowId) {
+ if (rowId.partitionId() != partitionId) {
+ throw new IllegalArgumentException("I own partition " + partitionId + " but I was given RowId with partition "
+ + rowId.partitionId());
+ }
+
+ if (!(rowId instanceof LinkRowId)) {
+ throw new IllegalArgumentException("Unsupported RowId implementation: " + rowId.getClass().getName());
+ }
+
+ return ((LinkRowId) rowId).versionChainLink();
+ }
+
+ @Nullable
+ private ByteBufferRow findLatestRowVersion(VersionChain versionChain, UUID txId, Predicate<BinaryRow> keyFilter) {
+ RowVersion rowVersion = findLatestRowVersion(versionChain, ALWAYS_LOAD_VALUE);
+ ByteBufferRow row = rowVersionToBinaryRow(rowVersion);
+
+ // A tombstone has no row to filter by key, so the (possibly user-supplied) filter only sees live rows.
+ if (row != null && !keyFilter.test(row)) {
+ return null;
+ }
+
+ throwIfChainBelongsToAnotherTx(versionChain, txId);
+
+ return row;
+ }
+
+ private RowVersion findLatestRowVersion(VersionChain versionChain, Predicate<Timestamp> loadValue) {
+ long nextLink = PartitionlessLinks.addPartitionIdToPartititionlessLink(versionChain.headLink(), partitionId);
+
+ ReadLatestRowVersion read = freshReadLatestRowVersion();
+
+ try {
+ rowVersionDataPageReader.traverse(nextLink, read, loadValue);
+ } catch (IgniteInternalCheckedException e) {
+ throw new StorageException("Row version lookup failed", e);
+ }
+
+ return read.result();
+ }
+
+ private ReadLatestRowVersion freshReadLatestRowVersion() {
+ ReadLatestRowVersion traversal = readLatestRowVersionCache.get();
+ traversal.reset();
+ return traversal;
+ }
+
+ private void throwIfChainBelongsToAnotherTx(VersionChain versionChain, UUID txId) {
+ if (versionChain.transactionId() != null && !txId.equals(versionChain.transactionId())) {
+ throw new TxIdMismatchException();
+ }
+ }
+
+ @Nullable
+ private ByteBufferRow rowVersionToBinaryRow(RowVersion rowVersion) {
+ if (rowVersion.isTombstone()) {
+ return null;
+ }
+
+ return new ByteBufferRow(rowVersion.value());
+ }
+
+ /**
+ * Finds the row visible either to the given transaction or at the given timestamp (exactly one must be non-null),
+ * returning {@code null} if there is no such row or it does not match the key filter.
+ */
+ @Nullable
+ private ByteBufferRow findRowVersionInChain(
+ VersionChain versionChain,
+ @Nullable UUID transactionId,
+ @Nullable Timestamp timestamp,
+ Predicate<BinaryRow> keyFilter
+ ) {
+ assert transactionId != null ^ timestamp != null;
+
+ if (transactionId != null) {
+ return findLatestRowVersion(versionChain, transactionId, keyFilter);
+ } else {
+ ByteBufferRow row = findRowVersionByTimestamp(versionChain, timestamp);
+
+ // A missing row (or tombstone) must not be passed to the user-supplied filter.
+ return row != null && keyFilter.test(row) ? row : null;
+ }
+ }
+
+ @Nullable
+ private ByteBufferRow findRowVersionByTimestamp(VersionChain versionChain, Timestamp timestamp) {
+ long nextRowPartitionlessLink = versionChain.headLink();
+ long nextLink = PartitionlessLinks.addPartitionIdToPartititionlessLink(nextRowPartitionlessLink, partitionId);
+
+ ScanVersionChainByTimestamp scanByTimestamp = freshScanByTimestamp();
+
+ try {
+ rowVersionDataPageReader.traverse(nextLink, scanByTimestamp, timestamp);
+ } catch (IgniteInternalCheckedException e) {
+ throw new StorageException("Cannot search for a row version", e);
+ }
+
+ return scanByTimestamp.result();
+ }
+
+ private ScanVersionChainByTimestamp freshScanByTimestamp() {
+ ScanVersionChainByTimestamp traversal = scanVersionChainByTimestampCache.get();
+ traversal.reset();
+ return traversal;
+ }
+
+ @Override
+ public LinkRowId insert(BinaryRow row, UUID txId) throws StorageException {
+ RowVersion rowVersion = insertRowVersion(Objects.requireNonNull(row), RowVersion.NULL_LINK);
+
+ VersionChain versionChain = new VersionChain(partitionId, txId, PartitionlessLinks.removePartitionIdFromLink(rowVersion.link()));
+
+ try {
+ versionChainFreeList.insertDataRow(versionChain);
+ } catch (IgniteInternalCheckedException e) {
+ throw new StorageException("Cannot store a version chain", e);
+ }
+
+ try {
+ versionChainTree.putx(versionChain);
+ } catch (IgniteInternalCheckedException e) {
+ throw new StorageException("Cannot put a version chain to the tree", e);
+ }
+
+ return new LinkRowId(versionChain.link());
+ }
+
+ /**
+ * Stores a new row version; a {@code null} row is stored as a tombstone.
+ */
+ private RowVersion insertRowVersion(@Nullable BinaryRow row, long nextPartitionlessLink) {
+ // TODO IGNITE-16913 Add proper way to write row bytes into array without allocations.
+ byte[] rowBytes = row == null ? TOMBSTONE_PAYLOAD : row.bytes();
+
+ RowVersion rowVersion = new RowVersion(partitionId, nextPartitionlessLink, ByteBuffer.wrap(rowBytes));
+
+ insertRowVersion(rowVersion);
+
+ return rowVersion;
+ }
+
+ private void insertRowVersion(RowVersion rowVersion) {
+ try {
+ rowVersionFreeList.insertDataRow(rowVersion);
+ } catch (IgniteInternalCheckedException e) {
+ throw new StorageException("Cannot store a row version", e);
+ }
+ }
+
+ @Override
+ public @Nullable BinaryRow addWrite(RowId rowId, @Nullable BinaryRow row, UUID txId) throws TxIdMismatchException, StorageException {
+ VersionChain currentChain = findVersionChainForModification(rowId);
+
+ throwIfChainBelongsToAnotherTx(currentChain, txId);
+
+ RowVersion currentVersion = findLatestRowVersion(currentChain, LOAD_VALUE_WHEN_UNCOMMITTED);
+
+ // An uncommitted version gets replaced, so the new version inherits its next link;
+ // otherwise the new version is placed on top of the current (committed) head.
+ long newVersionNextLink = currentVersion.isUncommitted() ? currentVersion.nextLink() : currentChain.headLink();
+ RowVersion newVersion = insertRowVersion(row, newVersionNextLink);
+
+ if (currentVersion.isUncommitted()) {
+ // as we replace an uncommitted version with new one, we need to remove old uncommitted version
+ removeRowVersion(currentVersion);
+ }
+
+ VersionChain chainReplacement = new VersionChain(
+ partitionId,
+ txId,
+ PartitionlessLinks.removePartitionIdFromLink(newVersion.link())
+ );
+
+ updateVersionChain(currentChain, chainReplacement);
+
+ if (currentVersion.isUncommitted()) {
+ return rowVersionToBinaryRow(currentVersion);
+ } else {
+ return null;
+ }
+ }
+
+ @NotNull
+ private VersionChain findVersionChainForModification(RowId rowId) {
+ VersionChain currentChain = findVersionChain(rowId);
+ if (currentChain == null) {
+ throw new RowIdIsInvalidForModificationsException();
+ }
+ return currentChain;
+ }
+
+ @Override
+ public @Nullable BinaryRow abortWrite(RowId rowId) throws StorageException {
+ VersionChain currentVersionChain = findVersionChainForModification(rowId);
+
+ if (currentVersionChain.transactionId() == null) {
+ throw new NoUncommittedVersionException();
+ }
+
+ RowVersion currentVersion = findLatestRowVersion(currentVersionChain, ALWAYS_LOAD_VALUE);
+ assert currentVersion.isUncommitted();
+
+ removeRowVersion(currentVersion);
+
+ if (currentVersion.hasNextLink()) {
+ VersionChain versionChainReplacement = VersionChain.withoutTxId(
+ partitionId,
+ currentVersionChain.link(),
+ currentVersion.nextLink()
+ );
+ updateVersionChain(currentVersionChain, versionChainReplacement);
+ } else {
+ // it was the only version, let's remove the chain as well
+ removeVersionChain(currentVersionChain);
+ }
+
+ return rowVersionToBinaryRow(currentVersion);
+ }
+
+ private void removeVersionChain(VersionChain currentVersionChain) {
+ try {
+ versionChainFreeList.removeDataRowByLink(currentVersionChain.link());
+ } catch (IgniteInternalCheckedException e) {
+ throw new StorageException("Cannot remove chain version", e);
+ }
+ }
+
+ @Override
+ public void commitWrite(RowId rowId, Timestamp timestamp) throws StorageException {
+ VersionChain currentVersionChain = findVersionChainForModification(rowId);
+ long headRowVersionLink = PartitionlessLinks.addPartitionIdToPartititionlessLink(currentVersionChain.headLink(), partitionId);
+
+ assert currentVersionChain.transactionId() != null;
+
+ try {
+ rowVersionFreeList.updateTimestamp(headRowVersionLink, timestamp);
+ } catch (IgniteInternalCheckedException e) {
+ throw new StorageException("Cannot update timestamp", e);
+ }
+
+ try {
+ versionChainFreeList.updateTransactionId(currentVersionChain.link(), null);
+ } catch (IgniteInternalCheckedException e) {
+ throw new StorageException("Cannot update transaction ID", e);
+ }
+ }
+
+ private void removeRowVersion(RowVersion currentVersion) {
+ try {
+ rowVersionFreeList.removeDataRowByLink(currentVersion.link());
+ } catch (IgniteInternalCheckedException e) {
+ throw new StorageException("Cannot remove row version", e);
+ }
+ }
+
+ private void updateVersionChain(VersionChain currentVersionChain, VersionChain versionChainReplacement) {
+ try {
+ boolean updatedInPlace = versionChainFreeList.updateDataRow(currentVersionChain.link(), versionChainReplacement);
+ if (!updatedInPlace) {
+ throw new StorageException("Only in-place updates are supported");
+ }
+ } catch (IgniteInternalCheckedException e) {
+ throw new StorageException("Cannot update version chain", e);
+ }
+ }
+
+ @Override
+ public Cursor<BinaryRow> scan(Predicate<BinaryRow> keyFilter, UUID txId) throws TxIdMismatchException, StorageException {
+ return internalScan(keyFilter, txId, null);
+ }
+
+ @Override
+ public Cursor<BinaryRow> scan(Predicate<BinaryRow> keyFilter, Timestamp timestamp) throws StorageException {
+ return internalScan(keyFilter, null, timestamp);
+ }
+
+ private Cursor<BinaryRow> internalScan(Predicate<BinaryRow> keyFilter, @Nullable UUID transactionId, @Nullable Timestamp timestamp) {
+ assert transactionId != null ^ timestamp != null;
+
+ IgniteCursor<VersionChain> treeCursor;
+ try {
+ treeCursor = versionChainTree.find(null, null);
+ } catch (IgniteInternalCheckedException e) {
+ throw new StorageException("Find failed", e);
+ }
+
+ return new ScanCursor(treeCursor, keyFilter, transactionId, timestamp);
+ }
+
+ @Override
+ public void close() {
+ // NOTE(review): both free lists are supplied by the caller, and PageMemoryTableStorage#createMvPartitionStorage
+ // passes the same data-region-wide instances to every partition, so closing them here may affect other
+ // partitions. Confirm the intended ownership.
+ versionChainFreeList.close();
+ versionChainTree.close();
+ rowVersionFreeList.close();
+ }
+
+ /**
+ * Cursor over all version chains of the partition. It yields the rows that are visible to the given transaction
+ * or at the given timestamp and that match the key filter.
+ */
+ private class ScanCursor implements Cursor<BinaryRow> {
+ private final IgniteCursor<VersionChain> treeCursor;
+ private final Predicate<BinaryRow> keyFilter;
+ private final @Nullable UUID transactionId;
+ private final @Nullable Timestamp timestamp;
+
+ private BinaryRow nextRow = null;
+ private boolean iterationExhausted = false;
+
+ public ScanCursor(
+ IgniteCursor<VersionChain> treeCursor,
+ Predicate<BinaryRow> keyFilter,
+ @Nullable UUID transactionId,
+ @Nullable Timestamp timestamp
+ ) {
+ this.treeCursor = treeCursor;
+ this.keyFilter = keyFilter;
+ this.transactionId = transactionId;
+ this.timestamp = timestamp;
+ }
+
+ @Override
+ public boolean hasNext() {
+ if (nextRow != null) {
+ return true;
+ }
+ if (iterationExhausted) {
+ return false;
+ }
+
+ while (true) {
+ boolean positionedToNext = tryAdvanceTreeCursor();
+
+ if (!positionedToNext) {
+ iterationExhausted = true;
+ return false;
+ }
+
+ VersionChain chain = getCurrentChainFromTreeCursor();
+ ByteBufferRow row = findRowVersionInChain(chain, transactionId, timestamp, keyFilter);
+
+ if (row != null) {
+ nextRow = row;
+ return true;
+ }
+ }
+ }
+
+ private boolean tryAdvanceTreeCursor() {
+ try {
+ return treeCursor.next();
+ } catch (IgniteInternalCheckedException e) {
+ throw new StorageException("Error when trying to advance tree cursor", e);
+ }
+ }
+
+ private VersionChain getCurrentChainFromTreeCursor() {
+ try {
+ return treeCursor.get();
+ } catch (IgniteInternalCheckedException e) {
+ throw new StorageException("Failed to get element from tree cursor", e);
+ }
+ }
+
+ @Override
+ public BinaryRow next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException("The cursor is exhausted");
+ }
+
+ assert nextRow != null;
+
+ BinaryRow row = nextRow;
+ nextRow = null;
+
+ return row;
+ }
+
+ @Override
+ public void close() {
+ // no-op
+ }
+ }
+}
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PageMemoryMvStorageIoModule.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PageMemoryMvStorageIoModule.java
new file mode 100644
index 000000000..4778fbc9a
--- /dev/null
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PageMemoryMvStorageIoModule.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory.mv;
+
+import java.util.Collection;
+import java.util.List;
+import org.apache.ignite.internal.pagememory.io.IoVersions;
+import org.apache.ignite.internal.pagememory.io.PageIoModule;
+import org.apache.ignite.internal.storage.pagememory.mv.io.RowVersionDataIo;
+import org.apache.ignite.internal.storage.pagememory.mv.io.VersionChainDataIo;
+import org.apache.ignite.internal.storage.pagememory.mv.io.VersionChainInnerIo;
+import org.apache.ignite.internal.storage.pagememory.mv.io.VersionChainLeafIo;
+import org.apache.ignite.internal.storage.pagememory.mv.io.VersionChainMetaIo;
+
+/**
+ * {@link PageIoModule} that contributes the page IO versions used by {@link PageMemoryMvPartitionStorage}.
+ */
+public class PageMemoryMvStorageIoModule implements PageIoModule {
+ /** {@inheritDoc} */
+ @Override
+ public Collection<IoVersions<?>> ioVersions() {
+ // Version chains: data pages plus meta, inner and leaf tree pages. Row versions: data pages only.
+ List<IoVersions<?>> versions = List.of(
+ VersionChainDataIo.VERSIONS,
+ VersionChainMetaIo.VERSIONS,
+ VersionChainInnerIo.VERSIONS,
+ VersionChainLeafIo.VERSIONS,
+ RowVersionDataIo.VERSIONS
+ );
+
+ return versions;
+ }
+}
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PartitionlessLinks.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PartitionlessLinks.java
new file mode 100644
index 000000000..9bc142155
--- /dev/null
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PartitionlessLinks.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory.mv;
+
+import static org.apache.ignite.internal.pagememory.util.PageUtils.getInt;
+import static org.apache.ignite.internal.pagememory.util.PageUtils.getShort;
+import static org.apache.ignite.internal.pagememory.util.PageUtils.putInt;
+import static org.apache.ignite.internal.pagememory.util.PageUtils.putShort;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.pagememory.util.PageIdUtils;
+
+/**
+ * Handling of <em>partitionless links</em>, that is, page memory links from which the partition ID is removed.
+ * They are used to spare storage space in cases when the partition ID is known from the context.
+ *
+ * <p>In storage a partitionless link takes {@link #PARTITIONLESS_LINK_SIZE_BYTES} bytes: its high 16 bits
+ * (see {@link #high16Bits(long)}) followed by its low 32 bits (see {@link #low32Bits(long)}).
+ *
+ * @see PageIdUtils#link(long, int)
+ */
+public class PartitionlessLinks {
+    /**
+     * Number of bytes a partitionless link takes in storage.
+     */
+    public static final int PARTITIONLESS_LINK_SIZE_BYTES = 6;
+
+    /**
+     * Converts a full link to a partitionless link by removing the partition ID from it.
+     *
+     * <p>The bits above the partition ID are shifted down by {@link PageIdUtils#PART_ID_SIZE} into the 32..47 bit
+     * range; the low 32 bits are kept as is.
+     *
+     * @param link full link
+     * @return partitionless link
+     * @see PageIdUtils#link(long, int)
+     */
+    public static long removePartitionIdFromLink(long link) {
+        return ((link >> PageIdUtils.PART_ID_SIZE) & 0x0000FFFF00000000L)
+                | (link & 0xFFFFFFFFL);
+    }
+
+    /**
+     * Converts a partitionless link to a full link by inserting the partition ID at the corresponding position.
+     *
+     * <p>This is the inverse of {@link #removePartitionIdFromLink(long)}. (The misspelled method name is kept for
+     * compatibility with existing callers.)
+     *
+     * @param partitionlessLink link without partition
+     * @param partitionId partition ID to insert
+     * @return full link
+     * @see PageIdUtils#link(long, int)
+     */
+    public static long addPartitionIdToPartititionlessLink(long partitionlessLink, int partitionId) {
+        return ((partitionlessLink << PageIdUtils.PART_ID_SIZE) & 0xFFFF000000000000L)
+                | ((((long) partitionId) << PageIdUtils.PAGE_IDX_SIZE) & 0xFFFF00000000L)
+                | (partitionlessLink & PageIdUtils.PAGE_IDX_MASK);
+    }
+
+    /**
+     * Returns high 16 bits of the 6 bytes representing a partitionless link.
+     *
+     * @param partitionlessLink link without partition info
+     * @return high 16 bits
+     * @see #assemble(short, int)
+     */
+    public static short high16Bits(long partitionlessLink) {
+        return (short) (partitionlessLink >> Integer.SIZE);
+    }
+
+    /**
+     * Returns low 32 bits of the 6 bytes representing a partitionless link.
+     *
+     * @param partitionlessLink link without partition info
+     * @return low 32 bits
+     * @see #assemble(short, int)
+     */
+    public static int low32Bits(long partitionlessLink) {
+        return (int) (partitionlessLink & PageIdUtils.PAGE_IDX_MASK);
+    }
+
+    /**
+     * Assembles a partitionless link from high 16 bits and 32 low bits of its representation.
+     *
+     * @param high16 high 16 bits
+     * @param low32 low 32 bits
+     * @return reconstructed partitionless link
+     * @see #high16Bits(long)
+     * @see #low32Bits(long)
+     */
+    public static long assemble(short high16, int low32) {
+        // Masking prevents sign extension of negative short/int values from polluting the upper bits.
+        return ((((long) high16) & 0xFFFF) << 32)
+                | (((long) low32) & 0xFFFFFFFFL);
+    }
+
+    /**
+     * Reads a partitionless link from memory: first the high 2 bytes, then the low 4 bytes.
+     *
+     * @param pageAddr base address
+     * @param offset offset of the stored link relative to {@code pageAddr}
+     * @return partitionless link
+     * @see #writeToMemory(long, long)
+     */
+    static long readFromMemory(long pageAddr, int offset) {
+        short high16 = getShort(pageAddr, offset);
+        int low32 = getInt(pageAddr, offset + Short.BYTES);
+
+        return assemble(high16, low32);
+    }
+
+    /**
+     * Reads a partitionless link from the buffer's current position (high 2 bytes, then low 4 bytes), advancing the
+     * position by {@link #PARTITIONLESS_LINK_SIZE_BYTES}.
+     *
+     * @param buffer buffer to read from
+     * @return partitionless link
+     */
+    static long readFromBuffer(ByteBuffer buffer) {
+        short high16 = buffer.getShort();
+        int low32 = buffer.getInt();
+
+        return assemble(high16, low32);
+    }
+
+    /**
+     * Writes a partitionless link to memory: first high 2 bytes, then low 4 bytes.
+     *
+     * @param addr address in memory where to start
+     * @param partitionlessLink the link to write
+     * @return number of bytes written (equal to {@link #PARTITIONLESS_LINK_SIZE_BYTES})
+     */
+    public static int writeToMemory(long addr, long partitionlessLink) {
+        putShort(addr, 0, high16Bits(partitionlessLink));
+        putInt(addr + Short.BYTES, 0, low32Bits(partitionlessLink));
+
+        return PARTITIONLESS_LINK_SIZE_BYTES;
+    }
+
+    private PartitionlessLinks() {
+        // prevent instantiation
+    }
+}
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ReadLatestRowVersion.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ReadLatestRowVersion.java
new file mode 100644
index 000000000..363867483
--- /dev/null
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ReadLatestRowVersion.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory.mv;
+
+import java.nio.ByteBuffer;
+import java.util.function.Predicate;
+import org.apache.ignite.internal.pagememory.datapage.PageMemoryTraversal;
+import org.apache.ignite.internal.pagememory.io.DataPagePayload;
+import org.apache.ignite.internal.pagememory.util.PageIdUtils;
+import org.apache.ignite.internal.schema.ByteBufferRow;
+import org.apache.ignite.internal.tx.Timestamp;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Traversal that reads a single {@link RowVersion}, starting from the first slot of that version.
+ *
+ * <p>The traversal argument is a {@link Predicate} applied to the version timestamp ({@code null} for an uncommitted
+ * version). If it returns {@code true}, the value bytes are read as well (following fragments if needed); otherwise
+ * the traversal stops right after the first slot and the resulting {@link RowVersion} carries no value.
+ */
+class ReadLatestRowVersion implements PageMemoryTraversal<Predicate<Timestamp>> {
+    /** Resulting version; assigned early when the value is skipped, otherwise in {@link #finish()}. */
+    private RowVersion result;
+
+    /** Whether the next payload handed to this traversal is the first slot of the version. */
+    private boolean readingFirstSlot = true;
+
+    /** Link of the first slot of the version being read. */
+    private long firstFragmentLink;
+
+    /** Timestamp read from the first slot; {@code null} means the version is uncommitted. */
+    @Nullable
+    private Timestamp timestamp;
+
+    /** Partitionless link to the next version in the chain, as read from the first slot. */
+    private long nextLink;
+
+    private final ReadRowVersionValue readRowVersionValue = new ReadRowVersionValue();
+
+    @Override
+    public long consumePagePayload(long link, long pageAddr, DataPagePayload payload, Predicate<Timestamp> loadValue) {
+        if (!readingFirstSlot) {
+            // Continuation of a fragmented value.
+            return readRowVersionValue.consumePagePayload(link, pageAddr, payload, null);
+        }
+
+        readingFirstSlot = false;
+
+        return readFullOrInitiateReadFragmented(link, pageAddr, payload, loadValue);
+    }
+
+    private long readFullOrInitiateReadFragmented(
+            long link,
+            long pageAddr,
+            DataPagePayload payload,
+            Predicate<Timestamp> loadValue
+    ) {
+        int slotStart = payload.offset();
+
+        firstFragmentLink = link;
+        timestamp = Timestamps.readTimestamp(pageAddr, slotStart + RowVersion.TIMESTAMP_OFFSET);
+        nextLink = PartitionlessLinks.readFromMemory(pageAddr, slotStart + RowVersion.NEXT_LINK_OFFSET);
+
+        if (loadValue.test(timestamp)) {
+            return readRowVersionValue.consumePagePayload(link, pageAddr, payload, null);
+        }
+
+        // The value is not wanted: build a value-less version right away and stop.
+        result = new RowVersion(partitionIdFromLink(link), firstFragmentLink, timestamp, nextLink, null);
+
+        return STOP_TRAVERSAL;
+    }
+
+    private int partitionIdFromLink(long link) {
+        return PageIdUtils.partitionId(PageIdUtils.pageId(link));
+    }
+
+    @Override
+    public void finish() {
+        // A non-null result here means the value was skipped, so the value reader has nothing to finish.
+        if (result == null) {
+            readRowVersionValue.finish();
+
+            ByteBuffer valueBuffer = ByteBuffer.wrap(readRowVersionValue.result()).order(ByteBufferRow.ORDER);
+
+            result = new RowVersion(
+                    partitionIdFromLink(firstFragmentLink),
+                    firstFragmentLink,
+                    timestamp,
+                    nextLink,
+                    valueBuffer
+            );
+        }
+    }
+
+    /** Returns the version read by the last traversal. */
+    RowVersion result() {
+        return result;
+    }
+
+    /** Prepares this instance for another traversal. */
+    void reset() {
+        result = null;
+        readingFirstSlot = true;
+        readRowVersionValue.reset();
+    }
+}
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/PageMemoryDataRegion.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ReadRowVersionValue.java
similarity index 63%
copy from modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/PageMemoryDataRegion.java
copy to modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ReadRowVersionValue.java
index dbc6e3c2f..b1a3a3e4f 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/PageMemoryDataRegion.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ReadRowVersionValue.java
@@ -15,22 +15,21 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.pagememory;
+package org.apache.ignite.internal.storage.pagememory.mv;
-import org.jetbrains.annotations.Nullable;
+import org.apache.ignite.internal.pagememory.datapage.ReadPageMemoryRowValue;
/**
- * Data region based on {@link PageMemory}.
+ * Reads {@link RowVersion#value()} from page-memory.
*/
-public interface PageMemoryDataRegion {
- /**
- * Returns {@link true} if the date region is persistent.
- */
- boolean persistent();
+class ReadRowVersionValue extends ReadPageMemoryRowValue {
+ /** Offset of the value size field within the first slot of a stored {@link RowVersion}. */
+ @Override
+ protected int valueSizeOffsetInFirstSlot() {
+ return RowVersion.VALUE_SIZE_OFFSET;
+ }
- /**
- * Returns page memory, {@code null} if not started.
- */
- @Nullable
- PageMemory pageMemory();
+ /** Offset of the value bytes within the first slot of a stored {@link RowVersion}. */
+ @Override
+ protected int valueOffsetInFirstSlot() {
+ return RowVersion.VALUE_OFFSET;
+ }
 }
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/PageMemoryDataRegion.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/RowIdIsInvalidForModificationsException.java
similarity index 67%
copy from modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/PageMemoryDataRegion.java
copy to modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/RowIdIsInvalidForModificationsException.java
index dbc6e3c2f..c847fd740 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/PageMemoryDataRegion.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/RowIdIsInvalidForModificationsException.java
@@ -15,22 +15,13 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.pagememory;
+package org.apache.ignite.internal.storage.pagememory.mv;
-import org.jetbrains.annotations.Nullable;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.lang.IgniteInternalException;
/**
- * Data region based on {@link PageMemory}.
+ * Thrown when trying to do a modification at {@link RowId} that has already become invalid for writes.
*/
-public interface PageMemoryDataRegion {
- /**
- * Returns {@link true} if the date region is persistent.
- */
- boolean persistent();
-
- /**
- * Returns page memory, {@code null} if not started.
- */
- @Nullable
- PageMemory pageMemory();
+class RowIdIsInvalidForModificationsException extends IgniteInternalException {
+    /** Serial version UID: exceptions are {@link java.io.Serializable}, so pin the stream identifier explicitly. */
+    private static final long serialVersionUID = 2814475127946012314L;
 }
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/RowVersion.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/RowVersion.java
new file mode 100644
index 000000000..a75342b2d
--- /dev/null
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/RowVersion.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory.mv;
+
+import java.nio.ByteBuffer;
+import java.util.Objects;
+import org.apache.ignite.internal.pagememory.Storable;
+import org.apache.ignite.internal.pagememory.io.AbstractDataPageIo;
+import org.apache.ignite.internal.pagememory.io.IoVersions;
+import org.apache.ignite.internal.storage.pagememory.mv.io.RowVersionDataIo;
+import org.apache.ignite.internal.tostring.IgniteToStringExclude;
+import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.internal.tx.Timestamp;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Represents row version inside row version chain.
+ *
+ * <p>Stored layout: timestamp at {@link #TIMESTAMP_OFFSET}, partitionless link to the next version at
+ * {@link #NEXT_LINK_OFFSET}, value size at {@link #VALUE_SIZE_OFFSET} and the value bytes at {@link #VALUE_OFFSET}.
+ */
+public class RowVersion implements Storable {
+    /**
+     * A 'timestamp' representing absence of a timestamp.
+     */
+    public static final Timestamp NULL_TIMESTAMP = new Timestamp(Long.MIN_VALUE, Long.MIN_VALUE);
+
+    /**
+     * Represents an absent partitionless link.
+     */
+    public static final long NULL_LINK = 0;
+
+    private static final int TIMESTAMP_STORE_SIZE_BYTES = 2 * Long.BYTES;
+    private static final int NEXT_LINK_STORE_SIZE_BYTES = PartitionlessLinks.PARTITIONLESS_LINK_SIZE_BYTES;
+    private static final int VALUE_SIZE_STORE_SIZE_BYTES = Integer.BYTES;
+
+    /** Size of everything stored before the value bytes. */
+    private static final int HEADER_SIZE_BYTES =
+            TIMESTAMP_STORE_SIZE_BYTES + NEXT_LINK_STORE_SIZE_BYTES + VALUE_SIZE_STORE_SIZE_BYTES;
+
+    /** Offset of the stored timestamp. */
+    public static final int TIMESTAMP_OFFSET = 0;
+    /** Offset of the stored partitionless link to the next version. */
+    public static final int NEXT_LINK_OFFSET = TIMESTAMP_STORE_SIZE_BYTES;
+    /** Offset of the stored value size. */
+    public static final int VALUE_SIZE_OFFSET = NEXT_LINK_OFFSET + NEXT_LINK_STORE_SIZE_BYTES;
+    /** Offset of the stored value bytes. */
+    public static final int VALUE_OFFSET = VALUE_SIZE_OFFSET + VALUE_SIZE_STORE_SIZE_BYTES;
+
+    private final int partitionId;
+    private long link;
+
+    /** Commit timestamp; {@code null} for an uncommitted version. */
+    @Nullable
+    private final Timestamp timestamp;
+    private final long nextLink;
+    /** Value size in bytes, or {@code -1} if the value is absent ({@code null}). */
+    private final int valueSize;
+    @IgniteToStringExclude
+    @Nullable
+    private final ByteBuffer value;
+
+    /**
+     * Creates an uncommitted version ({@code null} timestamp) whose link is {@code 0} until set via {@link #link(long)}.
+     *
+     * @param partitionId partition ID
+     * @param nextLink partitionless link of the next version, or {@link #NULL_LINK}
+     * @param value value bytes
+     */
+    public RowVersion(int partitionId, long nextLink, ByteBuffer value) {
+        this(partitionId, 0, null, nextLink, value);
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param partitionId partition ID
+     * @param link link of this version
+     * @param timestamp commit timestamp, {@code null} for an uncommitted version; must not be {@link #NULL_TIMESTAMP}
+     * @param nextLink partitionless link of the next version, or {@link #NULL_LINK}
+     * @param value value bytes, or {@code null} if the value is not available
+     */
+    public RowVersion(int partitionId, long link, @Nullable Timestamp timestamp, long nextLink, @Nullable ByteBuffer value) {
+        this.partitionId = partitionId;
+        link(link);
+
+        assert !NULL_TIMESTAMP.equals(timestamp) : "Null timestamp provided";
+
+        this.timestamp = timestamp;
+        this.nextLink = nextLink;
+        this.valueSize = value == null ? -1 : value.limit();
+        this.value = value;
+    }
+
+    /** Returns the commit timestamp, or {@code null} if this version is uncommitted. */
+    @Nullable
+    public Timestamp timestamp() {
+        return timestamp;
+    }
+
+    /** Returns the timestamp to store: the real one, or {@link #NULL_TIMESTAMP} for an uncommitted version. */
+    public Timestamp timestampForStorage() {
+        return timestampForStorage(timestamp);
+    }
+
+    static Timestamp timestampForStorage(Timestamp timestamp) {
+        return timestamp == null ? NULL_TIMESTAMP : timestamp;
+    }
+
+    /**
+     * Returns partitionless link of the next version or {@code 0} if this version is the last in the chain (i.e. it's the oldest version).
+     *
+     * @return partitionless link of the next version or {@code 0} if this version is the last in the chain
+     */
+    public long nextLink() {
+        return nextLink;
+    }
+
+    /** Returns the value size in bytes, or {@code -1} if the value is absent. */
+    public int valueSize() {
+        return valueSize;
+    }
+
+    /**
+     * Returns the value bytes.
+     *
+     * @throws NullPointerException if this version was created without a value
+     */
+    public ByteBuffer value() {
+        return Objects.requireNonNull(value, "Row version value was not loaded");
+    }
+
+    public boolean hasNextLink() {
+        return nextLink != NULL_LINK;
+    }
+
+    boolean isTombstone() {
+        return isTombstone(valueSize());
+    }
+
+    /** A version whose value is zero bytes long is a tombstone. */
+    static boolean isTombstone(int valueSize) {
+        return valueSize == 0;
+    }
+
+    static boolean isTombstone(byte[] valueBytes) {
+        return isTombstone(valueBytes.length);
+    }
+
+    boolean isUncommitted() {
+        return isUncommitted(timestamp);
+    }
+
+    static boolean isUncommitted(Timestamp timestamp) {
+        return timestamp == null;
+    }
+
+    boolean isCommitted() {
+        return !isUncommitted();
+    }
+
+    @Override
+    public final void link(long link) {
+        this.link = link;
+    }
+
+    @Override
+    public final long link() {
+        return link;
+    }
+
+    @Override
+    public final int partition() {
+        return partitionId;
+    }
+
+    @Override
+    public int size() {
+        assert value != null;
+
+        return HEADER_SIZE_BYTES + value.limit();
+    }
+
+    @Override
+    public int headerSize() {
+        return HEADER_SIZE_BYTES;
+    }
+
+    @Override
+    public IoVersions<? extends AbstractDataPageIo> ioVersions() {
+        return RowVersionDataIo.VERSIONS;
+    }
+
+    @Override
+    public String toString() {
+        return S.toString(RowVersion.class, this);
+    }
+}
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/RowVersionFreeList.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/RowVersionFreeList.java
new file mode 100644
index 000000000..622415878
--- /dev/null
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/RowVersionFreeList.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory.mv;
+
+import static org.apache.ignite.internal.pagememory.PageIdAllocator.FLAG_AUX;
+import static org.apache.ignite.internal.pagememory.PageIdAllocator.INDEX_PARTITION;
+
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.ignite.internal.pagememory.PageMemory;
+import org.apache.ignite.internal.pagememory.evict.PageEvictionTracker;
+import org.apache.ignite.internal.pagememory.freelist.AbstractFreeList;
+import org.apache.ignite.internal.pagememory.io.PageIo;
+import org.apache.ignite.internal.pagememory.metric.IoStatisticsHolder;
+import org.apache.ignite.internal.pagememory.reuse.ReuseList;
+import org.apache.ignite.internal.pagememory.util.PageHandler;
+import org.apache.ignite.internal.pagememory.util.PageLockListener;
+import org.apache.ignite.internal.storage.pagememory.mv.io.RowVersionDataIo;
+import org.apache.ignite.internal.tx.Timestamp;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.apache.ignite.lang.IgniteLogger;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * {@link AbstractFreeList} for {@link RowVersion} instances.
+ */
+public class RowVersionFreeList extends AbstractFreeList<RowVersion> {
+    private static final IgniteLogger LOG = IgniteLogger.forClass(RowVersionFreeList.class);
+
+    /** Page eviction tracker; also touched by {@link UpdateTimestampHandler} after an in-place timestamp update. */
+    private final PageEvictionTracker evictionTracker;
+
+    /** Statistics holder passed to every data row operation performed by this free list. */
+    private final IoStatisticsHolder statHolder;
+
+    /** Reusable handler used by {@link #updateTimestamp(long, Timestamp)}. */
+    private final UpdateTimestampHandler updateTimestampHandler = new UpdateTimestampHandler();
+
+    /**
+     * Constructor.
+     *
+     * @param grpId Group ID.
+     * @param pageMem Page memory.
+     * @param reuseList Reuse list to track pages that can be reused after they get completely empty (if {@code null},
+     *      the free list itself will be used as a ReuseList).
+     * @param lockLsnr Page lock listener.
+     * @param metaPageId Metadata page ID.
+     * @param initNew {@code True} if new metadata should be initialized.
+     * @param pageListCacheLimit Page list cache limit.
+     * @param evictionTracker Page eviction tracker.
+     * @param statHolder Statistics holder to track IO operations.
+     * @throws IgniteInternalCheckedException If failed.
+     */
+    public RowVersionFreeList(
+            int grpId,
+            PageMemory pageMem,
+            @Nullable ReuseList reuseList,
+            PageLockListener lockLsnr,
+            long metaPageId,
+            boolean initNew,
+            @Nullable AtomicLong pageListCacheLimit,
+            PageEvictionTracker evictionTracker,
+            IoStatisticsHolder statHolder
+    ) throws IgniteInternalCheckedException {
+        super(
+                grpId,
+                "RowVersionFreeList_" + grpId,
+                pageMem,
+                reuseList,
+                lockLsnr,
+                FLAG_AUX,
+                LOG,
+                metaPageId,
+                initNew,
+                pageListCacheLimit,
+                evictionTracker
+        );
+
+        this.evictionTracker = evictionTracker;
+        this.statHolder = statHolder;
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Pages are allocated in the {@code INDEX_PARTITION} of this group using the default page flag of this list.
+     */
+    @Override
+    protected long allocatePageNoReuse() throws IgniteInternalCheckedException {
+        return pageMem.allocatePage(grpId, INDEX_PARTITION, defaultPageFlag);
+    }
+
+    /**
+     * Inserts a row.
+     *
+     * @param row Row.
+     * @throws IgniteInternalCheckedException If failed.
+     */
+    public void insertDataRow(RowVersion row) throws IgniteInternalCheckedException {
+        super.insertDataRow(row, statHolder);
+    }
+
+    /**
+     * Updates row version's timestamp in place.
+     *
+     * @param link link to the slot containing row version
+     * @param newTimestamp timestamp to set
+     * @throws IgniteInternalCheckedException if something fails
+     */
+    public void updateTimestamp(long link, Timestamp newTimestamp) throws IgniteInternalCheckedException {
+        updateDataRow(link, updateTimestampHandler, newTimestamp, statHolder);
+    }
+
+    /**
+     * Removes a row by link.
+     *
+     * @param link Row link.
+     * @throws IgniteInternalCheckedException If failed.
+     */
+    public void removeDataRowByLink(long link) throws IgniteInternalCheckedException {
+        super.removeDataRowByLink(link, statHolder);
+    }
+
+    /**
+     * Page handler that overwrites the timestamp of the row version stored in the given item of a
+     * {@link RowVersionDataIo} page, then touches the page in the eviction tracker. The returned value is not used
+     * by {@link #updateTimestamp(long, Timestamp)}.
+     */
+    private class UpdateTimestampHandler implements PageHandler<Timestamp, Object> {
+        @Override
+        public Object run(
+                int groupId,
+                long pageId,
+                long page,
+                long pageAddr,
+                PageIo io,
+                Timestamp arg,
+                int itemId,
+                IoStatisticsHolder statHolder
+        ) throws IgniteInternalCheckedException {
+            RowVersionDataIo dataIo = (RowVersionDataIo) io;
+
+            dataIo.updateTimestamp(pageAddr, itemId, pageSize(), arg);
+
+            evictionTracker.touchPage(pageId);
+
+            return true;
+        }
+    }
+}
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ScanVersionChainByTimestamp.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ScanVersionChainByTimestamp.java
new file mode 100644
index 000000000..19f3086e1
--- /dev/null
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ScanVersionChainByTimestamp.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory.mv;
+
+import org.apache.ignite.internal.pagememory.datapage.PageMemoryTraversal;
+import org.apache.ignite.internal.pagememory.io.DataPagePayload;
+import org.apache.ignite.internal.pagememory.util.PageIdUtils;
+import org.apache.ignite.internal.schema.ByteBufferRow;
+import org.apache.ignite.internal.tx.Timestamp;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Traversal that walks a Version Chain until it meets the first version visible at the given timestamp, i.e. a
+ * committed version whose timestamp is before or equal to it. That version's value is then exposed as a
+ * {@link ByteBufferRow} through {@link #result()}.
+ *
+ * <p>NB: the traversal works in two phases. It first hops between the starting slots of consecutive versions using
+ * their NextLinks. Once the wanted version is found, it switches to reading that version's slots, because the value
+ * may be fragmented.
+ */
+class ScanVersionChainByTimestamp implements PageMemoryTraversal<Timestamp> {
+    /**
+     * Row found by the traversal; {@code null} if no visible version exists or the visible version is a tombstone.
+     * Assigned in {@link #finish()}.
+     */
+    @Nullable
+    private ByteBufferRow result;
+
+    /**
+     * {@code true} while hopping over first slots of versions via NextLink; {@code false} once the wanted version is
+     * found and its value is being read.
+     */
+    private boolean lookingForVersion = true;
+
+    private final ReadRowVersionValue readRowVersionValue = new ReadRowVersionValue();
+
+    @Override
+    public long consumePagePayload(long link, long pageAddr, DataPagePayload payload, Timestamp timestamp) {
+        if (!lookingForVersion) {
+            // A fragmented value of the already found version is being continued.
+            return readNextFragment(link, pageAddr, payload);
+        }
+
+        Timestamp versionTs = Timestamps.readTimestamp(pageAddr, payload.offset() + RowVersion.TIMESTAMP_OFFSET);
+
+        return rowTimestampMatches(versionTs, timestamp)
+                ? readFullyOrStartReadingFragmented(link, pageAddr, payload)
+                : advanceToNextVersion(pageAddr, payload, partitionIdFromLink(link));
+    }
+
+    private int partitionIdFromLink(long link) {
+        return PageIdUtils.partitionId(PageIdUtils.pageId(link));
+    }
+
+    /** Uncommitted versions ({@code null} timestamp) never match. */
+    private boolean rowTimestampMatches(Timestamp rowVersionTs, Timestamp timestamp) {
+        return rowVersionTs != null && rowVersionTs.beforeOrEquals(timestamp);
+    }
+
+    private long readFullyOrStartReadingFragmented(long link, long pageAddr, DataPagePayload payload) {
+        lookingForVersion = false;
+
+        return readRowVersionValue.consumePagePayload(link, pageAddr, payload, null);
+    }
+
+    /** Returns the full link of the next version, or {@code STOP_TRAVERSAL} at the end of the chain. */
+    private long advanceToNextVersion(long pageAddr, DataPagePayload payload, int partitionId) {
+        long nextPartitionlessLink = PartitionlessLinks.readFromMemory(pageAddr, payload.offset() + RowVersion.NEXT_LINK_OFFSET);
+
+        return nextPartitionlessLink == RowVersion.NULL_LINK
+                ? STOP_TRAVERSAL
+                : PartitionlessLinks.addPartitionIdToPartititionlessLink(nextPartitionlessLink, partitionId);
+    }
+
+    private long readNextFragment(long link, long pageAddr, DataPagePayload payload) {
+        return readRowVersionValue.consumePagePayload(link, pageAddr, payload, null);
+    }
+
+    @Override
+    public void finish() {
+        if (lookingForVersion) {
+            // No visible version was found, so no value was ever read and there is nothing to return.
+            result = null;
+            return;
+        }
+
+        readRowVersionValue.finish();
+
+        byte[] valueBytes = readRowVersionValue.result();
+
+        // A tombstone means the row does not exist at the requested timestamp.
+        result = RowVersion.isTombstone(valueBytes) ? null : new ByteBufferRow(valueBytes);
+    }
+
+    @Nullable
+    ByteBufferRow result() {
+        return result;
+    }
+
+    /** Prepares this instance for another traversal. */
+    void reset() {
+        result = null;
+        lookingForVersion = true;
+        readRowVersionValue.reset();
+    }
+}
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/Timestamps.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/Timestamps.java
new file mode 100644
index 000000000..f78bb42eb
--- /dev/null
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/Timestamps.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory.mv;
+
+import static org.apache.ignite.internal.pagememory.util.PageUtils.getLong;
+import static org.apache.ignite.internal.pagememory.util.PageUtils.putLong;
+
+import org.apache.ignite.internal.tx.Timestamp;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Helpers for storing {@link Timestamp}s in memory.
+ *
+ * <p>A timestamp occupies two longs: the node ID first, then the local timestamp. A {@code null} timestamp is
+ * represented by {@link RowVersion#NULL_TIMESTAMP}.
+ */
+public class Timestamps {
+    /**
+     * Reads a {@link Timestamp} value from memory.
+     *
+     * @param pageAddr address where page data starts
+     * @param offset offset of the stored timestamp relative to {@code pageAddr}
+     * @return the timestamp, or {@code null} if the stored value equals {@link RowVersion#NULL_TIMESTAMP}
+     */
+    @Nullable
+    static Timestamp readTimestamp(long pageAddr, int offset) {
+        int cursor = offset;
+
+        long nodeId = getLong(pageAddr, cursor);
+        cursor += Long.BYTES;
+        long localTimestamp = getLong(pageAddr, cursor);
+
+        Timestamp stored = new Timestamp(localTimestamp, nodeId);
+
+        return stored.equals(RowVersion.NULL_TIMESTAMP) ? null : stored;
+    }
+
+    /**
+     * Writes a {@link Timestamp} to memory starting at {@code addr + offset}; {@code null} is written as
+     * {@link RowVersion#NULL_TIMESTAMP}.
+     *
+     * @param addr memory address
+     * @param offset offset added to the address
+     * @param timestamp the timestamp to write, may be {@code null}
+     * @return number of bytes written
+     */
+    public static int writeTimestamp(long addr, int offset, @Nullable Timestamp timestamp) {
+        Timestamp toStore = RowVersion.timestampForStorage(timestamp);
+
+        int cursor = offset;
+
+        putLong(addr, cursor, toStore.getNodeId());
+        cursor += Long.BYTES;
+        putLong(addr, cursor, toStore.getTimestamp());
+        cursor += Long.BYTES;
+
+        return cursor - offset;
+    }
+
+    private Timestamps() {
+        // prevent instantiation
+    }
+}
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/TransactionIds.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/TransactionIds.java
new file mode 100644
index 000000000..709d7b18e
--- /dev/null
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/TransactionIds.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory.mv;
+
+import static org.apache.ignite.internal.pagememory.util.PageUtils.putLong;
+
+import java.util.UUID;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Helpers for storing transaction IDs in page memory.
+ */
+public class TransactionIds {
+    /**
+     * Writes a transaction ID to memory starting at {@code addr + offset}.
+     *
+     * <p>The ID is stored as two longs: the most significant bits first, then the least significant bits. A
+     * {@code null} ID is stored as two {@link VersionChain#NULL_UUID_COMPONENT} values, so a non-null UUID whose
+     * halves both equal that value cannot be told apart from {@code null} once stored.
+     *
+     * @param addr address where to start writing
+     * @param offset offset to add to the address
+     * @param txId transaction ID to write, may be {@code null}
+     * @return number of bytes written
+     */
+    public static int writeTransactionId(long addr, int offset, @Nullable UUID txId) {
+        long mostSigBits = txId == null ? VersionChain.NULL_UUID_COMPONENT : txId.getMostSignificantBits();
+        long leastSigBits = txId == null ? VersionChain.NULL_UUID_COMPONENT : txId.getLeastSignificantBits();
+
+        putLong(addr, offset, mostSigBits);
+        putLong(addr, offset + Long.BYTES, leastSigBits);
+
+        return 2 * Long.BYTES;
+    }
+
+    /** Not meant to be instantiated: all members are static. */
+    private TransactionIds() {
+    }
+}
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VersionChain.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VersionChain.java
new file mode 100644
index 000000000..15c702a9d
--- /dev/null
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VersionChain.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory.mv;
+
+import java.util.UUID;
+import org.apache.ignite.internal.pagememory.Storable;
+import org.apache.ignite.internal.pagememory.io.AbstractDataPageIo;
+import org.apache.ignite.internal.pagememory.io.IoVersions;
+import org.apache.ignite.internal.storage.pagememory.mv.io.VersionChainDataIo;
+import org.apache.ignite.internal.tostring.S;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Represents row version chain: that is, all versions of the row plus some row-level metadata.
+ *
+ * <p>NB: this represents the whole set of versions, not just one version in the chain.
+ *
+ * <p>Stored payload (see {@link #size()}): transaction ID (two longs), then the partitionless link to the head
+ * row version.
+ */
+public class VersionChain extends VersionChainLink implements Storable {
+    /**
+     * Value stored for each half of the transaction ID when there is no transaction ID.
+     *
+     * <p>This is part of the on-page format, hence {@code final}: a mutable public field could be reassigned at
+     * runtime, making already written data decode differently.
+     */
+    public static final long NULL_UUID_COMPONENT = 0;
+
+    /** Bytes used to store the transaction ID (most and least significant bits). */
+    private static final int TRANSACTION_ID_STORE_SIZE_BYTES = 2 * Long.BYTES;
+
+    /** Bytes used to store the link to the head row version. */
+    private static final int HEAD_LINK_STORE_SIZE_BYTES = PartitionlessLinks.PARTITIONLESS_LINK_SIZE_BYTES;
+
+    /** Offset of the transaction ID inside the stored payload. */
+    public static final int TRANSACTION_ID_OFFSET = 0;
+
+    /** ID of the partition this chain belongs to. */
+    private final int partitionId;
+
+    /** Transaction ID associated with the chain, or {@code null} if there is none. */
+    @Nullable
+    private final UUID transactionId;
+
+    /** Link to the head row version of the chain. */
+    private final long headLink;
+
+    // TODO: IGNITE-17008 - add nextLink
+
+    /**
+     * Constructs a VersionChain without a transaction ID.
+     *
+     * @param partitionId partition ID
+     * @param link link of this chain
+     * @param headLink link to the head row version
+     * @return new instance with a {@code null} transaction ID
+     */
+    public static VersionChain withoutTxId(int partitionId, long link, long headLink) {
+        return new VersionChain(partitionId, link, null, headLink);
+    }
+
+    /**
+     * Constructor that leaves this chain's own link at the {@link VersionChainLink} default.
+     *
+     * @param partitionId partition ID
+     * @param transactionId transaction ID, may be {@code null}
+     * @param headLink link to the head row version
+     */
+    public VersionChain(int partitionId, @Nullable UUID transactionId, long headLink) {
+        this.partitionId = partitionId;
+        this.transactionId = transactionId;
+        this.headLink = headLink;
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param partitionId partition ID
+     * @param link link of this chain
+     * @param transactionId transaction ID, may be {@code null}
+     * @param headLink link to the head row version
+     */
+    public VersionChain(int partitionId, long link, @Nullable UUID transactionId, long headLink) {
+        super(link);
+        this.partitionId = partitionId;
+        this.transactionId = transactionId;
+        this.headLink = headLink;
+    }
+
+    /** Returns the transaction ID, or {@code null} if there is none. */
+    @Nullable
+    public UUID transactionId() {
+        return transactionId;
+    }
+
+    /** Returns the link to the head row version. */
+    public long headLink() {
+        return headLink;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public final int partition() {
+        return partitionId;
+    }
+
+    /** Returns the size of the stored payload: transaction ID plus head link. */
+    @Override
+    public int size() {
+        return TRANSACTION_ID_STORE_SIZE_BYTES + HEAD_LINK_STORE_SIZE_BYTES;
+    }
+
+    /**
+     * Returns the same value as {@link #size()}. The whole row is treated as header, presumably because version
+     * chains are never fragmented ({@code VersionChainDataIo#writeFragmentData} throws).
+     */
+    @Override
+    public int headerSize() {
+        return size();
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public IoVersions<? extends AbstractDataPageIo> ioVersions() {
+        return VersionChainDataIo.VERSIONS;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public String toString() {
+        return S.toString(VersionChain.class, this);
+    }
+}
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VersionChainDataPageReader.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VersionChainDataPageReader.java
new file mode 100644
index 000000000..2b8a8221f
--- /dev/null
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VersionChainDataPageReader.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory.mv;
+
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageId;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.partitionId;
+import static org.apache.ignite.internal.pagememory.util.PageUtils.getLong;
+
+import java.util.UUID;
+import org.apache.ignite.internal.pagememory.PageMemory;
+import org.apache.ignite.internal.pagememory.datapage.DataPageReader;
+import org.apache.ignite.internal.pagememory.datapage.NonFragmentableDataPageReader;
+import org.apache.ignite.internal.pagememory.metric.IoStatisticsHolder;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * {@link DataPageReader} for {@link VersionChain} instances.
+ */
+class VersionChainDataPageReader extends NonFragmentableDataPageReader<VersionChain> {
+    /**
+     * Constructs a new instance.
+     *
+     * @param pageMemory page memory that will be used to lock and access memory
+     * @param groupId ID of the cache group with which the reader works (all pages must belong to this group)
+     * @param statisticsHolder used to track statistics about operations
+     */
+    public VersionChainDataPageReader(PageMemory pageMemory, int groupId, IoStatisticsHolder statisticsHolder) {
+        super(pageMemory, groupId, statisticsHolder);
+    }
+
+    /**
+     * Rebuilds a transaction ID from its two stored halves.
+     *
+     * @param high most significant bits
+     * @param low least significant bits
+     * @return the transaction ID, or {@code null} if both halves equal {@link VersionChain#NULL_UUID_COMPONENT}
+     */
+    @Nullable
+    private static UUID assembleTxId(long high, long low) {
+        if (high == VersionChain.NULL_UUID_COMPONENT && low == VersionChain.NULL_UUID_COMPONENT) {
+            return null;
+        }
+
+        return new UUID(high, low);
+    }
+
+    /**
+     * Decodes a {@link VersionChain} whose payload starts at {@code pageAddr}.
+     *
+     * <p>Expected layout: transaction ID high bits and low bits (a long each), then the partitionless head link.
+     * NOTE(review): assumes {@code pageAddr} already points past the payload-size prefix written by
+     * {@code VersionChainDataIo#writeRowData}; confirm against {@link NonFragmentableDataPageReader}.
+     *
+     * @param link link of the row being read; its partition becomes the chain's partition
+     * @param pageAddr address of the row payload
+     * @return the decoded version chain
+     */
+    @Override
+    protected VersionChain readRowFromAddress(long link, long pageAddr) {
+        int offset = 0;
+
+        long high = getLong(pageAddr, offset);
+        offset += Long.BYTES;
+
+        long low = getLong(pageAddr, offset);
+        offset += Long.BYTES;
+
+        // The head link is the last field, so the offset is not advanced past it.
+        long headLink = PartitionlessLinks.readFromMemory(pageAddr, offset);
+
+        return new VersionChain(partitionOfLink(link), link, assembleTxId(high, low), headLink);
+    }
+
+    /** Extracts the partition ID from the page ID encoded in {@code link}. */
+    private int partitionOfLink(long link) {
+        return partitionId(pageId(link));
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    protected boolean handleNonExistentItemsGracefully() {
+        return true;
+    }
+
+    /** Returns {@code null} for items that do not exist. */
+    @Override
+    protected @Nullable VersionChain rowForNonExistingItem(long pageId, long itemId) {
+        return null;
+    }
+}
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VersionChainFreeList.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VersionChainFreeList.java
new file mode 100644
index 000000000..e954918d3
--- /dev/null
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VersionChainFreeList.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory.mv;
+
+import static org.apache.ignite.internal.pagememory.PageIdAllocator.FLAG_AUX;
+import static org.apache.ignite.internal.pagememory.PageIdAllocator.INDEX_PARTITION;
+
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.ignite.internal.pagememory.PageMemory;
+import org.apache.ignite.internal.pagememory.evict.PageEvictionTracker;
+import org.apache.ignite.internal.pagememory.freelist.AbstractFreeList;
+import org.apache.ignite.internal.pagememory.io.PageIo;
+import org.apache.ignite.internal.pagememory.metric.IoStatisticsHolder;
+import org.apache.ignite.internal.pagememory.reuse.ReuseList;
+import org.apache.ignite.internal.pagememory.util.PageHandler;
+import org.apache.ignite.internal.pagememory.util.PageLockListener;
+import org.apache.ignite.internal.storage.pagememory.mv.io.VersionChainDataIo;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.apache.ignite.lang.IgniteLogger;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * {@link AbstractFreeList} that stores {@link VersionChain} rows.
+ */
+public class VersionChainFreeList extends AbstractFreeList<VersionChain> {
+    private static final IgniteLogger LOG = IgniteLogger.forClass(VersionChainFreeList.class);
+
+    /** Tracker whose {@code touchPage} is called after a transaction ID is updated in place. */
+    private final PageEvictionTracker evictionTracker;
+
+    /** Statistics holder passed to every row operation of this list. */
+    private final IoStatisticsHolder statHolder;
+
+    /** Page handler used by {@link #updateTransactionId}. */
+    private final TransactionIdWriter txIdWriter = new TransactionIdWriter();
+
+    /**
+     * Constructor.
+     *
+     * @param grpId Group ID.
+     * @param pageMem Page memory.
+     * @param reuseList Reuse list to use.
+     * @param lockLsnr Page lock listener.
+     * @param metaPageId Metadata page ID.
+     * @param initNew {@code True} if new metadata should be initialized.
+     * @param pageListCacheLimit Page list cache limit.
+     * @param evictionTracker Page eviction tracker.
+     * @param statHolder Statistics holder to track IO operations.
+     * @throws IgniteInternalCheckedException If failed.
+     */
+    public VersionChainFreeList(
+            int grpId,
+            PageMemory pageMem,
+            ReuseList reuseList,
+            PageLockListener lockLsnr,
+            long metaPageId,
+            boolean initNew,
+            @Nullable AtomicLong pageListCacheLimit,
+            PageEvictionTracker evictionTracker,
+            IoStatisticsHolder statHolder
+    ) throws IgniteInternalCheckedException {
+        super(grpId, "VersionChainFreeList_" + grpId, pageMem, reuseList, lockLsnr, FLAG_AUX, LOG, metaPageId, initNew,
+                pageListCacheLimit, evictionTracker);
+
+        this.statHolder = statHolder;
+        this.evictionTracker = evictionTracker;
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>This implementation allocates the page in {@code INDEX_PARTITION} using this list's default page flag.
+     */
+    @Override
+    protected long allocatePageNoReuse() throws IgniteInternalCheckedException {
+        return pageMem.allocatePage(grpId, INDEX_PARTITION, defaultPageFlag);
+    }
+
+    /**
+     * Inserts a row, recording IO statistics in this list's statistics holder.
+     *
+     * @param row Row.
+     * @throws IgniteInternalCheckedException If failed.
+     */
+    public void insertDataRow(VersionChain row) throws IgniteInternalCheckedException {
+        super.insertDataRow(row, statHolder);
+    }
+
+    /**
+     * Updates a row by link, recording IO statistics in this list's statistics holder.
+     *
+     * @param link Row link.
+     * @param row New row data.
+     * @return {@code True} if was able to update row.
+     * @throws IgniteInternalCheckedException If failed.
+     */
+    public boolean updateDataRow(long link, VersionChain row) throws IgniteInternalCheckedException {
+        return super.updateDataRow(link, row, statHolder);
+    }
+
+    /**
+     * Overwrites the transaction ID of a stored version chain, leaving the rest of the row untouched.
+     *
+     * @param link link to the slot containing the version chain
+     * @param transactionId transaction ID to set, may be {@code null}
+     * @throws IgniteInternalCheckedException if something fails
+     */
+    public void updateTransactionId(long link, @Nullable UUID transactionId) throws IgniteInternalCheckedException {
+        updateDataRow(link, txIdWriter, transactionId, statHolder);
+    }
+
+    /**
+     * Removes a row by link, recording IO statistics in this list's statistics holder.
+     *
+     * @param link Row link.
+     * @throws IgniteInternalCheckedException If failed.
+     */
+    public void removeDataRowByLink(long link) throws IgniteInternalCheckedException {
+        super.removeDataRowByLink(link, statHolder);
+    }
+
+    /**
+     * Page handler that writes a new transaction ID into an already stored {@link VersionChain}.
+     * Its argument is the transaction ID to write (may be {@code null}); it always returns {@code true}.
+     */
+    private class TransactionIdWriter implements PageHandler<UUID, Object> {
+        @Override
+        public Object run(
+                int groupId,
+                long pageId,
+                long page,
+                long pageAddr,
+                PageIo io,
+                UUID txId,
+                int itemId,
+                IoStatisticsHolder statistics
+        ) throws IgniteInternalCheckedException {
+            ((VersionChainDataIo) io).updateTransactionId(pageAddr, itemId, pageSize(), txId);
+
+            // Report the in-place modification of this page to the eviction tracker.
+            evictionTracker.touchPage(pageId);
+
+            return true;
+        }
+    }
+}
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/PageMemoryDataRegion.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VersionChainLink.java
similarity index 65%
copy from modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/PageMemoryDataRegion.java
copy to modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VersionChainLink.java
index dbc6e3c2f..4d0d9514f 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/PageMemoryDataRegion.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VersionChainLink.java
@@ -15,22 +15,28 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.pagememory;
-
-import org.jetbrains.annotations.Nullable;
+package org.apache.ignite.internal.storage.pagememory.mv;
/**
- * Data region based on {@link PageMemory}.
+ * A link to a version chain.
+ *
+ * <p>Used as the search row type of {@link VersionChainTree}, which orders entries by {@link #link()};
+ * {@link VersionChain} extends this class.
*/
-public interface PageMemoryDataRegion {
- /**
- * Returns {@link true} if the date region is persistent.
- */
- boolean persistent();
+public class VersionChainLink {
+ /** Size of a link in bytes (a single {@code long}). */
+ public static final int SIZE_IN_BYTES = Long.BYTES;
+
+ /** The link value. */
+ private long link;
+
+ /** Creates an instance with the link left at {@code 0}; it can be set later via {@link #link(long)}. */
+ public VersionChainLink() {
+ }
+
+ /**
+ * Creates an instance with the given link.
+ *
+ * @param link the link
+ */
+ public VersionChainLink(long link) {
+ this.link = link;
+ }
+
+ /** Returns the link. */
+ public long link() {
+ return link;
+ }
- /**
- * Returns page memory, {@code null} if not started.
- */
- @Nullable
- PageMemory pageMemory();
+ /**
+ * Sets the link.
+ *
+ * @param link new link value
+ */
+ public void link(long link) {
+ this.link = link;
+ }
}
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VersionChainTree.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VersionChainTree.java
new file mode 100644
index 000000000..d509ee406
--- /dev/null
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VersionChainTree.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory.mv;
+
+import static org.apache.ignite.internal.pagememory.PageIdAllocator.FLAG_AUX;
+
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.ignite.internal.pagememory.PageMemory;
+import org.apache.ignite.internal.pagememory.metric.IoStatisticsHolderNoOp;
+import org.apache.ignite.internal.pagememory.reuse.ReuseList;
+import org.apache.ignite.internal.pagememory.tree.BplusTree;
+import org.apache.ignite.internal.pagememory.tree.io.BplusIo;
+import org.apache.ignite.internal.pagememory.util.PageLockListener;
+import org.apache.ignite.internal.storage.pagememory.mv.io.VersionChainInnerIo;
+import org.apache.ignite.internal.storage.pagememory.mv.io.VersionChainIo;
+import org.apache.ignite.internal.storage.pagememory.mv.io.VersionChainLeafIo;
+import org.apache.ignite.internal.storage.pagememory.mv.io.VersionChainMetaIo;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * {@link BplusTree} that indexes {@link VersionChain}s by their link.
+ *
+ * <p>Tree items hold only the link ({@link VersionChainLink}); full rows are loaded from data pages on demand.
+ */
+public class VersionChainTree extends BplusTree<VersionChainLink, VersionChain> {
+    /** Partition in which new tree pages are allocated. */
+    private final int partitionId;
+
+    /** Reader used to load full {@link VersionChain} rows by link. */
+    private final VersionChainDataPageReader dataPageReader;
+
+    /**
+     * Constructor.
+     *
+     * @param grpId Group ID.
+     * @param grpName Group name.
+     * @param pageMem Page memory.
+     * @param lockLsnr Page lock listener.
+     * @param globalRmvId Global remove ID.
+     * @param metaPageId Meta page ID.
+     * @param reuseList Reuse list.
+     * @param partitionId Partition id.
+     * @param initNew {@code True} if new tree should be created.
+     * @throws IgniteInternalCheckedException If the tree could not be initialized.
+     */
+    public VersionChainTree(
+            int grpId,
+            String grpName,
+            PageMemory pageMem,
+            PageLockListener lockLsnr,
+            AtomicLong globalRmvId,
+            long metaPageId,
+            @Nullable ReuseList reuseList,
+            int partitionId,
+            boolean initNew
+    ) throws IgniteInternalCheckedException {
+        super("VersionChainTree_" + grpId, grpId, grpName, pageMem, lockLsnr, FLAG_AUX, globalRmvId, metaPageId, reuseList);
+
+        dataPageReader = new VersionChainDataPageReader(pageMem, grpId, IoStatisticsHolderNoOp.INSTANCE);
+
+        // Assigned before initTree(): allocatePageNoReuse() reads it.
+        this.partitionId = partitionId;
+
+        setIos(VersionChainInnerIo.VERSIONS, VersionChainLeafIo.VERSIONS, VersionChainMetaIo.VERSIONS);
+
+        initTree(initNew);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    protected long allocatePageNoReuse() throws IgniteInternalCheckedException {
+        return pageMem.allocatePage(grpId, partitionId, defaultPageFlag);
+    }
+
+    /** Orders items by the numeric value of their links. */
+    @Override
+    protected int compare(BplusIo<VersionChainLink> io, long pageAddr, int idx, VersionChainLink row) {
+        long storedLink = ((VersionChainIo) io).link(pageAddr, idx);
+
+        return Long.compare(storedLink, row.link());
+    }
+
+    /** Loads the full {@link VersionChain} referenced by the link stored at {@code idx}; {@code x} is not used. */
+    @Override
+    public VersionChain getRow(BplusIo<VersionChainLink> io, long pageAddr, int idx, Object x) throws IgniteInternalCheckedException {
+        return dataPageReader.getRowByLink(((VersionChainIo) io).link(pageAddr, idx));
+    }
+}
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/io/RowVersionDataIo.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/io/RowVersionDataIo.java
new file mode 100644
index 000000000..747a528f6
--- /dev/null
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/io/RowVersionDataIo.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory.mv.io;
+
+import static org.apache.ignite.internal.pagememory.util.PageUtils.putByteBuffer;
+import static org.apache.ignite.internal.pagememory.util.PageUtils.putInt;
+import static org.apache.ignite.internal.pagememory.util.PageUtils.putShort;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.pagememory.io.AbstractDataPageIo;
+import org.apache.ignite.internal.pagememory.io.IoVersions;
+import org.apache.ignite.internal.storage.pagememory.mv.PartitionlessLinks;
+import org.apache.ignite.internal.storage.pagememory.mv.RowVersion;
+import org.apache.ignite.internal.storage.pagememory.mv.Timestamps;
+import org.apache.ignite.internal.tx.Timestamp;
+import org.apache.ignite.lang.IgniteStringBuilder;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Data pages IO for {@link RowVersion}.
+ *
+ * <p>Row layout: payload size ({@code short}), timestamp, partitionless next link, value length ({@code int}),
+ * value bytes.
+ */
+public class RowVersionDataIo extends AbstractDataPageIo<RowVersion> {
+    /** Page IO type. */
+    public static final short T_VALUE_VERSION_DATA_IO = 11;
+
+    /** I/O versions. */
+    public static final IoVersions<RowVersionDataIo> VERSIONS = new IoVersions<>(new RowVersionDataIo(1));
+
+    /**
+     * Constructor.
+     *
+     * @param ver Page format version.
+     */
+    protected RowVersionDataIo(int ver) {
+        super(T_VALUE_VERSION_DATA_IO, ver);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    protected void writeRowData(long pageAddr, int dataOff, int payloadSize, RowVersion row, boolean newRow) {
+        assertPageType(pageAddr);
+
+        long addr = pageAddr + dataOff;
+
+        // Payload size prefix; updateTimestamp() skips exactly Short.BYTES to get past it.
+        putShort(addr, 0, (short) payloadSize);
+        addr += Short.BYTES;
+
+        addr += Timestamps.writeTimestamp(addr, 0, row.timestamp());
+
+        addr += PartitionlessLinks.writeToMemory(addr, row.nextLink());
+
+        ByteBuffer value = row.value();
+
+        // NOTE(review): the length prefix is value.limit(). If putByteBuffer copies value.remaining() bytes, the two
+        // only agree when value.position() == 0; confirm callers always pass buffers positioned at 0.
+        putInt(addr, 0, value.limit());
+        addr += Integer.BYTES;
+
+        putByteBuffer(addr, 0, value);
+    }
+
+    /**
+     * Not implemented yet.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    protected void writeFragmentData(RowVersion row, ByteBuffer buf, int rowOff, int payloadSize) {
+        assertPageType(buf);
+
+        // TODO: IGNITE-17009 - implement
+
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+
+    /**
+     * Updates timestamp leaving the rest untouched.
+     *
+     * @param pageAddr page address
+     * @param itemId item ID of the slot where row version (or its first fragment) is stored in this page
+     * @param pageSize size of the page
+     * @param timestamp timestamp to store, may be {@code null}
+     */
+    public void updateTimestamp(long pageAddr, int itemId, int pageSize, @Nullable Timestamp timestamp) {
+        int dataOff = getDataOffset(pageAddr, itemId, pageSize);
+
+        // Skip the payload size prefix written by writeRowData().
+        int payloadOffset = dataOff + Short.BYTES;
+
+        Timestamps.writeTimestamp(pageAddr, payloadOffset + RowVersion.TIMESTAMP_OFFSET, timestamp);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    protected void printPage(long addr, int pageSize, IgniteStringBuilder sb) {
+        sb.app("RowVersionDataIo [\n");
+        printPageLayout(addr, pageSize, sb);
+        sb.app("\n]");
+    }
+}
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/io/VersionChainDataIo.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/io/VersionChainDataIo.java
new file mode 100644
index 000000000..eb18573a5
--- /dev/null
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/io/VersionChainDataIo.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory.mv.io;
+
+import static org.apache.ignite.internal.pagememory.util.PageUtils.putShort;
+
+import java.nio.ByteBuffer;
+import java.util.UUID;
+import org.apache.ignite.internal.pagememory.io.AbstractDataPageIo;
+import org.apache.ignite.internal.pagememory.io.IoVersions;
+import org.apache.ignite.internal.storage.pagememory.mv.PartitionlessLinks;
+import org.apache.ignite.internal.storage.pagememory.mv.TransactionIds;
+import org.apache.ignite.internal.storage.pagememory.mv.VersionChain;
+import org.apache.ignite.lang.IgniteStringBuilder;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Data page IO for {@link VersionChain} rows.
+ *
+ * <p>Row layout: payload size ({@code short}), transaction ID (two longs), partitionless head link.
+ */
+public class VersionChainDataIo extends AbstractDataPageIo<VersionChain> {
+    /** Page IO type. */
+    public static final short T_VERSION_CHAIN_IO = 7;
+
+    /** I/O versions. */
+    public static final IoVersions<VersionChainDataIo> VERSIONS = new IoVersions<>(new VersionChainDataIo(1));
+
+    /**
+     * Constructor.
+     *
+     * @param ver Page format version.
+     */
+    protected VersionChainDataIo(int ver) {
+        super(T_VERSION_CHAIN_IO, ver);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    protected void printPage(long addr, int pageSize, IgniteStringBuilder sb) {
+        sb.app("VersionChainDataIo [\n");
+        printPageLayout(addr, pageSize, sb);
+        sb.app("\n]");
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    protected void writeRowData(long pageAddr, int dataOff, int payloadSize, VersionChain row, boolean newRow) {
+        assertPageType(pageAddr);
+
+        int off = dataOff;
+
+        putShort(pageAddr, off, (short) payloadSize);
+        off += Short.BYTES;
+
+        off += TransactionIds.writeTransactionId(pageAddr, off, row.transactionId());
+
+        // Last field: the offset does not need to be advanced afterwards.
+        PartitionlessLinks.writeToMemory(pageAddr + off, row.headLink());
+    }
+
+    /**
+     * Version chain rows are not supported as fragments; this always throws.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    protected void writeFragmentData(VersionChain row, ByteBuffer buf, int rowOff, int payloadSize) {
+        assertPageType(buf);
+
+        throw new UnsupportedOperationException("Splitting version chain rows to fragments is ridiculous!");
+    }
+
+    /**
+     * Overwrites the transaction ID of a stored row, leaving everything else untouched.
+     *
+     * @param pageAddr page address
+     * @param itemId number of the item representing the slot where the row of interest resides
+     * @param pageSize size of the page
+     * @param txId transaction ID to write, may be {@code null}
+     */
+    public void updateTransactionId(long pageAddr, int itemId, int pageSize, @Nullable UUID txId) {
+        // Skip the payload size prefix that precedes the row payload.
+        int payloadOff = getDataOffset(pageAddr, itemId, pageSize) + Short.BYTES;
+
+        TransactionIds.writeTransactionId(pageAddr, payloadOff + VersionChain.TRANSACTION_ID_OFFSET, txId);
+    }
+}
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/io/VersionChainInnerIo.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/io/VersionChainInnerIo.java
new file mode 100644
index 000000000..d24e83ea4
--- /dev/null
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/io/VersionChainInnerIo.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory.mv.io;
+
+import static org.apache.ignite.internal.pagememory.util.PageUtils.getLong;
+import static org.apache.ignite.internal.pagememory.util.PageUtils.putLong;
+
+import org.apache.ignite.internal.pagememory.io.IoVersions;
+import org.apache.ignite.internal.pagememory.tree.BplusTree;
+import org.apache.ignite.internal.pagememory.tree.io.BplusInnerIo;
+import org.apache.ignite.internal.pagememory.tree.io.BplusIo;
+import org.apache.ignite.internal.storage.pagememory.mv.VersionChainLink;
+import org.apache.ignite.internal.storage.pagememory.mv.VersionChainTree;
+
+/**
+ * IO routines for {@link VersionChainTree} inner pages.
+ *
+ * <p>Structure: link(long).
+ */
+public class VersionChainInnerIo extends BplusInnerIo<VersionChainLink> implements VersionChainIo {
+    /** Page IO type. */
+    public static final short T_VERSION_CHAIN_INNER_IO = 9;
+
+    /** I/O versions. */
+    public static final IoVersions<VersionChainInnerIo> VERSIONS = new IoVersions<>(new VersionChainInnerIo(1));
+
+    /**
+     * Constructor.
+     *
+     * @param ver Page format version.
+     */
+    protected VersionChainInnerIo(int ver) {
+        super(T_VERSION_CHAIN_INNER_IO, ver, true, VersionChainLink.SIZE_IN_BYTES);
+    }
+
+    /**
+     * Copies the link of item {@code srcIdx} of the source page into item {@code dstIdx} of this inner page.
+     *
+     * <p>The link is read through {@code srcIo} rather than through this IO: {@code srcIo} may be a different IO
+     * (for example, a leaf IO), and item offsets are computed per IO, so reading the source page with this inner
+     * IO's offsets would read the wrong bytes.
+     */
+    @Override
+    public void store(long dstPageAddr, int dstIdx, BplusIo<VersionChainLink> srcIo, long srcPageAddr, int srcIdx) {
+        assertPageType(dstPageAddr);
+
+        long srcLink = ((VersionChainIo) srcIo).link(srcPageAddr, srcIdx);
+
+        int dstOff = offset(dstIdx);
+
+        putLong(dstPageAddr, dstOff, srcLink);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void storeByOffset(long pageAddr, int off, VersionChainLink row) {
+        assertPageType(pageAddr);
+
+        putLong(pageAddr, off, row.link());
+    }
+
+    /** Returns a lookup row wrapping the link stored at {@code idx}. */
+    @Override
+    public VersionChainLink getLookupRow(BplusTree<VersionChainLink, ?> tree, long pageAddr, int idx) {
+        long link = link(pageAddr, idx);
+
+        return new VersionChainLink(link);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public long link(long pageAddr, int idx) {
+        assert idx < getCount(pageAddr) : idx;
+
+        return getLong(pageAddr, offset(idx));
+    }
+}
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/PageMemoryDataRegion.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/io/VersionChainIo.java
similarity index 68%
copy from modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/PageMemoryDataRegion.java
copy to modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/io/VersionChainIo.java
index dbc6e3c2f..63b42992e 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/PageMemoryDataRegion.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/io/VersionChainIo.java
@@ -15,22 +15,17 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.pagememory;
-
-import org.jetbrains.annotations.Nullable;
+package org.apache.ignite.internal.storage.pagememory.mv.io;
/**
- * Data region based on {@link PageMemory}.
+ * Interface for VersionChain B+Tree-related IO.
+ *
+ * <p>Gives uniform access to the link stored in a tree item, whichever concrete page IO holds that item.
*/
-public interface PageMemoryDataRegion {
- /**
- * Returns {@link true} if the date region is persistent.
- */
- boolean persistent();
-
+public interface VersionChainIo {
/**
- * Returns page memory, {@code null} if not started.
+ * Returns the link for the row in the page by index.
+ *
+ * @param pageAddr Page address.
+ * @param idx Index.
+ * @return Link of the row stored at the given index.
*/
- @Nullable
- PageMemory pageMemory();
+ long link(long pageAddr, int idx);
}
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/io/VersionChainLeafIo.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/io/VersionChainLeafIo.java
new file mode 100644
index 000000000..15bbce063
--- /dev/null
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/io/VersionChainLeafIo.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory.mv.io;
+
+import static org.apache.ignite.internal.pagememory.util.PageUtils.getLong;
+import static org.apache.ignite.internal.pagememory.util.PageUtils.putLong;
+
+import org.apache.ignite.internal.pagememory.io.IoVersions;
+import org.apache.ignite.internal.pagememory.tree.BplusTree;
+import org.apache.ignite.internal.pagememory.tree.io.BplusIo;
+import org.apache.ignite.internal.pagememory.tree.io.BplusLeafIo;
+import org.apache.ignite.internal.storage.pagememory.mv.VersionChainLink;
+import org.apache.ignite.internal.storage.pagememory.mv.VersionChainTree;
+
+/**
+ * IO routines for {@link VersionChainTree} leaf pages.
+ *
+ * <p>Structure: link(long).
+ */
+public class VersionChainLeafIo extends BplusLeafIo<VersionChainLink> implements VersionChainIo {
+ /** Page IO type. */
+ public static final short T_VERSION_CHAIN_LEAF_IO = 10;
+
+ /** Supported I/O versions of this leaf page IO; currently only version 1. */
+ public static final IoVersions<VersionChainLeafIo> VERSIONS = new IoVersions<>(new VersionChainLeafIo(1));
+
+ /**
+ * Creates an IO for {@link VersionChainTree} leaf pages of the given page format version.
+ *
+ * @param ver Page format version.
+ */
+ protected VersionChainLeafIo(int ver) {
+ super(T_VERSION_CHAIN_LEAF_IO, ver, VersionChainLink.SIZE_IN_BYTES);
+ }
+
+ /**
+ * Copies the link of item {@code srcIdx} of the source page into slot {@code dstIdx} of this page.
+ *
+ * <p>The link is read through {@code srcIo}, not through this IO: the source page may be served by another
+ * {@link VersionChainIo} implementation (e.g. the inner-page IO), whose item offsets need not match this IO's.
+ */
+ @Override
+ public void store(long dstPageAddr, int dstIdx, BplusIo<VersionChainLink> srcIo, long srcPageAddr, int srcIdx) {
+ assertPageType(dstPageAddr);
+
+ long srcLink = ((VersionChainIo) srcIo).link(srcPageAddr, srcIdx);
+
+ putLong(dstPageAddr, offset(dstIdx), srcLink);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void storeByOffset(long pageAddr, int off, VersionChainLink row) {
+ assertPageType(pageAddr);
+
+ putLong(pageAddr, off, row.link());
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public VersionChainLink getLookupRow(BplusTree<VersionChainLink, ?> tree, long pageAddr, int idx) {
+ long link = link(pageAddr, idx);
+
+ return new VersionChainLink(link);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public long link(long pageAddr, int idx) {
+ assert idx < getCount(pageAddr) : idx;
+
+ return getLong(pageAddr, offset(idx));
+ }
+}
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/PageMemoryDataRegion.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/io/VersionChainMetaIo.java
similarity index 52%
copy from modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/PageMemoryDataRegion.java
copy to modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/io/VersionChainMetaIo.java
index dbc6e3c2f..ae66188a4 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/PageMemoryDataRegion.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/io/VersionChainMetaIo.java
@@ -15,22 +15,28 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.pagememory;
+package org.apache.ignite.internal.storage.pagememory.mv.io;
-import org.jetbrains.annotations.Nullable;
+import org.apache.ignite.internal.pagememory.io.IoVersions;
+import org.apache.ignite.internal.pagememory.tree.io.BplusMetaIo;
+import org.apache.ignite.internal.storage.pagememory.mv.VersionChainTree;
/**
- * Data region based on {@link PageMemory}.
+ * IO routines for {@link VersionChainTree} meta pages.
*/
-public interface PageMemoryDataRegion {
- /**
- * Returns {@link true} if the date region is persistent.
- */
- boolean persistent();
+public class VersionChainMetaIo extends BplusMetaIo {
+ /** Page IO type of {@link VersionChainTree} meta pages. */
+ public static final short T_VERSION_CHAIN_META_IO = 8;
+
+ /** Supported I/O versions of this meta page IO; currently only version 1. */
+ public static final IoVersions<VersionChainMetaIo> VERSIONS = new IoVersions<>(new VersionChainMetaIo(1));
/**
- * Returns page memory, {@code null} if not started.
+ * Creates an IO for {@link VersionChainTree} meta pages of the given page format version.
+ *
+ * @param ver Page format version.
*/
- @Nullable
- PageMemory pageMemory();
+ protected VersionChainMetaIo(int ver) {
+ super(T_VERSION_CHAIN_META_IO, ver);
+ }
}
diff --git a/modules/storage-page-memory/src/main/resources/META-INF/services/org.apache.ignite.internal.pagememory.io.PageIoModule b/modules/storage-page-memory/src/main/resources/META-INF/services/org.apache.ignite.internal.pagememory.io.PageIoModule
index 262224961..639bb65fe 100644
--- a/modules/storage-page-memory/src/main/resources/META-INF/services/org.apache.ignite.internal.pagememory.io.PageIoModule
+++ b/modules/storage-page-memory/src/main/resources/META-INF/services/org.apache.ignite.internal.pagememory.io.PageIoModule
@@ -15,3 +15,4 @@
# limitations under the License.
#
org.apache.ignite.internal.storage.pagememory.PageMemoryStorageIoModule
+org.apache.ignite.internal.storage.pagememory.mv.PageMemoryMvStorageIoModule
diff --git a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/PageMemoryMvPartitionStorageTest.java b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/PageMemoryMvPartitionStorageTest.java
new file mode 100644
index 000000000..49e53a80d
--- /dev/null
+++ b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/PageMemoryMvPartitionStorageTest.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory.mv;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.configuration.schemas.store.UnknownDataStorageConfigurationSchema;
+import org.apache.ignite.configuration.schemas.table.HashIndexConfigurationSchema;
+import org.apache.ignite.configuration.schemas.table.TableConfiguration;
+import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.pagememory.configuration.schema.UnsafeMemoryAllocatorConfigurationSchema;
+import org.apache.ignite.internal.pagememory.io.PageIoRegistry;
+import org.apache.ignite.internal.storage.AbstractMvPartitionStorageTest;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.pagememory.PageMemoryStorageEngine;
+import org.apache.ignite.internal.storage.pagememory.PageMemoryTableStorage;
+import org.apache.ignite.internal.storage.pagememory.configuration.schema.PageMemoryDataStorageChange;
+import org.apache.ignite.internal.storage.pagememory.configuration.schema.PageMemoryDataStorageConfigurationSchema;
+import org.apache.ignite.internal.storage.pagememory.configuration.schema.PageMemoryDataStorageView;
+import org.apache.ignite.internal.storage.pagememory.configuration.schema.PageMemoryStorageEngineConfiguration;
+import org.apache.ignite.internal.storage.pagememory.configuration.schema.PageMemoryStorageEngineConfigurationSchema;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.internal.tx.Timestamp;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Runs the {@link AbstractMvPartitionStorageTest} suite against a {@link PageMemoryMvPartitionStorage} created by a
+ * {@link PageMemoryTableStorage}, plus checks specific to this storage.
+ */
+@ExtendWith(ConfigurationExtension.class)
+@ExtendWith(WorkDirectoryExtension.class)
+class PageMemoryMvPartitionStorageTest extends AbstractMvPartitionStorageTest<PageMemoryMvPartitionStorage> {
+ private final PageIoRegistry ioRegistry = new PageIoRegistry();
+
+ {
+ // NOTE(review): presumably discovers the PageIoModule implementations listed under META-INF/services - confirm.
+ ioRegistry.loadFromServiceLoader();
+ }
+
+ @InjectConfiguration(polymorphicExtensions = UnsafeMemoryAllocatorConfigurationSchema.class)
+ private PageMemoryStorageEngineConfiguration engineConfig;
+
+ @InjectConfiguration(
+ name = "table",
+ polymorphicExtensions = {
+ HashIndexConfigurationSchema.class,
+ UnknownDataStorageConfigurationSchema.class,
+ PageMemoryDataStorageConfigurationSchema.class
+ }
+ )
+ private TableConfiguration tableCfg;
+
+ private PageMemoryStorageEngine engine;
+
+ private PageMemoryTableStorage table;
+
+ @BeforeEach
+ void setUp() throws Exception {
+ engine = new PageMemoryStorageEngine(engineConfig, ioRegistry);
+
+ engine.start();
+
+ // Switch the table to the page memory data storage and make sure the default data region is used.
+ tableCfg.change(c -> c.changeDataStorage(dsc -> dsc.convert(PageMemoryDataStorageChange.class)))
+ .get(1, TimeUnit.SECONDS);
+
+ assertEquals(
+ PageMemoryStorageEngineConfigurationSchema.DEFAULT_DATA_REGION_NAME,
+ ((PageMemoryDataStorageView) tableCfg.dataStorage().value()).dataRegion()
+ );
+
+ table = engine.createTable(tableCfg);
+ table.start();
+
+ storage = table.createMvPartitionStorage(partitionId());
+ }
+
+ @AfterEach
+ void tearDown() throws Exception {
+ // Release in reverse order of creation; table/engine are null-guarded in case setUp failed before creating them.
+ IgniteUtils.closeAll(
+ storage,
+ table == null ? null : table::stop,
+ engine == null ? null : engine::stop
+ );
+ }
+
+ @Override
+ protected int partitionId() {
+ // 1 instead of the default 0 to make sure that we note cases when we forget to pass the partition ID (in which
+ // case it will turn into 0).
+ return 1;
+ }
+
+ @SuppressWarnings("JUnit3StyleTestMethodInJUnit4Class")
+ @Override
+ public void testReadsFromEmpty() {
+ // TODO: enable the test when https://issues.apache.org/jira/browse/IGNITE-17006 is implemented
+
+ // Effectively, disable the test because it makes no sense for this kind of storage as attempts to read using random
+ // pageIds will result in troubles.
+ }
+
+ @Test
+ void abortOfInsertMakesRowIdInvalidForAddWrite() {
+ RowId rowId = storage.insert(binaryRow, newTransactionId());
+ storage.abortWrite(rowId);
+
+ assertThrows(RowIdIsInvalidForModificationsException.class, () -> storage.addWrite(rowId, binaryRow2, txId));
+ }
+
+ @Test
+ void abortOfInsertMakesRowIdInvalidForCommitWrite() {
+ RowId rowId = storage.insert(binaryRow, newTransactionId());
+ storage.abortWrite(rowId);
+
+ assertThrows(RowIdIsInvalidForModificationsException.class, () -> storage.commitWrite(rowId, Timestamp.nextVersion()));
+ }
+
+ @Test
+ void abortOfInsertMakesRowIdInvalidForAbortWrite() {
+ RowId rowId = storage.insert(binaryRow, newTransactionId());
+ storage.abortWrite(rowId);
+
+ assertThrows(RowIdIsInvalidForModificationsException.class, () -> storage.abortWrite(rowId));
+ }
+}
diff --git a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/PartitionlessLinksTest.java b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/PartitionlessLinksTest.java
new file mode 100644
index 000000000..3ee377c93
--- /dev/null
+++ b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/PartitionlessLinksTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory.mv;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests for {@link PartitionlessLinks}: removing the partition ID from a link, adding it back, and extracting the
+ * high 16 / low 32 bits of a partitionless link.
+ */
+class PartitionlessLinksTest {
+ /** Sample full link; its partition ID part is {@code 0xABCD}. */
+ private static final long FULL_LINK = 0x7733ABCDDEADBEEFL;
+
+ /** {@link #FULL_LINK} with the partition ID part cut out. */
+ private static final long PARTITIONLESS_LINK = 0x7733DEADBEEFL;
+
+ /** Partition ID embedded in {@link #FULL_LINK}. */
+ private static final int PARTITION_ID = 0xABCD;
+
+ @Test
+ void removesPartitionIdFromFullLink() {
+ assertThat(PartitionlessLinks.removePartitionIdFromLink(FULL_LINK), is(PARTITIONLESS_LINK));
+ }
+
+ @Test
+ void addsPartitionId() {
+ assertThat(PartitionlessLinks.addPartitionIdToPartititionlessLink(PARTITIONLESS_LINK, PARTITION_ID), is(FULL_LINK));
+ }
+
+ @Test
+ void removalAndAdditionOfPartitionIdRestoresLink() {
+ long roundTripped = PartitionlessLinks.addPartitionIdToPartititionlessLink(
+ PartitionlessLinks.removePartitionIdFromLink(FULL_LINK),
+ PARTITION_ID
+ );
+
+ assertThat(roundTripped, is(FULL_LINK));
+ }
+
+ @Test
+ void extractsHigh16Bits() {
+ assertThat(PartitionlessLinks.high16Bits(PARTITIONLESS_LINK), is((short) 0x7733));
+ }
+
+ @Test
+ void extractsLow32Bits() {
+ assertThat(PartitionlessLinks.low32Bits(PARTITIONLESS_LINK), is(0xDEADBEEF));
+ }
+}
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
index 21b57fe49..0b4949d3c 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
@@ -30,6 +30,7 @@ import java.util.function.Predicate;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.ByteBufferRow;
import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.NoUncommittedVersionException;
import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.storage.StorageException;
import org.apache.ignite.internal.storage.TxIdMismatchException;
@@ -212,6 +213,9 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
try {
byte[] previousValue = db.get(cf, keyBuf.array(), 0, ROW_PREFIX_SIZE);
+ if (previousValue == null) {
+ throw new NoUncommittedVersionException();
+ }
// Perform unconditional remove for the key without associated timestamp.
db.delete(cf, writeOpts, keyBuf.array(), 0, ROW_PREFIX_SIZE);
diff --git a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageTest.java b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageTest.java
index d7323c039..1b1573e3a 100644
--- a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageTest.java
+++ b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageTest.java
@@ -29,7 +29,6 @@ import org.apache.ignite.configuration.schemas.table.TableConfiguration;
import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.storage.AbstractMvPartitionStorageTest;
-import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbDataStorageChange;
import org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbDataStorageConfigurationSchema;
import org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbDataStorageView;
@@ -46,13 +45,11 @@ import org.junit.jupiter.api.extension.ExtendWith;
*/
@ExtendWith(WorkDirectoryExtension.class)
@ExtendWith(ConfigurationExtension.class)
-public class RocksDbMvPartitionStorageTest extends AbstractMvPartitionStorageTest {
+public class RocksDbMvPartitionStorageTest extends AbstractMvPartitionStorageTest<RocksDbMvPartitionStorage> {
private RocksDbStorageEngine engine;
private RocksDbTableStorage table;
- private RocksDbMvPartitionStorage storage;
-
@BeforeEach
public void setUp(
@WorkDirectory Path workDir,
@@ -92,8 +89,4 @@ public class RocksDbMvPartitionStorageTest extends AbstractMvPartitionStorageTes
);
}
- @Override
- protected MvPartitionStorage partitionStorage() {
- return storage;
- }
}
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/Timestamp.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/Timestamp.java
index 32adc54a0..e3c8640ae 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/Timestamp.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/Timestamp.java
@@ -76,7 +76,17 @@ public class Timestamp implements Comparable<Timestamp>, Serializable {
@Override
public int compareTo(@NotNull Timestamp other) {
return (this.timestamp < other.timestamp ? -1 : (this.timestamp > other.timestamp ? 1 :
- (this.nodeId < other.nodeId ? -1 : (this.nodeId > other.nodeId ? 1 : 0))));
+ Long.compare(this.nodeId, other.nodeId)));
+ }
+
+ /**
+ * Returns {@code true} if this is before or equal to another timestamp.
+ *
+ * @param that another timestamp
+ * @return {@code true} if this is before or equal to another timestamp
+ */
+ public boolean beforeOrEquals(Timestamp that) {
+ return this.compareTo(that) <= 0;
}
/**