You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sd...@apache.org on 2022/09/19 15:06:52 UTC
[ignite-3] 01/01: IGNITE-17627 Extend MvPartitionStorage read API with write intent resolution capabilities
This is an automated email from the ASF dual-hosted git repository.
sdanilov pushed a commit to branch ignite-17627
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit 45e228da7c2db45e2a3aa3b32fac90eaa1faa796
Author: Semyon Danilov <sa...@yandex.ru>
AuthorDate: Mon Sep 19 19:00:42 2022 +0400
IGNITE-17627 Extend MvPartitionStorage read API with write intent resolution capabilities
---
.../apache/ignite/internal/util/GridUnsafe.java | 52 ++--
.../internal/util/io/IgniteUnsafeDataInput.java | 22 +-
.../internal/util/io/IgniteUnsafeDataOutput.java | 22 +-
.../stream/DirectByteBufferStreamImplV1.java | 44 +--
.../internal/storage/MvPartitionStorage.java | 30 +-
.../apache/ignite/internal/storage/ReadResult.java | 125 ++++++++
.../storage/AbstractMvPartitionStorageTest.java | 296 ++++++++++++++++--
.../storage/AbstractMvTableStorageTest.java | 4 +-
.../TestConcurrentHashMapMvPartitionStorage.java | 120 ++++++--
.../mv/AbstractPageMemoryMvPartitionStorage.java | 113 +++++--
.../internal/storage/pagememory/mv/RowVersion.java | 2 +-
.../pagememory/mv/ScanVersionChainByTimestamp.java | 80 ++++-
.../storage/pagememory/mv/VersionChain.java | 31 +-
.../storage/pagememory/mv/io/VersionChainIo.java | 40 ++-
.../storage/rocksdb/RocksDbMvPartitionStorage.java | 330 +++++++++++++++++----
.../storage/rocksdb/RocksDbMvTableStorageTest.java | 4 +-
.../distributed/storage/VersionedRowStore.java | 11 +-
17 files changed, 1084 insertions(+), 242 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridUnsafe.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridUnsafe.java
index 2d37a5ce4a..12af2626a1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridUnsafe.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridUnsafe.java
@@ -73,8 +73,8 @@ public abstract class GridUnsafe {
/** Per-byte copy threshold. */
private static final long PER_BYTE_THRESHOLD = 0L;
- /** Big endian. */
- public static final boolean BIG_ENDIAN = ByteOrder.nativeOrder() == ByteOrder.BIG_ENDIAN;
+ /** Flag indicating whether system's native order is big endian. */
+ public static final boolean IS_BIG_ENDIAN = ByteOrder.nativeOrder() == ByteOrder.BIG_ENDIAN;
/** Address size. */
public static final int ADDR_SIZE = UNSAFE.addressSize();
@@ -571,7 +571,7 @@ public abstract class GridUnsafe {
* @return Short value from byte array.
*/
public static short getShort(byte[] arr, long off) {
- return UNALIGNED ? UNSAFE.getShort(arr, off) : getShortByByte(arr, off, BIG_ENDIAN);
+ return UNALIGNED ? UNSAFE.getShort(arr, off) : getShortByByte(arr, off, IS_BIG_ENDIAN);
}
/**
@@ -581,7 +581,7 @@ public abstract class GridUnsafe {
* @return Short value from given address.
*/
public static short getShort(long addr) {
- return UNALIGNED ? UNSAFE.getShort(addr) : getShortByByte(addr, BIG_ENDIAN);
+ return UNALIGNED ? UNSAFE.getShort(addr) : getShortByByte(addr, IS_BIG_ENDIAN);
}
/**
@@ -595,7 +595,7 @@ public abstract class GridUnsafe {
if (UNALIGNED) {
UNSAFE.putShort(arr, off, val);
} else {
- putShortByByte(arr, off, val, BIG_ENDIAN);
+ putShortByByte(arr, off, val, IS_BIG_ENDIAN);
}
}
@@ -609,7 +609,7 @@ public abstract class GridUnsafe {
if (UNALIGNED) {
UNSAFE.putShort(addr, val);
} else {
- putShortByByte(addr, val, BIG_ENDIAN);
+ putShortByByte(addr, val, IS_BIG_ENDIAN);
}
}
@@ -621,7 +621,7 @@ public abstract class GridUnsafe {
* @return Char value from byte array.
*/
public static char getChar(byte[] arr, long off) {
- return UNALIGNED ? UNSAFE.getChar(arr, off) : getCharByByte(arr, off, BIG_ENDIAN);
+ return UNALIGNED ? UNSAFE.getChar(arr, off) : getCharByByte(arr, off, IS_BIG_ENDIAN);
}
/**
@@ -631,7 +631,7 @@ public abstract class GridUnsafe {
* @return Char value from given address.
*/
public static char getChar(long addr) {
- return UNALIGNED ? UNSAFE.getChar(addr) : getCharByByte(addr, BIG_ENDIAN);
+ return UNALIGNED ? UNSAFE.getChar(addr) : getCharByByte(addr, IS_BIG_ENDIAN);
}
/**
@@ -645,7 +645,7 @@ public abstract class GridUnsafe {
if (UNALIGNED) {
UNSAFE.putChar(arr, off, val);
} else {
- putCharByByte(arr, off, val, BIG_ENDIAN);
+ putCharByByte(arr, off, val, IS_BIG_ENDIAN);
}
}
@@ -659,7 +659,7 @@ public abstract class GridUnsafe {
if (UNALIGNED) {
UNSAFE.putChar(addr, val);
} else {
- putCharByByte(addr, val, BIG_ENDIAN);
+ putCharByByte(addr, val, IS_BIG_ENDIAN);
}
}
@@ -671,7 +671,7 @@ public abstract class GridUnsafe {
* @return Integer value from byte array.
*/
public static int getInt(byte[] arr, long off) {
- return UNALIGNED ? UNSAFE.getInt(arr, off) : getIntByByte(arr, off, BIG_ENDIAN);
+ return UNALIGNED ? UNSAFE.getInt(arr, off) : getIntByByte(arr, off, IS_BIG_ENDIAN);
}
/**
@@ -681,7 +681,7 @@ public abstract class GridUnsafe {
* @return Integer value from given address.
*/
public static int getInt(long addr) {
- return UNALIGNED ? UNSAFE.getInt(addr) : getIntByByte(addr, BIG_ENDIAN);
+ return UNALIGNED ? UNSAFE.getInt(addr) : getIntByByte(addr, IS_BIG_ENDIAN);
}
/**
@@ -695,7 +695,7 @@ public abstract class GridUnsafe {
if (UNALIGNED) {
UNSAFE.putInt(arr, off, val);
} else {
- putIntByByte(arr, off, val, BIG_ENDIAN);
+ putIntByByte(arr, off, val, IS_BIG_ENDIAN);
}
}
@@ -709,7 +709,7 @@ public abstract class GridUnsafe {
if (UNALIGNED) {
UNSAFE.putInt(addr, val);
} else {
- putIntByByte(addr, val, BIG_ENDIAN);
+ putIntByByte(addr, val, IS_BIG_ENDIAN);
}
}
@@ -721,7 +721,7 @@ public abstract class GridUnsafe {
* @return Long value from byte array.
*/
public static long getLong(byte[] arr, long off) {
- return UNALIGNED ? UNSAFE.getLong(arr, off) : getLongByByte(arr, off, BIG_ENDIAN);
+ return UNALIGNED ? UNSAFE.getLong(arr, off) : getLongByByte(arr, off, IS_BIG_ENDIAN);
}
/**
@@ -731,7 +731,7 @@ public abstract class GridUnsafe {
* @return Long value from given address.
*/
public static long getLong(long addr) {
- return UNALIGNED ? UNSAFE.getLong(addr) : getLongByByte(addr, BIG_ENDIAN);
+ return UNALIGNED ? UNSAFE.getLong(addr) : getLongByByte(addr, IS_BIG_ENDIAN);
}
/**
@@ -745,7 +745,7 @@ public abstract class GridUnsafe {
if (UNALIGNED) {
UNSAFE.putLong(arr, off, val);
} else {
- putLongByByte(arr, off, val, BIG_ENDIAN);
+ putLongByByte(arr, off, val, IS_BIG_ENDIAN);
}
}
@@ -759,7 +759,7 @@ public abstract class GridUnsafe {
if (UNALIGNED) {
UNSAFE.putLong(addr, val);
} else {
- putLongByByte(addr, val, BIG_ENDIAN);
+ putLongByByte(addr, val, IS_BIG_ENDIAN);
}
}
@@ -771,7 +771,7 @@ public abstract class GridUnsafe {
* @return Float value from byte array.
*/
public static float getFloat(byte[] arr, long off) {
- return UNALIGNED ? UNSAFE.getFloat(arr, off) : Float.intBitsToFloat(getIntByByte(arr, off, BIG_ENDIAN));
+ return UNALIGNED ? UNSAFE.getFloat(arr, off) : Float.intBitsToFloat(getIntByByte(arr, off, IS_BIG_ENDIAN));
}
/**
@@ -781,7 +781,7 @@ public abstract class GridUnsafe {
* @return Float value from given address.
*/
public static float getFloat(long addr) {
- return UNALIGNED ? UNSAFE.getFloat(addr) : Float.intBitsToFloat(getIntByByte(addr, BIG_ENDIAN));
+ return UNALIGNED ? UNSAFE.getFloat(addr) : Float.intBitsToFloat(getIntByByte(addr, IS_BIG_ENDIAN));
}
/**
@@ -795,7 +795,7 @@ public abstract class GridUnsafe {
if (UNALIGNED) {
UNSAFE.putFloat(arr, off, val);
} else {
- putIntByByte(arr, off, Float.floatToIntBits(val), BIG_ENDIAN);
+ putIntByByte(arr, off, Float.floatToIntBits(val), IS_BIG_ENDIAN);
}
}
@@ -809,7 +809,7 @@ public abstract class GridUnsafe {
if (UNALIGNED) {
UNSAFE.putFloat(addr, val);
} else {
- putIntByByte(addr, Float.floatToIntBits(val), BIG_ENDIAN);
+ putIntByByte(addr, Float.floatToIntBits(val), IS_BIG_ENDIAN);
}
}
@@ -821,7 +821,7 @@ public abstract class GridUnsafe {
* @return Double value from byte array. Alignment aware.
*/
public static double getDouble(byte[] arr, long off) {
- return UNALIGNED ? UNSAFE.getDouble(arr, off) : Double.longBitsToDouble(getLongByByte(arr, off, BIG_ENDIAN));
+ return UNALIGNED ? UNSAFE.getDouble(arr, off) : Double.longBitsToDouble(getLongByByte(arr, off, IS_BIG_ENDIAN));
}
/**
@@ -831,7 +831,7 @@ public abstract class GridUnsafe {
* @return Double value from given address.
*/
public static double getDouble(long addr) {
- return UNALIGNED ? UNSAFE.getDouble(addr) : Double.longBitsToDouble(getLongByByte(addr, BIG_ENDIAN));
+ return UNALIGNED ? UNSAFE.getDouble(addr) : Double.longBitsToDouble(getLongByByte(addr, IS_BIG_ENDIAN));
}
/**
@@ -845,7 +845,7 @@ public abstract class GridUnsafe {
if (UNALIGNED) {
UNSAFE.putDouble(arr, off, val);
} else {
- putLongByByte(arr, off, Double.doubleToLongBits(val), BIG_ENDIAN);
+ putLongByByte(arr, off, Double.doubleToLongBits(val), IS_BIG_ENDIAN);
}
}
@@ -859,7 +859,7 @@ public abstract class GridUnsafe {
if (UNALIGNED) {
UNSAFE.putDouble(addr, val);
} else {
- putLongByByte(addr, Double.doubleToLongBits(val), BIG_ENDIAN);
+ putLongByByte(addr, Double.doubleToLongBits(val), IS_BIG_ENDIAN);
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/io/IgniteUnsafeDataInput.java b/modules/core/src/main/java/org/apache/ignite/internal/util/io/IgniteUnsafeDataInput.java
index 3cb0ae1a69..03a7e4f227 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/io/IgniteUnsafeDataInput.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/io/IgniteUnsafeDataInput.java
@@ -17,12 +17,12 @@
package org.apache.ignite.internal.util.io;
-import static org.apache.ignite.internal.util.GridUnsafe.BIG_ENDIAN;
import static org.apache.ignite.internal.util.GridUnsafe.BYTE_ARR_OFF;
import static org.apache.ignite.internal.util.GridUnsafe.CHAR_ARR_OFF;
import static org.apache.ignite.internal.util.GridUnsafe.DOUBLE_ARR_OFF;
import static org.apache.ignite.internal.util.GridUnsafe.FLOAT_ARR_OFF;
import static org.apache.ignite.internal.util.GridUnsafe.INT_ARR_OFF;
+import static org.apache.ignite.internal.util.GridUnsafe.IS_BIG_ENDIAN;
import static org.apache.ignite.internal.util.GridUnsafe.LONG_ARR_OFF;
import static org.apache.ignite.internal.util.GridUnsafe.SHORT_ARR_OFF;
@@ -274,7 +274,7 @@ public class IgniteUnsafeDataInput extends InputStream implements IgniteDataInpu
long off = BYTE_ARR_OFF + advanceOffset(bytesToCp);
- if (BIG_ENDIAN) {
+ if (IS_BIG_ENDIAN) {
for (int i = 0; i < arr.length; i++) {
arr[i] = GridUnsafe.getShortLittleEndian(buf, off);
@@ -298,7 +298,7 @@ public class IgniteUnsafeDataInput extends InputStream implements IgniteDataInpu
long off = BYTE_ARR_OFF + advanceOffset(bytesToCp);
- if (BIG_ENDIAN) {
+ if (IS_BIG_ENDIAN) {
for (int i = 0; i < arr.length; i++) {
arr[i] = GridUnsafe.getIntLittleEndian(buf, off);
@@ -322,7 +322,7 @@ public class IgniteUnsafeDataInput extends InputStream implements IgniteDataInpu
long off = BYTE_ARR_OFF + advanceOffset(bytesToCp);
- if (BIG_ENDIAN) {
+ if (IS_BIG_ENDIAN) {
for (int i = 0; i < arr.length; i++) {
arr[i] = GridUnsafe.getDoubleLittleEndian(buf, off);
@@ -358,7 +358,7 @@ public class IgniteUnsafeDataInput extends InputStream implements IgniteDataInpu
long off = BYTE_ARR_OFF + advanceOffset(bytesToCp);
- if (BIG_ENDIAN) {
+ if (IS_BIG_ENDIAN) {
for (int i = 0; i < arr.length; i++) {
arr[i] = GridUnsafe.getCharLittleEndian(buf, off);
@@ -382,7 +382,7 @@ public class IgniteUnsafeDataInput extends InputStream implements IgniteDataInpu
long off = BYTE_ARR_OFF + advanceOffset(bytesToCp);
- if (BIG_ENDIAN) {
+ if (IS_BIG_ENDIAN) {
for (int i = 0; i < arr.length; i++) {
arr[i] = GridUnsafe.getLongLittleEndian(buf, off);
@@ -406,7 +406,7 @@ public class IgniteUnsafeDataInput extends InputStream implements IgniteDataInpu
long off = BYTE_ARR_OFF + advanceOffset(bytesToCp);
- if (BIG_ENDIAN) {
+ if (IS_BIG_ENDIAN) {
for (int i = 0; i < arr.length; i++) {
arr[i] = GridUnsafe.getFloatLittleEndian(buf, off);
@@ -495,7 +495,7 @@ public class IgniteUnsafeDataInput extends InputStream implements IgniteDataInpu
long off = BYTE_ARR_OFF + advanceOffset(2);
- return BIG_ENDIAN ? GridUnsafe.getShortLittleEndian(buf, off) : GridUnsafe.getShort(buf, off);
+ return IS_BIG_ENDIAN ? GridUnsafe.getShortLittleEndian(buf, off) : GridUnsafe.getShort(buf, off);
}
/** {@inheritDoc} */
@@ -511,7 +511,7 @@ public class IgniteUnsafeDataInput extends InputStream implements IgniteDataInpu
long off = BYTE_ARR_OFF + this.off;
- char v = BIG_ENDIAN ? GridUnsafe.getCharLittleEndian(buf, off) : GridUnsafe.getChar(buf, off);
+ char v = IS_BIG_ENDIAN ? GridUnsafe.getCharLittleEndian(buf, off) : GridUnsafe.getChar(buf, off);
advanceOffset(2);
@@ -525,7 +525,7 @@ public class IgniteUnsafeDataInput extends InputStream implements IgniteDataInpu
long off = BYTE_ARR_OFF + advanceOffset(4);
- return BIG_ENDIAN ? GridUnsafe.getIntLittleEndian(buf, off) : GridUnsafe.getInt(buf, off);
+ return IS_BIG_ENDIAN ? GridUnsafe.getIntLittleEndian(buf, off) : GridUnsafe.getInt(buf, off);
}
/** {@inheritDoc} */
@@ -535,7 +535,7 @@ public class IgniteUnsafeDataInput extends InputStream implements IgniteDataInpu
long off = BYTE_ARR_OFF + advanceOffset(8);
- return BIG_ENDIAN ? GridUnsafe.getLongLittleEndian(buf, off) : GridUnsafe.getLong(buf, off);
+ return IS_BIG_ENDIAN ? GridUnsafe.getLongLittleEndian(buf, off) : GridUnsafe.getLong(buf, off);
}
/** {@inheritDoc} */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/io/IgniteUnsafeDataOutput.java b/modules/core/src/main/java/org/apache/ignite/internal/util/io/IgniteUnsafeDataOutput.java
index 29e27fd353..f0835aa814 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/io/IgniteUnsafeDataOutput.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/io/IgniteUnsafeDataOutput.java
@@ -17,12 +17,12 @@
package org.apache.ignite.internal.util.io;
-import static org.apache.ignite.internal.util.GridUnsafe.BIG_ENDIAN;
import static org.apache.ignite.internal.util.GridUnsafe.BYTE_ARR_OFF;
import static org.apache.ignite.internal.util.GridUnsafe.CHAR_ARR_OFF;
import static org.apache.ignite.internal.util.GridUnsafe.DOUBLE_ARR_OFF;
import static org.apache.ignite.internal.util.GridUnsafe.FLOAT_ARR_OFF;
import static org.apache.ignite.internal.util.GridUnsafe.INT_ARR_OFF;
+import static org.apache.ignite.internal.util.GridUnsafe.IS_BIG_ENDIAN;
import static org.apache.ignite.internal.util.GridUnsafe.LONG_ARR_OFF;
import static org.apache.ignite.internal.util.GridUnsafe.SHORT_ARR_OFF;
@@ -266,7 +266,7 @@ public class IgniteUnsafeDataOutput extends OutputStream implements IgniteDataOu
requestFreeSize(bytesToCp);
- if (BIG_ENDIAN) {
+ if (IS_BIG_ENDIAN) {
long off = BYTE_ARR_OFF + this.off;
for (double val : arr) {
@@ -282,7 +282,7 @@ public class IgniteUnsafeDataOutput extends OutputStream implements IgniteDataOu
}
private void putInt(int val, long off) {
- if (BIG_ENDIAN) {
+ if (IS_BIG_ENDIAN) {
GridUnsafe.putIntLittleEndian(bytes, off, val);
} else {
GridUnsafe.putInt(bytes, off, val);
@@ -307,7 +307,7 @@ public class IgniteUnsafeDataOutput extends OutputStream implements IgniteDataOu
requestFreeSize(bytesToCp);
- if (BIG_ENDIAN) {
+ if (IS_BIG_ENDIAN) {
long off = BYTE_ARR_OFF + this.off;
for (char val : arr) {
@@ -331,7 +331,7 @@ public class IgniteUnsafeDataOutput extends OutputStream implements IgniteDataOu
requestFreeSize(bytesToCp);
- if (BIG_ENDIAN) {
+ if (IS_BIG_ENDIAN) {
long off = BYTE_ARR_OFF + this.off;
for (long val : arr) {
@@ -355,7 +355,7 @@ public class IgniteUnsafeDataOutput extends OutputStream implements IgniteDataOu
requestFreeSize(bytesToCp);
- if (BIG_ENDIAN) {
+ if (IS_BIG_ENDIAN) {
long off = BYTE_ARR_OFF + this.off;
for (float val : arr) {
@@ -397,7 +397,7 @@ public class IgniteUnsafeDataOutput extends OutputStream implements IgniteDataOu
requestFreeSize(bytesToCp);
- if (BIG_ENDIAN) {
+ if (IS_BIG_ENDIAN) {
long off = BYTE_ARR_OFF + this.off;
for (short val : arr) {
@@ -421,7 +421,7 @@ public class IgniteUnsafeDataOutput extends OutputStream implements IgniteDataOu
requestFreeSize(bytesToCp);
- if (BIG_ENDIAN) {
+ if (IS_BIG_ENDIAN) {
long off = BYTE_ARR_OFF + this.off;
for (int val : arr) {
@@ -481,7 +481,7 @@ public class IgniteUnsafeDataOutput extends OutputStream implements IgniteDataOu
}
private void putShort(short val, long off) {
- if (BIG_ENDIAN) {
+ if (IS_BIG_ENDIAN) {
GridUnsafe.putShortLittleEndian(bytes, off, val);
} else {
GridUnsafe.putShort(bytes, off, val);
@@ -497,7 +497,7 @@ public class IgniteUnsafeDataOutput extends OutputStream implements IgniteDataOu
long off = BYTE_ARR_OFF + this.off;
- if (BIG_ENDIAN) {
+ if (IS_BIG_ENDIAN) {
GridUnsafe.putCharLittleEndian(bytes, off, val);
} else {
GridUnsafe.putChar(bytes, off, val);
@@ -525,7 +525,7 @@ public class IgniteUnsafeDataOutput extends OutputStream implements IgniteDataOu
long off = BYTE_ARR_OFF + this.off;
- if (BIG_ENDIAN) {
+ if (IS_BIG_ENDIAN) {
GridUnsafe.putLongLittleEndian(bytes, off, v);
} else {
GridUnsafe.putLong(bytes, off, v);
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/direct/stream/DirectByteBufferStreamImplV1.java b/modules/network/src/main/java/org/apache/ignite/internal/network/direct/stream/DirectByteBufferStreamImplV1.java
index c6b139875c..64f89700f4 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/direct/stream/DirectByteBufferStreamImplV1.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/direct/stream/DirectByteBufferStreamImplV1.java
@@ -26,12 +26,12 @@ import static org.apache.ignite.internal.util.ArrayUtils.FLOAT_ARRAY;
import static org.apache.ignite.internal.util.ArrayUtils.INT_ARRAY;
import static org.apache.ignite.internal.util.ArrayUtils.LONG_ARRAY;
import static org.apache.ignite.internal.util.ArrayUtils.SHORT_ARRAY;
-import static org.apache.ignite.internal.util.GridUnsafe.BIG_ENDIAN;
import static org.apache.ignite.internal.util.GridUnsafe.BYTE_ARR_OFF;
import static org.apache.ignite.internal.util.GridUnsafe.CHAR_ARR_OFF;
import static org.apache.ignite.internal.util.GridUnsafe.DOUBLE_ARR_OFF;
import static org.apache.ignite.internal.util.GridUnsafe.FLOAT_ARR_OFF;
import static org.apache.ignite.internal.util.GridUnsafe.INT_ARR_OFF;
+import static org.apache.ignite.internal.util.GridUnsafe.IS_BIG_ENDIAN;
import static org.apache.ignite.internal.util.GridUnsafe.LONG_ARR_OFF;
import static org.apache.ignite.internal.util.GridUnsafe.SHORT_ARR_OFF;
@@ -204,7 +204,7 @@ public class DirectByteBufferStreamImplV1 implements DirectByteBufferStream {
long off = baseOff + pos;
- if (BIG_ENDIAN) {
+ if (IS_BIG_ENDIAN) {
GridUnsafe.putShortLittleEndian(heapArr, off, val);
} else {
GridUnsafe.putShort(heapArr, off, val);
@@ -280,7 +280,7 @@ public class DirectByteBufferStreamImplV1 implements DirectByteBufferStream {
long off = baseOff + pos;
- if (BIG_ENDIAN) {
+ if (IS_BIG_ENDIAN) {
GridUnsafe.putFloatLittleEndian(heapArr, off, val);
} else {
GridUnsafe.putFloat(heapArr, off, val);
@@ -300,7 +300,7 @@ public class DirectByteBufferStreamImplV1 implements DirectByteBufferStream {
long off = baseOff + pos;
- if (BIG_ENDIAN) {
+ if (IS_BIG_ENDIAN) {
GridUnsafe.putDoubleLittleEndian(heapArr, off, val);
} else {
GridUnsafe.putDouble(heapArr, off, val);
@@ -320,7 +320,7 @@ public class DirectByteBufferStreamImplV1 implements DirectByteBufferStream {
long off = baseOff + pos;
- if (BIG_ENDIAN) {
+ if (IS_BIG_ENDIAN) {
GridUnsafe.putCharLittleEndian(heapArr, off, val);
} else {
GridUnsafe.putChar(heapArr, off, val);
@@ -368,7 +368,7 @@ public class DirectByteBufferStreamImplV1 implements DirectByteBufferStream {
@Override
public void writeShortArray(short[] val) {
if (val != null) {
- if (BIG_ENDIAN) {
+ if (IS_BIG_ENDIAN) {
lastFinished = writeArrayLittleEndian(val, SHORT_ARR_OFF, val.length, 2, 1);
} else {
lastFinished = writeArray(val, SHORT_ARR_OFF, val.length, val.length << 1);
@@ -382,7 +382,7 @@ public class DirectByteBufferStreamImplV1 implements DirectByteBufferStream {
@Override
public void writeIntArray(int[] val) {
if (val != null) {
- if (BIG_ENDIAN) {
+ if (IS_BIG_ENDIAN) {
lastFinished = writeArrayLittleEndian(val, INT_ARR_OFF, val.length, 4, 2);
} else {
lastFinished = writeArray(val, INT_ARR_OFF, val.length, val.length << 2);
@@ -396,7 +396,7 @@ public class DirectByteBufferStreamImplV1 implements DirectByteBufferStream {
@Override
public void writeLongArray(long[] val) {
if (val != null) {
- if (BIG_ENDIAN) {
+ if (IS_BIG_ENDIAN) {
lastFinished = writeArrayLittleEndian(val, LONG_ARR_OFF, val.length, 8, 3);
} else {
lastFinished = writeArray(val, LONG_ARR_OFF, val.length, val.length << 3);
@@ -410,7 +410,7 @@ public class DirectByteBufferStreamImplV1 implements DirectByteBufferStream {
@Override
public void writeLongArray(long[] val, int len) {
if (val != null) {
- if (BIG_ENDIAN) {
+ if (IS_BIG_ENDIAN) {
lastFinished = writeArrayLittleEndian(val, LONG_ARR_OFF, len, 8, 3);
} else {
lastFinished = writeArray(val, LONG_ARR_OFF, len, len << 3);
@@ -424,7 +424,7 @@ public class DirectByteBufferStreamImplV1 implements DirectByteBufferStream {
@Override
public void writeFloatArray(float[] val) {
if (val != null) {
- if (BIG_ENDIAN) {
+ if (IS_BIG_ENDIAN) {
lastFinished = writeArrayLittleEndian(val, FLOAT_ARR_OFF, val.length, 4, 2);
} else {
lastFinished = writeArray(val, FLOAT_ARR_OFF, val.length, val.length << 2);
@@ -438,7 +438,7 @@ public class DirectByteBufferStreamImplV1 implements DirectByteBufferStream {
@Override
public void writeDoubleArray(double[] val) {
if (val != null) {
- if (BIG_ENDIAN) {
+ if (IS_BIG_ENDIAN) {
lastFinished = writeArrayLittleEndian(val, DOUBLE_ARR_OFF, val.length, 8, 3);
} else {
lastFinished = writeArray(val, DOUBLE_ARR_OFF, val.length, val.length << 3);
@@ -452,7 +452,7 @@ public class DirectByteBufferStreamImplV1 implements DirectByteBufferStream {
@Override
public void writeCharArray(char[] val) {
if (val != null) {
- if (BIG_ENDIAN) {
+ if (IS_BIG_ENDIAN) {
lastFinished = writeArrayLittleEndian(val, CHAR_ARR_OFF, val.length, 2, 1);
} else {
lastFinished = writeArray(val, CHAR_ARR_OFF, val.length, val.length << 1);
@@ -807,7 +807,7 @@ public class DirectByteBufferStreamImplV1 implements DirectByteBufferStream {
long off = baseOff + pos;
- return BIG_ENDIAN ? GridUnsafe.getShortLittleEndian(heapArr, off) : GridUnsafe.getShort(heapArr, off);
+ return IS_BIG_ENDIAN ? GridUnsafe.getShortLittleEndian(heapArr, off) : GridUnsafe.getShort(heapArr, off);
} else {
return 0;
}
@@ -903,7 +903,7 @@ public class DirectByteBufferStreamImplV1 implements DirectByteBufferStream {
long off = baseOff + pos;
- return BIG_ENDIAN ? GridUnsafe.getFloatLittleEndian(heapArr, off) : GridUnsafe.getFloat(heapArr, off);
+ return IS_BIG_ENDIAN ? GridUnsafe.getFloatLittleEndian(heapArr, off) : GridUnsafe.getFloat(heapArr, off);
} else {
return 0;
}
@@ -921,7 +921,7 @@ public class DirectByteBufferStreamImplV1 implements DirectByteBufferStream {
long off = baseOff + pos;
- return BIG_ENDIAN ? GridUnsafe.getDoubleLittleEndian(heapArr, off) : GridUnsafe.getDouble(heapArr, off);
+ return IS_BIG_ENDIAN ? GridUnsafe.getDoubleLittleEndian(heapArr, off) : GridUnsafe.getDouble(heapArr, off);
} else {
return 0;
}
@@ -939,7 +939,7 @@ public class DirectByteBufferStreamImplV1 implements DirectByteBufferStream {
long off = baseOff + pos;
- return BIG_ENDIAN ? GridUnsafe.getCharLittleEndian(heapArr, off) : GridUnsafe.getChar(heapArr, off);
+ return IS_BIG_ENDIAN ? GridUnsafe.getCharLittleEndian(heapArr, off) : GridUnsafe.getChar(heapArr, off);
} else {
return 0;
}
@@ -970,7 +970,7 @@ public class DirectByteBufferStreamImplV1 implements DirectByteBufferStream {
/** {@inheritDoc} */
@Override
public short[] readShortArray() {
- if (BIG_ENDIAN) {
+ if (IS_BIG_ENDIAN) {
return readArrayLittleEndian(SHORT_ARRAY, 2, 1, SHORT_ARR_OFF);
} else {
return readArray(SHORT_ARRAY, 1, SHORT_ARR_OFF);
@@ -980,7 +980,7 @@ public class DirectByteBufferStreamImplV1 implements DirectByteBufferStream {
/** {@inheritDoc} */
@Override
public int[] readIntArray() {
- if (BIG_ENDIAN) {
+ if (IS_BIG_ENDIAN) {
return readArrayLittleEndian(INT_ARRAY, 4, 2, INT_ARR_OFF);
} else {
return readArray(INT_ARRAY, 2, INT_ARR_OFF);
@@ -990,7 +990,7 @@ public class DirectByteBufferStreamImplV1 implements DirectByteBufferStream {
/** {@inheritDoc} */
@Override
public long[] readLongArray() {
- if (BIG_ENDIAN) {
+ if (IS_BIG_ENDIAN) {
return readArrayLittleEndian(LONG_ARRAY, 8, 3, LONG_ARR_OFF);
} else {
return readArray(LONG_ARRAY, 3, LONG_ARR_OFF);
@@ -1000,7 +1000,7 @@ public class DirectByteBufferStreamImplV1 implements DirectByteBufferStream {
/** {@inheritDoc} */
@Override
public float[] readFloatArray() {
- if (BIG_ENDIAN) {
+ if (IS_BIG_ENDIAN) {
return readArrayLittleEndian(FLOAT_ARRAY, 4, 2, FLOAT_ARR_OFF);
} else {
return readArray(FLOAT_ARRAY, 2, FLOAT_ARR_OFF);
@@ -1010,7 +1010,7 @@ public class DirectByteBufferStreamImplV1 implements DirectByteBufferStream {
/** {@inheritDoc} */
@Override
public double[] readDoubleArray() {
- if (BIG_ENDIAN) {
+ if (IS_BIG_ENDIAN) {
return readArrayLittleEndian(DOUBLE_ARRAY, 8, 3, DOUBLE_ARR_OFF);
} else {
return readArray(DOUBLE_ARRAY, 3, DOUBLE_ARR_OFF);
@@ -1020,7 +1020,7 @@ public class DirectByteBufferStreamImplV1 implements DirectByteBufferStream {
/** {@inheritDoc} */
@Override
public char[] readCharArray() {
- if (BIG_ENDIAN) {
+ if (IS_BIG_ENDIAN) {
return readArrayLittleEndian(CHAR_ARRAY, 2, 1, CHAR_ARR_OFF);
} else {
return readArray(CHAR_ARRAY, 1, CHAR_ARR_OFF);
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
index b6e7074472..ae57993494 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
@@ -30,10 +30,10 @@ import org.jetbrains.annotations.Nullable;
/**
* Multi-versioned partition storage. Maps RowId to a structures called "Version Chains". Each version chain is logically a stack of
* elements with the following structure:
- * <pre><code>[timestamp | txId, row data]</code></pre>
+ * <pre><code>[timestamp | transaction state (txId + commitTableId + commitPartitionId), row data]</code></pre>
*
- * <p>Only the chain's head can contain a transaction id, every other element must have a timestamp. Presence of transaction id indicates
- * that the row is not yet committed.
+ * <p>Only the chain's head can contain a transaction state, every other element must have a timestamp. Presence of transaction state
+ * indicates that the row is not yet committed.
*
* <p>All timestamps in the chain must go in decreasing order, giving us a N2O (newest to oldest) order of search.
*
@@ -111,7 +111,7 @@ public interface MvPartitionStorage extends AutoCloseable {
* @throws StorageException If failed to read data from the storage.
*/
@Nullable
- BinaryRow read(RowId rowId, UUID txId) throws TxIdMismatchException, StorageException;
+ ReadResult read(RowId rowId, UUID txId) throws TxIdMismatchException, StorageException;
/**
* Reads the value from the storage as it was at the given timestamp.
@@ -120,7 +120,7 @@ public interface MvPartitionStorage extends AutoCloseable {
*/
@Nullable
@Deprecated
- default BinaryRow read(RowId rowId, Timestamp timestamp) throws StorageException {
+ default ReadResult read(RowId rowId, Timestamp timestamp) throws StorageException {
return read(rowId, convertTimestamp(timestamp));
}
@@ -132,7 +132,7 @@ public interface MvPartitionStorage extends AutoCloseable {
* @return Binary row that corresponds to the key or {@code null} if value is not found.
*/
@Nullable
- BinaryRow read(RowId rowId, HybridTimestamp timestamp) throws StorageException;
+ ReadResult read(RowId rowId, HybridTimestamp timestamp) throws StorageException;
/**
* Creates an uncommitted version, assigning a new row id to it.
@@ -142,28 +142,30 @@ public interface MvPartitionStorage extends AutoCloseable {
* @return Row id.
* @throws StorageException If failed to write data into the storage.
*
- * @deprecated Generates different ids for each replica. {@link #addWrite(RowId, BinaryRow, UUID)} with explicit replicated id must be
- * used instead.
+ * @deprecated Generates different ids for each replica. {@link #addWrite(RowId, BinaryRow, UUID, UUID, int)} with explicit replicated
+ * id must be used instead.
*/
@Deprecated
RowId insert(BinaryRow binaryRow, UUID txId) throws StorageException;
/**
- * Creates (or replaces) an uncommitted (aka pending) version, assigned to the given transaction id.
- * In details:
- * - if there is no uncommitted version, a new uncommitted version is added
- * - if there is an uncommitted version belonging to the same transaction, it gets replaced by the given version
- * - if there is an uncommitted version belonging to a different transaction, {@link TxIdMismatchException} is thrown
+ * Creates (or replaces) an uncommitted (aka pending) version, assigned to the given transaction id. In details: - if there is no
+ * uncommitted version, a new uncommitted version is added - if there is an uncommitted version belonging to the same transaction, it
+ * gets replaced by the given version - if there is an uncommitted version belonging to a different transaction,
+ * {@link TxIdMismatchException} is thrown
*
* @param rowId Row id.
* @param row Binary row to update. Key only row means value removal.
* @param txId Transaction id.
+ * @param commitTableId Commit table id.
+ * @param commitPartitionId Commit partition id.
* @return Previous uncommitted row version associated with the row id, or {@code null} if no uncommitted version
* exists before this call
* @throws TxIdMismatchException If there's another pending update associated with different transaction id.
* @throws StorageException If failed to write data to the storage.
*/
- @Nullable BinaryRow addWrite(RowId rowId, @Nullable BinaryRow row, UUID txId) throws TxIdMismatchException, StorageException;
+ @Nullable BinaryRow addWrite(RowId rowId, @Nullable BinaryRow row, UUID txId, UUID commitTableId, int commitPartitionId)
+ throws TxIdMismatchException, StorageException;
/**
* Aborts a pending update of the ongoing uncommitted transaction. Invoked during rollback.
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ReadResult.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ReadResult.java
new file mode 100644
index 0000000000..ee20c11ad9
--- /dev/null
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ReadResult.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage;
+
+import java.util.UUID;
+import org.apache.ignite.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * {@link MvPartitionStorage#read} result.
+ */
+public class ReadResult {
+ /** Unset commit partition id value. */
+ public static final int UNDEFINED_COMMIT_PARTITION_ID = -1;
+
+ /** Data. */
+ private final BinaryRow binaryRow;
+
+ /** Transaction id. Not {@code null} iff this is a write-intent. */
+ private final @Nullable UUID transactionId;
+
+ /** Commit table id. Not {@code null} iff this is a write-intent. */
+ private final @Nullable UUID commitTableId;
+
+    /** Commit partition id. If this is not a write-intent it is equal to {@link #UNDEFINED_COMMIT_PARTITION_ID}. */
+ private final int commitPartitionId;
+
+ /**
+ * Timestamp of the newest commit of the data. Not {@code null} iff committed version exists, this is a
+ * write-intent and read was made with a timestamp.
+ */
+ private final @Nullable HybridTimestamp newestCommitTs;
+
+ private ReadResult(BinaryRow binaryRow, @Nullable UUID transactionId, @Nullable UUID commitTableId,
+ @Nullable HybridTimestamp newestCommitTs, int commitPartitionId) {
+ this.binaryRow = binaryRow;
+
+ // If transaction is not null, then commitTableId and commitPartitionId should be defined.
+ assert (transactionId == null) || (commitTableId != null && commitPartitionId != -1);
+
+ // If transaction id is null, then commitTableId and commitPartitionId should not be defined.
+ assert (transactionId != null) || (commitTableId == null && commitPartitionId == -1);
+
+ this.transactionId = transactionId;
+ this.commitTableId = commitTableId;
+ this.newestCommitTs = newestCommitTs;
+ this.commitPartitionId = commitPartitionId;
+ }
+
+ public static ReadResult createFromWriteIntent(BinaryRow binaryRow, UUID transactionId, UUID commitTableId,
+ @Nullable HybridTimestamp lastCommittedTimestamp, int commitPartitionId) {
+ return new ReadResult(binaryRow, transactionId, commitTableId, lastCommittedTimestamp, commitPartitionId);
+ }
+
+ public static ReadResult createFromCommitted(BinaryRow binaryRow) {
+ return new ReadResult(binaryRow, null, null, null, UNDEFINED_COMMIT_PARTITION_ID);
+ }
+
+ /**
+ * Returns binary row representation of the data.
+ *
+ * @return Binary row representation of the data.
+ */
+ public BinaryRow binaryRow() {
+ return binaryRow;
+ }
+
+ /**
+ * Returns transaction id part of the transaction state if this is a write-intent,
+ * {@code null} otherwise.
+ *
+ * @return Transaction id part of the transaction state if this is a write-intent,
+ * {@code null} otherwise.
+ */
+ public @Nullable UUID transactionId() {
+ return transactionId;
+ }
+
+ /**
+ * Returns commit table id part of the transaction state if this is a write-intent,
+ * {@code null} otherwise.
+ *
+ * @return Commit table id part of the transaction state if this is a write-intent,
+ * {@code null} otherwise.
+ */
+ public @Nullable UUID commitTableId() {
+ return commitTableId;
+ }
+
+ /**
+ * Returns timestamp of the most recent commit of the row.
+ *
+ * @return Timestamp of the most recent commit of the row.
+ */
+ public @Nullable HybridTimestamp newestCommitTs() {
+ return newestCommitTs;
+ }
+
+ /**
+ * Returns commit partition id part of the transaction state if this is a write-intent,
+ * {@link #UNDEFINED_COMMIT_PARTITION_ID} otherwise.
+ *
+ * @return Commit partition id part of the transaction state if this is a write-intent,
+ * {@link #UNDEFINED_COMMIT_PARTITION_ID} otherwise.
+ */
+ public int commitPartitionId() {
+ return commitPartitionId;
+ }
+}
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
index 682f913079..1a4f3ee5e2 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
@@ -27,10 +27,12 @@ import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
@@ -44,12 +46,17 @@ import org.apache.ignite.hlc.HybridTimestamp;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.tx.Timestamp;
import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.internal.util.Pair;
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
/**
* Base test for MV partition storages.
*/
public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest {
+ /** This mask is used to create a 64-bit unsigned integer from a {@code long}. */
+ private static final BigInteger UNSIGNED_LONG_MASK = BigInteger.ONE.shiftLeft(Long.SIZE).subtract(BigInteger.ONE);
+
/** A partition id that should be used to create a partition instance. */
protected static final int PARTITION_ID = 1;
@@ -65,19 +72,32 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest
protected final BinaryRow binaryRow = binaryRow(key, value);
private final TestValue value2 = new TestValue(21, "bar2");
protected final BinaryRow binaryRow2 = binaryRow(key, value2);
+ private final BinaryRow binaryRow3 = binaryRow(key, new TestValue(22, "bar3"));
/**
* Reads a row inside of consistency closure.
*/
protected BinaryRow read(RowId rowId, UUID txId) {
- return storage.runConsistently(() -> storage.read(rowId, txId));
+ ReadResult readResult = storage.runConsistently(() -> storage.read(rowId, txId));
+
+ if (readResult == null) {
+ return null;
+ }
+
+ return readResult.binaryRow();
}
/**
* Reads a row inside of consistency closure.
*/
protected BinaryRow read(RowId rowId, HybridTimestamp timestamp) {
- return storage.runConsistently(() -> storage.read(rowId, timestamp));
+ ReadResult readResult = storage.runConsistently(() -> storage.read(rowId, timestamp));
+
+ if (readResult == null) {
+ return null;
+ }
+
+ return readResult.binaryRow();
}
/**
@@ -105,7 +125,7 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest
* Adds/updates a write-intent inside of consistency closure.
*/
protected BinaryRow addWrite(RowId rowId, BinaryRow binaryRow, UUID txId) {
- return storage.runConsistently(() -> storage.addWrite(rowId, binaryRow, txId));
+ return storage.runConsistently(() -> storage.addWrite(rowId, binaryRow, txId, UUID.randomUUID(), 0));
}
/**
@@ -153,7 +173,7 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest
}
/**
- * Tests basic invariants of {@link MvPartitionStorage#addWrite(RowId, BinaryRow, UUID)}.
+ * Tests basic invariants of {@link MvPartitionStorage#addWrite(RowId, BinaryRow, UUID, UUID, int)}.
*/
@Test
public void testAddWrite() {
@@ -165,14 +185,14 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest
// Write from the same transaction.
addWrite(rowId, binaryRow, txId);
- // Read without timestamp returns uncommited row.
+ // Read without timestamp returns uncommitted row.
assertRowMatches(read(rowId, txId), binaryRow);
// Read with wrong transaction id should throw exception.
assertThrows(TxIdMismatchException.class, () -> read(rowId, newTransactionId()));
// Read with timestamp returns null.
- assertNull(read(rowId, clock.now()));
+ assertRowMatches(read(rowId, clock.now()), binaryRow);
}
/**
@@ -222,11 +242,12 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest
assertRowMatches(read(rowId, newTxId), newRow);
assertRowMatches(read(rowId, tsExact), binaryRow);
- assertRowMatches(read(rowId, tsAfter), binaryRow);
- assertRowMatches(read(rowId, clock.now()), binaryRow);
+ assertRowMatches(read(rowId, tsAfter), newRow);
+ assertRowMatches(read(rowId, clock.now()), newRow);
// Only latest time behavior changes after commit.
- commitWrite(rowId, clock.now());
+ HybridTimestamp newRowCommitTs = clock.now();
+ commitWrite(rowId, newRowCommitTs);
assertRowMatches(read(rowId, newTxId), newRow);
@@ -246,8 +267,9 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest
assertRowMatches(read(rowId, tsExact), binaryRow);
assertRowMatches(read(rowId, tsAfter), binaryRow);
+ assertRowMatches(read(rowId, newRowCommitTs), newRow);
- assertRowMatches(read(rowId, clock.now()), newRow);
+ assertNull(read(rowId, clock.now()));
// Commit remove.
HybridTimestamp removeTs = clock.now();
@@ -490,7 +512,19 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest
}
@Test
- void readByTimestampIgnoresUncommittedVersion() {
+ void readByTimestampAfterWriteFindsUncommittedVersion() {
+ RowId rowId = new RowId(PARTITION_ID);
+
+ addWrite(rowId, binaryRow, newTransactionId());
+
+ HybridTimestamp latestTs = clock.now();
+ BinaryRow foundRow = read(rowId, latestTs);
+
+ assertRowMatches(foundRow, binaryRow);
+ }
+
+ @Test
+ void readByTimestampAfterCommitAndWriteFindsUncommittedVersion() {
RowId rowId = insert(binaryRow, newTransactionId());
commitWrite(rowId, clock.now());
@@ -499,7 +533,7 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest
HybridTimestamp latestTs = clock.now();
BinaryRow foundRow = read(rowId, latestTs);
- assertRowMatches(foundRow, binaryRow);
+ assertRowMatches(foundRow, binaryRow2);
}
@Test
@@ -678,9 +712,52 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest
void readByTimestampWorksCorrectlyAfterCommitAndAbortFollowedByUncommittedWrite() {
RowId rowId = commitAbortAndAddUncommitted();
- BinaryRow foundRow = storage.read(rowId, clock.now());
+ BinaryRow foundRow = storage.read(rowId, clock.now()).binaryRow();
- assertRowMatches(foundRow, binaryRow);
+ // We see the uncommitted row.
+ assertRowMatches(foundRow, binaryRow3);
+ }
+
+ @Test
+ void readByTimestampBeforeAndAfterUncommittedWrite() {
+ RowId rowId = new RowId(PARTITION_ID);
+
+ HybridTimestamp commitTs = clock.now();
+
+ storage.runConsistently(() -> {
+ storage.addWrite(rowId, binaryRow, UUID.randomUUID(), UUID.randomUUID(), 5);
+
+ storage.commitWrite(rowId, commitTs);
+ return null;
+ });
+
+ UUID txId2 = UUID.randomUUID();
+ UUID commitTableId2 = UUID.randomUUID();
+ int commitPartitionId2 = 1338;
+
+ storage.runConsistently(() -> {
+ storage.addWrite(rowId, binaryRow2, txId2, commitTableId2, commitPartitionId2);
+
+ return null;
+ });
+
+ ReadResult res = storage.read(rowId, commitTs);
+
+ assertNotNull(res);
+
+ assertNull(res.transactionId());
+ assertNull(res.commitTableId());
+ assertEquals(-1, res.commitPartitionId());
+ assertRowMatches(res.binaryRow(), binaryRow);
+
+ res = storage.read(rowId, clock.now());
+
+ assertNotNull(res);
+
+ assertEquals(txId2, res.transactionId());
+ assertEquals(commitTableId2, res.commitTableId());
+ assertEquals(commitPartitionId2, res.commitPartitionId());
+ assertRowMatches(res.binaryRow(), binaryRow2);
}
private RowId commitAbortAndAddUncommitted() {
@@ -689,25 +766,24 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest
storage.commitWrite(rowId, clock.now());
- storage.addWrite(rowId, binaryRow2, newTransactionId());
+ storage.addWrite(rowId, binaryRow2, newTransactionId(), UUID.randomUUID(), 0);
storage.abortWrite(rowId);
- BinaryRow binaryRow3 = binaryRow(key, new TestValue(22, "bar3"));
-
- storage.addWrite(rowId, binaryRow3, newTransactionId());
+ storage.addWrite(rowId, binaryRow3, newTransactionId(), UUID.randomUUID(), 0);
return rowId;
});
}
@Test
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-17720")
void scanByTimestampWorksCorrectlyAfterCommitAndAbortFollowedByUncommittedWrite() throws Exception {
commitAbortAndAddUncommitted();
try (Cursor<BinaryRow> cursor = storage.scan(k -> true, clock.now())) {
BinaryRow foundRow = cursor.next();
- assertRowMatches(foundRow, binaryRow);
+ assertRowMatches(foundRow, binaryRow3);
assertFalse(cursor.hasNext());
}
@@ -719,7 +795,7 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest
BinaryRow foundRow = read(rowId, clock.now());
- assertThat(foundRow, is(nullValue()));
+ assertRowMatches(foundRow, binaryRow);
}
/**
@@ -746,4 +822,184 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest
assertEquals(1, storage.persistedIndex());
}
+
+ @Test
+ void testReadWithinBeforeAndAfterTwoCommits() {
+ HybridTimestamp before = clock.now();
+
+ RowId rowId = new RowId(PARTITION_ID);
+
+ UUID txId = UUID.randomUUID();
+ UUID commitTableId = UUID.randomUUID();
+ int commitPartitionId = 1337;
+
+ HybridTimestamp first = clock.now();
+
+ storage.runConsistently(() -> {
+ storage.addWrite(rowId, binaryRow, txId, commitTableId, commitPartitionId);
+
+ storage.commitWrite(rowId, first);
+ return null;
+ });
+
+ HybridTimestamp betweenCommits = clock.now();
+
+ HybridTimestamp second = clock.now();
+
+ storage.runConsistently(() -> {
+ storage.addWrite(rowId, binaryRow2, UUID.randomUUID(), UUID.randomUUID(), commitPartitionId + 1);
+
+ storage.commitWrite(rowId, second);
+ return null;
+ });
+
+ storage.runConsistently(() -> {
+ storage.addWrite(rowId, binaryRow3, UUID.randomUUID(), UUID.randomUUID(), commitPartitionId + 2);
+
+ return null;
+ });
+
+ HybridTimestamp after = clock.now();
+
+ // Read before commits.
+ ReadResult res = storage.read(rowId, before);
+ assertNull(res);
+
+ // Read at exact time of first commit.
+ res = storage.read(rowId, first);
+
+ assertNotNull(res);
+ assertNull(res.newestCommitTs());
+ assertRowMatches(res.binaryRow(), binaryRow);
+
+ // Read between two commits.
+ res = storage.read(rowId, betweenCommits);
+
+ assertNotNull(res);
+ assertNull(res.newestCommitTs());
+ assertRowMatches(res.binaryRow(), binaryRow);
+
+ // Read at exact time of second commit.
+ res = storage.read(rowId, second);
+
+ assertNotNull(res);
+ assertNull(res.newestCommitTs());
+ assertRowMatches(res.binaryRow(), binaryRow2);
+
+ // Read after second commit (write intent).
+ res = storage.read(rowId, after);
+
+ assertNotNull(res);
+ assertNotNull(res.newestCommitTs());
+ assertEquals(second, res.newestCommitTs());
+ assertRowMatches(res.binaryRow(), binaryRow3);
+ }
+
+ @Test
+ void testWrongPartition() {
+ RowId rowId = commitAbortAndAddUncommitted();
+
+ var row = new RowId(rowId.partitionId() + 1, rowId.mostSignificantBits(), rowId.leastSignificantBits());
+
+ assertNull(read(row, clock.now()));
+ }
+
+ @Test
+ void testReadingNothingWithLowerRowIdIfHigherRowIdWritesExist() {
+ RowId rowId = commitAbortAndAddUncommitted();
+
+ RowId lowerRowId = getPreviousRowId(rowId);
+
+ assertNull(read(lowerRowId, clock.now()));
+ }
+
+ @Test
+ void testReadingNothingByTxIdWithLowerRowId() {
+ RowId higherRowId = new RowId(PARTITION_ID);
+ RowId lowerRowId = getPreviousRowId(higherRowId);
+
+ UUID txId = UUID.randomUUID();
+
+ storage.runConsistently(() -> {
+ UUID tableId = UUID.randomUUID();
+
+ storage.addWrite(higherRowId, binaryRow, txId, tableId, PARTITION_ID);
+
+ return null;
+ });
+
+ assertNull(read(lowerRowId, txId));
+ }
+
+ @Test
+ void testReadingCorrectWriteIntentByTimestampIfLowerRowIdWriteIntentExists() {
+ RowId higherRowId = new RowId(PARTITION_ID);
+ RowId lowerRowId = getPreviousRowId(higherRowId);
+
+ UUID txId = UUID.randomUUID();
+
+ storage.runConsistently(() -> {
+ UUID tableId = UUID.randomUUID();
+
+ storage.addWrite(lowerRowId, binaryRow2, UUID.randomUUID(), UUID.randomUUID(), PARTITION_ID);
+ storage.addWrite(higherRowId, binaryRow, txId, tableId, PARTITION_ID);
+
+ storage.commitWrite(higherRowId, clock.now());
+
+ return null;
+ });
+
+ assertRowMatches(read(higherRowId, clock.now()), binaryRow);
+ }
+
+ @Test
+ void testReadingCorrectWriteIntentByTimestampIfHigherRowIdWriteIntentExists() {
+ RowId higherRowId = new RowId(PARTITION_ID);
+ RowId lowerRowId = getPreviousRowId(higherRowId);
+
+ storage.runConsistently(() -> {
+ UUID txId = UUID.randomUUID();
+ UUID tableId = UUID.randomUUID();
+
+ storage.addWrite(lowerRowId, binaryRow, txId, tableId, PARTITION_ID);
+ storage.addWrite(higherRowId, binaryRow2, txId, tableId, PARTITION_ID);
+
+ return null;
+ });
+
+ assertRowMatches(read(lowerRowId, clock.now()), binaryRow);
+ }
+
+ /**
+ * Returns row id that is lexicographically smaller (by the value of one) than the argument.
+ *
+ * @param value Row id.
+ * @return Row id value minus 1.
+ */
+ private RowId getPreviousRowId(RowId value) {
+ Pair<Long, Long> previous128Uint = getPrevious128Uint(value.mostSignificantBits(), value.leastSignificantBits());
+
+ return new RowId(value.partitionId(), previous128Uint.getFirst(), previous128Uint.getSecond());
+ }
+
+ /**
+ * Performs a decrement operation on a 128-bit unsigned value that is represented by two longs.
+ *
+ * @param msb Most significant bytes of 128-bit unsigned integer.
+ * @param lsb Least significant bytes of 128-bit unsigned integer.
+ * @return Less by one value.
+ */
+ private Pair<Long, Long> getPrevious128Uint(long msb, long lsb) {
+ BigInteger unsignedMsb = BigInteger.valueOf(msb).and(UNSIGNED_LONG_MASK);
+ BigInteger unsignedLsb = BigInteger.valueOf(lsb).and(UNSIGNED_LONG_MASK);
+
+ BigInteger biRepresentation = unsignedMsb.shiftLeft(Long.SIZE).add(unsignedLsb);
+
+ BigInteger lesserBi = biRepresentation.subtract(BigInteger.ONE);
+
+ long newMsb = lesserBi.shiftRight(Long.SIZE).and(UNSIGNED_LONG_MASK).longValue();
+ long newLsb = lesserBi.and(UNSIGNED_LONG_MASK).longValue();
+
+ return new Pair<>(newMsb, newLsb);
+ }
}
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
index 5efaee4e95..74114db134 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
@@ -120,7 +120,7 @@ public abstract class AbstractMvTableStorageTest extends BaseMvStoragesTest {
RowId rowId0 = partitionStorage0.runConsistently(() -> partitionStorage0.insert(testData0, txId));
- assertThat(unwrap(partitionStorage0.read(rowId0, txId)), is(equalTo(unwrap(testData0))));
+ assertThat(unwrap(partitionStorage0.read(rowId0, txId).binaryRow()), is(equalTo(unwrap(testData0))));
assertThat(partitionStorage1.read(rowId0, txId), is(nullValue()));
var testData1 = binaryRow(new TestKey(2, "2"), new TestValue(20, "20"));
@@ -128,7 +128,7 @@ public abstract class AbstractMvTableStorageTest extends BaseMvStoragesTest {
RowId rowId1 = partitionStorage1.runConsistently(() -> partitionStorage1.insert(testData1, txId));
assertThat(partitionStorage0.read(rowId1, txId), is(nullValue()));
- assertThat(unwrap(partitionStorage1.read(rowId1, txId)), is(equalTo(unwrap(testData1))));
+ assertThat(unwrap(partitionStorage1.read(rowId1, txId).binaryRow()), is(equalTo(unwrap(testData1))));
assertThat(toList(partitionStorage0.scan(row -> true, txId)), contains(unwrap(testData0)));
assertThat(toList(partitionStorage1.scan(row -> true, txId)), contains(unwrap(testData1)));
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/chm/TestConcurrentHashMapMvPartitionStorage.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/chm/TestConcurrentHashMapMvPartitionStorage.java
index 7ebd2c91c0..f7d99993f1 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/chm/TestConcurrentHashMapMvPartitionStorage.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/chm/TestConcurrentHashMapMvPartitionStorage.java
@@ -29,10 +29,12 @@ import java.util.function.Predicate;
import org.apache.ignite.hlc.HybridTimestamp;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.ReadResult;
import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.storage.StorageException;
import org.apache.ignite.internal.storage.TxIdMismatchException;
import org.apache.ignite.internal.util.Cursor;
+import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
/**
@@ -51,27 +53,34 @@ public class TestConcurrentHashMapMvPartitionStorage implements MvPartitionStora
private static class VersionChain {
final @Nullable BinaryRow row;
- final @Nullable HybridTimestamp begin;
+ final @Nullable HybridTimestamp ts;
final @Nullable UUID txId;
+ final @Nullable UUID commitTableId;
+ final int commitPartitionId;
final @Nullable VersionChain next;
- VersionChain(@Nullable BinaryRow row, @Nullable HybridTimestamp begin, @Nullable UUID txId, @Nullable VersionChain next) {
+ VersionChain(@Nullable BinaryRow row, @Nullable HybridTimestamp ts, @Nullable UUID txId, @Nullable UUID commitTableId,
+ int commitPartitionId, @Nullable VersionChain next) {
this.row = row;
- this.begin = begin;
+ this.ts = ts;
this.txId = txId;
+ this.commitTableId = commitTableId;
+ this.commitPartitionId = commitPartitionId;
this.next = next;
}
- static VersionChain createUncommitted(@Nullable BinaryRow row, @Nullable UUID txId, @Nullable VersionChain next) {
- return new VersionChain(row, null, txId, next);
+ static VersionChain forWriteIntent(@Nullable BinaryRow row, @Nullable UUID txId, @Nullable UUID commitTableId,
+ int commitPartitionId, @Nullable VersionChain next) {
+ return new VersionChain(row, null, txId, commitTableId, commitPartitionId, next);
}
- static VersionChain createCommitted(@Nullable HybridTimestamp timestamp, VersionChain uncommittedVersionChain) {
- return new VersionChain(uncommittedVersionChain.row, timestamp, null, uncommittedVersionChain.next);
+ static VersionChain forCommitted(@Nullable HybridTimestamp timestamp, VersionChain uncommittedVersionChain) {
+ return new VersionChain(uncommittedVersionChain.row, timestamp, null, null,
+ ReadResult.UNDEFINED_COMMIT_PARTITION_ID, uncommittedVersionChain.next);
}
- boolean notContainsWriteIntent() {
- return begin != null && txId == null;
+ boolean notWriteIntent() {
+ return ts != null && txId == null;
}
}
@@ -112,28 +121,29 @@ public class TestConcurrentHashMapMvPartitionStorage implements MvPartitionStora
public RowId insert(BinaryRow row, UUID txId) throws StorageException {
RowId rowId = new RowId(partitionId);
- addWrite(rowId, row, txId);
+ addWrite(rowId, row, txId, UUID.randomUUID(), 0);
return rowId;
}
/** {@inheritDoc} */
@Override
- public @Nullable BinaryRow addWrite(RowId rowId, @Nullable BinaryRow row, UUID txId) throws TxIdMismatchException {
+ public @Nullable BinaryRow addWrite(RowId rowId, @Nullable BinaryRow row, UUID txId, UUID commitTableId, int commitPartitionId)
+ throws TxIdMismatchException {
BinaryRow[] res = {null};
map.compute(rowId, (ignored, versionChain) -> {
- if (versionChain != null && versionChain.begin == null) {
+ if (versionChain != null && versionChain.ts == null) {
if (!txId.equals(versionChain.txId)) {
throw new TxIdMismatchException(txId, versionChain.txId);
}
res[0] = versionChain.row;
- return VersionChain.createUncommitted(row, txId, versionChain.next);
+ return VersionChain.forWriteIntent(row, txId, commitTableId, commitPartitionId, versionChain.next);
}
- return VersionChain.createUncommitted(row, txId, versionChain);
+ return VersionChain.forWriteIntent(row, txId, commitTableId, commitPartitionId, versionChain);
});
return res[0];
@@ -145,11 +155,11 @@ public class TestConcurrentHashMapMvPartitionStorage implements MvPartitionStora
BinaryRow[] res = {null};
map.computeIfPresent(rowId, (ignored, versionChain) -> {
- if (versionChain.notContainsWriteIntent()) {
+ if (versionChain.notWriteIntent()) {
return versionChain;
}
- assert versionChain.begin == null;
+ assert versionChain.ts == null;
res[0] = versionChain.row;
@@ -165,17 +175,17 @@ public class TestConcurrentHashMapMvPartitionStorage implements MvPartitionStora
map.compute(rowId, (ignored, versionChain) -> {
assert versionChain != null;
- if (versionChain.notContainsWriteIntent()) {
+ if (versionChain.notWriteIntent()) {
return versionChain;
}
- return VersionChain.createCommitted(timestamp, versionChain);
+ return VersionChain.forCommitted(timestamp, versionChain);
});
}
/** {@inheritDoc} */
@Override
- public @Nullable BinaryRow read(RowId rowId, UUID txId) throws TxIdMismatchException, StorageException {
+ public @Nullable ReadResult read(RowId rowId, UUID txId) throws TxIdMismatchException, StorageException {
VersionChain versionChain = map.get(rowId);
return read(versionChain, null, txId, null);
@@ -183,14 +193,14 @@ public class TestConcurrentHashMapMvPartitionStorage implements MvPartitionStora
/** {@inheritDoc} */
@Override
- public @Nullable BinaryRow read(RowId rowId, @Nullable HybridTimestamp timestamp) {
+ public @Nullable ReadResult read(RowId rowId, @Nullable HybridTimestamp timestamp) {
VersionChain versionChain = map.get(rowId);
return read(versionChain, timestamp, null, null);
}
@Nullable
- private static BinaryRow read(
+ private static ReadResult read(
VersionChain versionChain,
@Nullable HybridTimestamp timestamp,
@Nullable UUID txId,
@@ -213,27 +223,85 @@ public class TestConcurrentHashMapMvPartitionStorage implements MvPartitionStora
throw new TxIdMismatchException(txId, versionChain.txId);
}
- return binaryRow;
+ boolean isWriteIntent = versionChain.ts == null;
+
+ if (isWriteIntent) {
+ return ReadResult.createFromWriteIntent(
+ binaryRow,
+ versionChain.txId,
+ versionChain.commitTableId,
+ versionChain.next != null ? versionChain.next.ts : null,
+ versionChain.commitPartitionId
+ );
+ }
+
+ return ReadResult.createFromCommitted(binaryRow);
}
VersionChain cur = versionChain;
- if (cur.begin == null) {
+ boolean hasWriteIntent = false;
+
+ if (cur.ts == null) {
+ hasWriteIntent = true;
+
+ if (cur.next == null) {
+ // We only have a write-intent.
+ BinaryRow binaryRow = cur.row;
+
+ if (filter != null && !filter.test(binaryRow)) {
+ return null;
+ }
+
+ return ReadResult.createFromWriteIntent(binaryRow, cur.txId, cur.commitTableId, null,
+ cur.commitPartitionId);
+ }
+
cur = cur.next;
}
+ return walkVersionChain(versionChain, timestamp, filter, cur, hasWriteIntent);
+ }
+
+ @Nullable
+ private static ReadResult walkVersionChain(VersionChain chainHead, @NotNull HybridTimestamp timestamp,
+ @Nullable Predicate<BinaryRow> filter, VersionChain firstCommit, boolean hasWriteIntent) {
+ boolean isLatestCommit = true;
+
+ VersionChain cur = firstCommit;
+
while (cur != null) {
- if (timestamp.compareTo(cur.begin) >= 0) {
+ int compareResult = timestamp.compareTo(cur.ts);
+
+ if (compareResult >= 0) {
+ if (isLatestCommit && hasWriteIntent && (compareResult != 0)) {
+ // It's the latest commit in chain, query ts is greater than commit ts and there is a write-intent.
+ // So we just return write-intent.
+ BinaryRow binaryRow = chainHead.row;
+
+ if (filter != null && !filter.test(binaryRow)) {
+ return null;
+ }
+
+ HybridTimestamp latestCommitTs = chainHead.next != null ? chainHead.next.ts : null;
+
+ return ReadResult.createFromWriteIntent(binaryRow, chainHead.txId, chainHead.commitTableId, latestCommitTs,
+ chainHead.commitPartitionId);
+ }
+
+ // This commit has exactly the query ts, meaning that commit is the one we are looking for.
BinaryRow binaryRow = cur.row;
if (filter != null && !filter.test(binaryRow)) {
return null;
}
- return binaryRow;
+ return ReadResult.createFromCommitted(binaryRow);
}
cur = cur.next;
+
+ isLatestCommit = false;
}
return null;
@@ -245,6 +313,7 @@ public class TestConcurrentHashMapMvPartitionStorage implements MvPartitionStora
Iterator<BinaryRow> iterator = map.values().stream()
.map(versionChain -> read(versionChain, null, txId, filter))
.filter(Objects::nonNull)
+ .map(ReadResult::binaryRow)
.iterator();
return Cursor.fromIterator(iterator);
@@ -256,6 +325,7 @@ public class TestConcurrentHashMapMvPartitionStorage implements MvPartitionStora
Iterator<BinaryRow> iterator = map.values().stream()
.map(versionChain -> read(versionChain, timestamp, null, filter))
.filter(Objects::nonNull)
+ .map(ReadResult::binaryRow)
.iterator();
return Cursor.fromIterator(iterator);
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
index 9deb69241f..ed17aea627 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
@@ -42,6 +42,7 @@ import org.apache.ignite.internal.pagememory.util.PageLockListenerNoOp;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.ByteBufferRow;
import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.ReadResult;
import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.storage.StorageException;
import org.apache.ignite.internal.storage.TxIdMismatchException;
@@ -201,7 +202,11 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
/** {@inheritDoc} */
@Override
- public @Nullable BinaryRow read(RowId rowId, UUID txId) throws TxIdMismatchException, StorageException {
+ public @Nullable ReadResult read(RowId rowId, UUID txId) throws TxIdMismatchException, StorageException {
+ if (rowId.partitionId() != partitionId) {
+ return null;
+ }
+
VersionChain versionChain = findVersionChain(rowId);
if (versionChain == null) {
@@ -213,7 +218,11 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
/** {@inheritDoc} */
@Override
- public @Nullable BinaryRow read(RowId rowId, HybridTimestamp timestamp) throws StorageException {
+ public @Nullable ReadResult read(RowId rowId, HybridTimestamp timestamp) throws StorageException {
+ if (rowId.partitionId() != partitionId) {
+ return null;
+ }
+
VersionChain versionChain = findVersionChain(rowId);
if (versionChain == null) {
@@ -231,7 +240,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
}
}
- private @Nullable ByteBufferRow findLatestRowVersion(VersionChain versionChain, UUID txId, Predicate<BinaryRow> keyFilter) {
+ private @Nullable ReadResult findLatestRowVersion(VersionChain versionChain, UUID txId, Predicate<BinaryRow> keyFilter) {
RowVersion rowVersion = readRowVersion(versionChain.headLink(), ALWAYS_LOAD_VALUE);
ByteBufferRow row = rowVersionToBinaryRow(rowVersion);
@@ -240,9 +249,23 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
return null;
}
- throwIfChainBelongsToAnotherTx(versionChain, txId);
+ if (versionChain.isUncommitted()) {
+ UUID chainTxId = versionChain.transactionId();
+
+ assert chainTxId != null;
+
+ throwIfChainBelongsToAnotherTx(versionChain, txId);
+
+ return ReadResult.createFromWriteIntent(
+ row,
+ versionChain.transactionId(),
+ versionChain.commitTableId(),
+ null,
+ versionChain.commitPartitionId()
+ );
+ }
- return row;
+ return ReadResult.createFromCommitted(row);
}
private RowVersion readRowVersion(long nextLink, Predicate<HybridTimestamp> loadValue) {
@@ -258,7 +281,9 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
}
private void throwIfChainBelongsToAnotherTx(VersionChain versionChain, UUID txId) {
- if (versionChain.transactionId() != null && !txId.equals(versionChain.transactionId())) {
+ assert versionChain.isUncommitted();
+
+ if (!txId.equals(versionChain.transactionId())) {
throw new TxIdMismatchException(txId, versionChain.transactionId());
}
}
@@ -271,7 +296,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
return new ByteBufferRow(rowVersion.value());
}
- private @Nullable ByteBufferRow findRowVersionInChain(
+ private @Nullable BinaryRow findRowVersionInChain(
VersionChain versionChain,
@Nullable UUID transactionId,
@Nullable HybridTimestamp timestamp,
@@ -280,38 +305,65 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
assert transactionId != null ^ timestamp != null;
if (transactionId != null) {
- return findLatestRowVersion(versionChain, transactionId, keyFilter);
+ ReadResult res = findLatestRowVersion(versionChain, transactionId, keyFilter);
+
+ if (res == null) {
+ return null;
+ }
+
+ return res.binaryRow();
} else {
- ByteBufferRow row = findRowVersionByTimestamp(versionChain, timestamp);
+ ReadResult res = findRowVersionByTimestamp(versionChain, timestamp);
+
+ if (res == null) {
+ return null;
+ }
+
+ BinaryRow row = res.binaryRow();
return keyFilter.test(row) ? row : null;
}
}
- private @Nullable ByteBufferRow findRowVersionByTimestamp(VersionChain versionChain, HybridTimestamp timestamp) {
- if (!versionChain.hasCommittedVersions()) {
- return null;
- }
-
- long newestCommittedLink = versionChain.newestCommittedLink();
+ private @Nullable ReadResult findRowVersionByTimestamp(VersionChain versionChain, HybridTimestamp timestamp) {
+ long headLink = versionChain.headLink();
ScanVersionChainByTimestamp scanByTimestamp = new ScanVersionChainByTimestamp(partitionId);
try {
- rowVersionDataPageReader.traverse(newestCommittedLink, scanByTimestamp, timestamp);
+ rowVersionDataPageReader.traverse(headLink, scanByTimestamp, timestamp);
} catch (IgniteInternalCheckedException e) {
throw new StorageException("Cannot search for a row version", e);
}
- return scanByTimestamp.result();
+ ByteBufferRow resultRow = scanByTimestamp.result();
+
+ if (resultRow == null) {
+ return null;
+ }
+
+ if (scanByTimestamp.isResultWriteIntent()) {
+ // If we returned write-intent, then version chain must be uncommitted.
+ assert versionChain.isUncommitted();
+
+ UUID transactionId = versionChain.transactionId();
+ UUID commitTableId = versionChain.commitTableId();
+ int commitPartitionId = versionChain.commitPartitionId();
+
+ return ReadResult.createFromWriteIntent(resultRow, transactionId, commitTableId, scanByTimestamp.lastCommittedTimestamp(),
+ commitPartitionId);
+ }
+
+ return ReadResult.createFromCommitted(resultRow);
}
/** {@inheritDoc} */
+ @Deprecated
@Override
public RowId insert(BinaryRow row, UUID txId) throws StorageException {
RowId rowId = new RowId(partitionId);
- addWrite(rowId, row, txId);
+ addWrite(rowId, row, txId, UUID.randomUUID(), 0);
return rowId;
}
@@ -337,21 +389,26 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
/** {@inheritDoc} */
@Override
- public @Nullable BinaryRow addWrite(RowId rowId, @Nullable BinaryRow row, UUID txId) throws TxIdMismatchException, StorageException {
+ public @Nullable BinaryRow addWrite(RowId rowId, @Nullable BinaryRow row, UUID txId, UUID commitTableId, int commitPartitionId)
+ throws TxIdMismatchException, StorageException {
+ assert rowId.partitionId() == partitionId : rowId;
VersionChain currentChain = findVersionChain(rowId);
if (currentChain == null) {
RowVersion newVersion = insertRowVersion(row, NULL_LINK);
- VersionChain versionChain = new VersionChain(rowId, txId, newVersion.link(), NULL_LINK);
+ VersionChain versionChain = VersionChain.createUncommitted(rowId, txId, commitTableId, commitPartitionId, newVersion.link(),
+ NULL_LINK);
updateVersionChain(versionChain);
return null;
}
- throwIfChainBelongsToAnotherTx(currentChain, txId);
+ if (currentChain.isUncommitted()) {
+ throwIfChainBelongsToAnotherTx(currentChain, txId);
+ }
RowVersion newVersion = insertRowVersion(row, currentChain.newestCommittedLink());
@@ -366,7 +423,8 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
removeRowVersion(currentVersion);
}
- VersionChain chainReplacement = new VersionChain(rowId, txId, newVersion.link(), newVersion.nextLink());
+ VersionChain chainReplacement = VersionChain.createUncommitted(rowId, txId, commitTableId, commitPartitionId, newVersion.link(),
+ newVersion.nextLink());
updateVersionChain(chainReplacement);
@@ -376,6 +434,8 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
/** {@inheritDoc} */
@Override
public @Nullable BinaryRow abortWrite(RowId rowId) throws StorageException {
+ assert rowId.partitionId() == partitionId : rowId;
+
VersionChain currentVersionChain = findVersionChain(rowId);
if (currentVersionChain == null || currentVersionChain.transactionId() == null) {
@@ -393,7 +453,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
// Next can be safely replaced with any value (like 0), because this field is only used when there
// is some uncommitted value, but when we add an uncommitted value, we 'fix' such placeholder value
// (like 0) by replacing it with a valid value.
- VersionChain versionChainReplacement = new VersionChain(rowId, null, latestVersion.nextLink(), NULL_LINK);
+ VersionChain versionChainReplacement = VersionChain.createCommitted(rowId, latestVersion.nextLink(), NULL_LINK);
updateVersionChain(versionChainReplacement);
} else {
@@ -415,6 +475,8 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
/** {@inheritDoc} */
@Override
public void commitWrite(RowId rowId, HybridTimestamp timestamp) throws StorageException {
+ assert rowId.partitionId() == partitionId : rowId;
+
VersionChain currentVersionChain = findVersionChain(rowId);
if (currentVersionChain == null || currentVersionChain.transactionId() == null) {
@@ -431,9 +493,8 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
}
try {
- VersionChain updatedVersionChain = new VersionChain(
+ VersionChain updatedVersionChain = VersionChain.createCommitted(
currentVersionChain.rowId(),
- null,
currentVersionChain.headLink(),
currentVersionChain.nextLink()
);
@@ -564,7 +625,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
}
VersionChain chain = getCurrentChainFromTreeCursor();
- ByteBufferRow row = findRowVersionInChain(chain, transactionId, timestamp, keyFilter);
+ BinaryRow row = findRowVersionInChain(chain, transactionId, timestamp, keyFilter);
if (row != null) {
nextRow = row;
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/RowVersion.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/RowVersion.java
index 60425a0402..4c9fed0db5 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/RowVersion.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/RowVersion.java
@@ -48,7 +48,7 @@ public final class RowVersion implements Storable {
private long link;
- private final @Nullable HybridTimestamp timestamp;
+ private final @Nullable HybridTimestamp timestamp; // A null timestamp is treated as +INF (an uncommitted write intent).
private final long nextLink;
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ScanVersionChainByTimestamp.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ScanVersionChainByTimestamp.java
index 7d5fd1616e..c23404c56a 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ScanVersionChainByTimestamp.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ScanVersionChainByTimestamp.java
@@ -36,16 +36,26 @@ import org.jetbrains.annotations.Nullable;
class ScanVersionChainByTimestamp implements PageMemoryTraversal<HybridTimestamp> {
private final int partitionId;
+ private final ReadRowVersionValue readRowVersionValue = new ReadRowVersionValue();
+
/** Contains the result when the traversal ends. */
private @Nullable ByteBufferRow result;
/**
* First it's {@code true} (this means that we traverse first slots of versions of the Version Chain using NextLink);
- * then it's {@code false} (when we found the version we need and we read its value).
+ * then it's {@code false} (when we found the version we need, and we read its value).
*/
private boolean lookingForVersion = true;
- private final ReadRowVersionValue readRowVersionValue = new ReadRowVersionValue();
+ private boolean hasCommittedRow;
+
+ private boolean hasWriteIntent = false;
+
+ private long writeIntentLink;
+
+ private boolean returnWriteIntent = false;
+
+ private HybridTimestamp lastCommittedTimestamp;
ScanVersionChainByTimestamp(int partitionId) {
this.partitionId = partitionId;
@@ -57,13 +67,57 @@ class ScanVersionChainByTimestamp implements PageMemoryTraversal<HybridTimestamp
if (lookingForVersion) {
HybridTimestamp rowVersionTs = HybridTimestamps.readTimestamp(pageAddr, payload.offset() + RowVersion.TIMESTAMP_OFFSET);
- if (rowTimestampMatches(rowVersionTs, timestamp)) {
- return readFullyOrStartReadingFragmented(link, pageAddr, payload);
- } else {
- return advanceToNextVersion(pageAddr, payload);
+ boolean isWriteIntent = rowVersionTs == null;
+
+ if (!hasWriteIntent && isWriteIntent) {
+ hasWriteIntent = true;
+ writeIntentLink = link;
}
+
+ if (rowVersionTs != null) {
+ boolean isFirstCommittedRow = false;
+
+ if (!hasCommittedRow) {
+ hasCommittedRow = true;
+ isFirstCommittedRow = true;
+ lastCommittedTimestamp = rowVersionTs;
+ }
+
+ int compare = rowVersionTs.compareTo(timestamp);
+
+ if (compare == 0) {
+ // This is exactly the row that we are looking for.
+ return readFullyOrStartReadingFragmented(link, pageAddr, payload);
+ }
+
+ if (compare < 0) {
+ // This row is older than timestamp we are looking for.
+ if (hasWriteIntent && isFirstCommittedRow) {
+ // If this is the first committed row, but we have write-intent, return write-intent.
+ lookingForVersion = false;
+ returnWriteIntent = true;
+
+ return writeIntentLink;
+ }
+
+ // We are in range between older and newer row, return older row.
+ return readFullyOrStartReadingFragmented(link, pageAddr, payload);
+ }
+ }
+
+ long nextVersionLink = nextVersionLink(pageAddr, payload);
+
+ if (nextVersionLink == STOP_TRAVERSAL && isWriteIntent) {
+ // We have no committed rows except write-intent, so return write-intent.
+ lookingForVersion = false;
+ returnWriteIntent = true;
+
+ return writeIntentLink;
+ }
+
+ return nextVersionLink;
} else {
- // We are continuing reading a fragmented row.
+ // We continue reading a fragmented row.
return readNextFragment(link, pageAddr, payload);
}
}
@@ -78,7 +132,7 @@ class ScanVersionChainByTimestamp implements PageMemoryTraversal<HybridTimestamp
return readRowVersionValue.consumePagePayload(link, pageAddr, payload, null);
}
- private long advanceToNextVersion(long pageAddr, DataPagePayload payload) {
+ private long nextVersionLink(long pageAddr, DataPagePayload payload) {
long nextLink = readPartitionlessLink(partitionId, pageAddr, payload.offset() + RowVersion.NEXT_LINK_OFFSET);
if (nextLink == NULL_LINK) {
@@ -96,7 +150,7 @@ class ScanVersionChainByTimestamp implements PageMemoryTraversal<HybridTimestamp
@Override
public void finish() {
if (lookingForVersion) {
- // we never found the version -> we hever tried to read its value AND the result is null
+ // We never found the version -> we never tried to read its value AND the result is null.
result = null;
return;
}
@@ -116,6 +170,14 @@ class ScanVersionChainByTimestamp implements PageMemoryTraversal<HybridTimestamp
return result;
}
+ boolean isResultWriteIntent() {
+ return returnWriteIntent;
+ }
+
+ HybridTimestamp lastCommittedTimestamp() {
+ return lastCommittedTimestamp;
+ }
+
void reset() {
result = null;
lookingForVersion = true;
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VersionChain.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VersionChain.java
index c650bed4dc..3e600a16c5 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VersionChain.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VersionChain.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.storage.pagememory.mv;
import static org.apache.ignite.internal.pagememory.util.PageIdUtils.NULL_LINK;
import java.util.UUID;
+import org.apache.ignite.internal.pagememory.util.PageIdUtils;
import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.tostring.S;
import org.jetbrains.annotations.Nullable;
@@ -37,19 +38,35 @@ public class VersionChain extends VersionChainKey {
/** Link to the most recent version. */
private final long headLink;
- /** Link to the newest committed {@link RowVersion} if head is not yet committed, or {@link RowVersion#NULL_LINK} otherwise. */
+ /** Link to the newest committed {@link RowVersion} if head is not yet committed, or {@link PageIdUtils#NULL_LINK} otherwise. */
private final long nextLink;
+ private final @Nullable UUID commitTableId;
+
+ private final int commitPartitionId;
+
/**
* Constructor.
*/
- public VersionChain(RowId rowId, @Nullable UUID transactionId, long headLink, long nextLink) {
+ private VersionChain(RowId rowId, @Nullable UUID transactionId, @Nullable UUID commitTableId, int commitPartitionId, long headLink,
+ long nextLink) {
super(rowId);
this.transactionId = transactionId;
+ this.commitTableId = commitTableId;
+ this.commitPartitionId = commitPartitionId;
this.headLink = headLink;
this.nextLink = nextLink;
}
+ public static VersionChain createCommitted(RowId rowId, long headLink, long nextLink) {
+ return new VersionChain(rowId, null, null, -1, headLink, nextLink);
+ }
+
+ public static VersionChain createUncommitted(RowId rowId, UUID transactionId, UUID commitTableId, int commitPartitionId, long headLink,
+ long nextLink) {
+ return new VersionChain(rowId, transactionId, commitTableId, commitPartitionId, headLink, nextLink);
+ }
+
/**
* Returns a transaction id, associated with a chain's head, or {@code null} if head is already committed.
*/
@@ -57,6 +74,14 @@ public class VersionChain extends VersionChainKey {
return transactionId;
}
+ public @Nullable UUID commitTableId() {
+ return commitTableId;
+ }
+
+ public int commitPartitionId() {
+ return commitPartitionId;
+ }
+
/**
* Returns a link to the newest {@link RowVersion} in the chain.
*/
@@ -65,7 +90,7 @@ public class VersionChain extends VersionChainKey {
}
/**
- * Returns a link to the newest committed {@link RowVersion} if head is not yet committed, or {@link RowVersion#NULL_LINK} otherwise.
+ * Returns a link to the newest committed {@link RowVersion} if head is not yet committed, or {@link PageIdUtils#NULL_LINK} otherwise.
*
* @see #isUncommitted()
* @see #newestCommittedLink()
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/io/VersionChainIo.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/io/VersionChainIo.java
index 7e5fcbe202..3215fec385 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/io/VersionChainIo.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/io/VersionChainIo.java
@@ -18,7 +18,9 @@
package org.apache.ignite.internal.storage.pagememory.mv.io;
import static org.apache.ignite.internal.pagememory.util.PageUtils.getLong;
+import static org.apache.ignite.internal.pagememory.util.PageUtils.getShort;
import static org.apache.ignite.internal.pagememory.util.PageUtils.putLong;
+import static org.apache.ignite.internal.pagememory.util.PageUtils.putShort;
import static org.apache.ignite.internal.pagememory.util.PartitionlessLinks.PARTITIONLESS_LINK_SIZE_BYTES;
import static org.apache.ignite.internal.pagememory.util.PartitionlessLinks.readPartitionlessLink;
import static org.apache.ignite.internal.pagememory.util.PartitionlessLinks.writePartitionlessLink;
@@ -36,6 +38,11 @@ import org.apache.ignite.internal.storage.pagememory.mv.VersionChainTree;
* Interface for VersionChain B+Tree-related IO. Defines a following data layout:
* <pre><code>[rowId's UUID (16 bytes), txId (16 bytes), head link (6 bytes), next link (6 bytes)]</code></pre>
*/
+// NOTE(review): leftover development scratch note (translated from Russian) — consider removing before merge.
+// Sketch: RowId, link, (next link?), nullable txId — stored directly in the tree pages.
+// |
+// v
+
public interface VersionChainIo {
/** Offset of rowId's most significant bits, 8 bytes. */
int ROW_ID_MSB_OFFSET = 0;
@@ -49,8 +56,14 @@ public interface VersionChainIo {
/** Offset of txId's least significant bits, 8 bytes. */
int TX_ID_LSB_OFFSET = TX_ID_MSB_OFFSET + Long.BYTES;
+ int COMMIT_TABLE_ID_MSB_OFFSET = TX_ID_LSB_OFFSET + Long.BYTES;
+
+ int COMMIT_TABLE_ID_LSB_OFFSET = COMMIT_TABLE_ID_MSB_OFFSET + Long.BYTES;
+
+ int COMMIT_PARTITION_ID_OFFSET = COMMIT_TABLE_ID_LSB_OFFSET + Long.BYTES;
+
/** Offset of partitionless head link, 6 bytes. */
- int HEAD_LINK_OFFSET = TX_ID_LSB_OFFSET + Long.BYTES;
+ int HEAD_LINK_OFFSET = COMMIT_PARTITION_ID_OFFSET + Short.BYTES;
/** Offset of partitionless next link, 6 bytes. */
int NEXT_LINK_OFFSET = HEAD_LINK_OFFSET + PARTITIONLESS_LINK_SIZE_BYTES;
@@ -93,13 +106,25 @@ public interface VersionChainIo {
putLong(pageAddr, off + ROW_ID_LSB_OFFSET, rowId.leastSignificantBits());
UUID txId = row.transactionId();
+ UUID commitTableId = row.commitTableId();
+ int commitPartitionId = row.commitPartitionId();
if (txId == null) {
+ assert commitTableId == null;
+ assert commitPartitionId == -1;
+
putLong(pageAddr, off + TX_ID_MSB_OFFSET, NULL_UUID_COMPONENT);
putLong(pageAddr, off + TX_ID_LSB_OFFSET, NULL_UUID_COMPONENT);
} else {
+ assert commitTableId != null;
+ assert commitPartitionId >= 0;
+
putLong(pageAddr, off + TX_ID_MSB_OFFSET, txId.getMostSignificantBits());
putLong(pageAddr, off + TX_ID_LSB_OFFSET, txId.getLeastSignificantBits());
+
+ putLong(pageAddr, off + COMMIT_TABLE_ID_MSB_OFFSET, commitTableId.getMostSignificantBits());
+ putLong(pageAddr, off + COMMIT_TABLE_ID_LSB_OFFSET, commitTableId.getLeastSignificantBits());
+ putShort(pageAddr, off + COMMIT_PARTITION_ID_OFFSET, (short) commitPartitionId);
}
writePartitionlessLink(pageAddr + off + HEAD_LINK_OFFSET, row.headLink());
@@ -153,6 +178,17 @@ public interface VersionChainIo {
long headLink = readPartitionlessLink(partitionId, pageAddr, offset + HEAD_LINK_OFFSET);
long nextLink = readPartitionlessLink(partitionId, pageAddr, offset + NEXT_LINK_OFFSET);
- return new VersionChain(rowId, txId, headLink, nextLink);
+ if (txId != null) {
+ long commitTblIdMsb = getLong(pageAddr, offset + COMMIT_TABLE_ID_MSB_OFFSET);
+ long commitTblIdLsb = getLong(pageAddr, offset + COMMIT_TABLE_ID_LSB_OFFSET);
+
+ UUID commitTableId = new UUID(commitTblIdMsb, commitTblIdLsb);
+
+ int commitPartitionId = getShort(pageAddr, offset + COMMIT_PARTITION_ID_OFFSET) & 0xFFFF;
+
+ return VersionChain.createUncommitted(rowId, txId, commitTableId, commitPartitionId, headLink, nextLink);
+ }
+
+ return VersionChain.createCommitted(rowId, headLink, nextLink);
}
}
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
index 327dfa26c0..378379f426 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
@@ -38,6 +38,7 @@ import org.apache.ignite.hlc.HybridTimestamp;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.ByteBufferRow;
import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.ReadResult;
import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.storage.StorageException;
import org.apache.ignite.internal.storage.TxIdMismatchException;
@@ -57,13 +58,27 @@ import org.rocksdb.WriteBatchWithIndex;
import org.rocksdb.WriteOptions;
/**
- * Multi-versioned partition storage implementation based on RocksDB. Stored data has the following format:
+ * Multi-versioned partition storage implementation based on RocksDB. Stored data has the following format.
+ *
+ * <p/>Key:
* <pre><code>
- * | partId (2 bytes, BE) | rowId (16 bytes, BE) |</code></pre>
- * or
+ * For write-intents
+ * | partId (2 bytes, BE) | rowId (16 bytes, BE) |
+ *
+ * For committed rows
+ * | partId (2 bytes, BE) | rowId (16 bytes, BE) | timestamp (12 bytes, DESC) |
+ * </code></pre>
+ * Value:
* <pre><code>
- * | partId (2 bytes, BE) | rowId (16 bytes, BE) | timestamp (12 bytes, DESC) |</code></pre>
- * depending on transaction status. Pending transactions data doesn't have a timestamp assigned.
+ * For write-intents
+ * | txId (16 bytes) | commitTableId (16 bytes) | commitPartitionId (2 bytes) | Row data |
+ *
+ * For committed rows
+ * | Row data |
+ * </code></pre>
+ *
+ * <p/>Pending transaction (write-intent) entries don't have a timestamp assigned, but they do carry the transaction
+ * state (txId, commitTableId and commitPartitionId).
*
* <p/>BE means Big Endian, meaning that lexicographical bytes order matches a natural order of partitions.
*
@@ -80,9 +95,30 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
/** Size of the key without timestamp. */
private static final int ROW_PREFIX_SIZE = ROW_ID_OFFSET + ROW_ID_SIZE;
- /** Transaction id size. */
+ /** Transaction id size (part of the transaction state). */
private static final int TX_ID_SIZE = 2 * Long.BYTES;
+ /** Commit table id size (part of the transaction state). */
+ private static final int TABLE_ID_SIZE = 2 * Long.BYTES;
+
+ /** Commit partition id size (part of the transaction state). */
+ private static final int PARTITION_ID_SIZE = Short.BYTES;
+
+ /** Size of the value header (transaction state). */
+ private static final int VALUE_HEADER_SIZE = TX_ID_SIZE + TABLE_ID_SIZE + PARTITION_ID_SIZE;
+
+ /** Transaction id offset. */
+ private static final int TX_ID_OFFSET = 0;
+
+ /** Commit table id offset. */
+ private static final int TABLE_ID_OFFSET = TX_ID_SIZE;
+
+ /** Commit partition id offset. */
+ private static final int PARTITION_ID_OFFSET = TABLE_ID_OFFSET + TABLE_ID_SIZE;
+
+ /** Value offset (if transaction state is present). */
+ private static final int VALUE_OFFSET = VALUE_HEADER_SIZE;
+
/** Maximum size of the key. */
private static final int MAX_KEY_SIZE = ROW_PREFIX_SIZE + HYBRID_TIMESTAMP_SIZE;
@@ -272,7 +308,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
ByteBuffer keyBuf = prepareHeapKeyBuf(rowId);
try {
- writeUnversioned(keyBuf.array(), row, txId);
+ writeUnversioned(keyBuf.array(), row, txId, UUID.randomUUID(), partitionId);
} catch (RocksDBException e) {
throw new StorageException("Failed to insert new row into storage", e);
}
@@ -282,7 +318,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
/** {@inheritDoc} */
@Override
- public @Nullable BinaryRow addWrite(RowId rowId, @Nullable BinaryRow row, UUID txId)
+ public @Nullable BinaryRow addWrite(RowId rowId, @Nullable BinaryRow row, UUID txId, UUID commitTableId, int commitPartitionId)
throws TxIdMismatchException, StorageException {
WriteBatchWithIndex writeBatch = requireWriteBatch();
@@ -309,16 +345,18 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
// Write empty value as a tombstone.
if (previousValue != null) {
// Reuse old array with transaction id already written to it.
- writeBatch.put(cf, keyBytes, copyOf(previousValue, TX_ID_SIZE));
+ writeBatch.put(cf, keyBytes, copyOf(previousValue, VALUE_HEADER_SIZE));
} else {
- byte[] txIdBytes = new byte[TX_ID_SIZE];
+ byte[] valueHeaderBytes = new byte[VALUE_HEADER_SIZE];
- putTransactionId(txIdBytes, 0, txId);
+ putUuid(valueHeaderBytes, TX_ID_OFFSET, txId);
+ putUuid(valueHeaderBytes, TABLE_ID_OFFSET, commitTableId);
+ putShort(valueHeaderBytes, PARTITION_ID_OFFSET, (short) commitPartitionId);
- writeBatch.put(cf, keyBytes, txIdBytes);
+ writeBatch.put(cf, keyBytes, valueHeaderBytes);
}
} else {
- writeUnversioned(keyBufArray, row, txId);
+ writeUnversioned(keyBufArray, row, txId, commitTableId, commitPartitionId);
}
} catch (RocksDBException e) {
throw new StorageException("Failed to update a row in storage", e);
@@ -335,17 +373,21 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
* @param txId Transaction id.
* @throws RocksDBException If write failed.
*/
- private void writeUnversioned(byte[] keyArray, BinaryRow row, UUID txId) throws RocksDBException {
+ private void writeUnversioned(byte[] keyArray, BinaryRow row, UUID txId, UUID commitTableId, int commitPartitionId)
+ throws RocksDBException {
WriteBatchWithIndex writeBatch = requireWriteBatch();
//TODO IGNITE-16913 Add proper way to write row bytes into array without allocations.
byte[] rowBytes = row.bytes();
- ByteBuffer value = ByteBuffer.allocate(rowBytes.length + TX_ID_SIZE).order(LITTLE_ENDIAN);
+ ByteBuffer value = ByteBuffer.allocate(rowBytes.length + VALUE_HEADER_SIZE);
+ byte[] array = value.array();
- putTransactionId(value.array(), 0, txId);
+ putUuid(array, TX_ID_OFFSET, txId);
+ putUuid(array, TABLE_ID_OFFSET, commitTableId);
+ putShort(array, PARTITION_ID_OFFSET, (short) commitPartitionId);
- value.position(TX_ID_SIZE).put(rowBytes);
+ value.position(VALUE_OFFSET).put(rowBytes);
// Write binary row data as a value.
writeBatch.put(cf, copyOf(keyArray, ROW_PREFIX_SIZE), copyOf(value.array(), value.capacity()));
@@ -401,7 +443,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
// Add timestamp to the key, and put the value back into the storage.
putTimestamp(keyBuf, timestamp);
- writeBatch.put(cf, copyOf(keyBuf.array(), MAX_KEY_SIZE), copyOfRange(valueBytes, TX_ID_SIZE, valueBytes.length));
+ writeBatch.put(cf, copyOf(keyBuf.array(), MAX_KEY_SIZE), copyOfRange(valueBytes, VALUE_HEADER_SIZE, valueBytes.length));
} catch (RocksDBException e) {
throw new StorageException("Failed to commit row into storage", e);
}
@@ -409,17 +451,17 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
/** {@inheritDoc} */
@Override
- public @Nullable BinaryRow read(RowId rowId, UUID txId) throws TxIdMismatchException, StorageException {
+ public @Nullable ReadResult read(RowId rowId, UUID txId) throws TxIdMismatchException, StorageException {
return read(rowId, null, txId);
}
/** {@inheritDoc} */
@Override
- public @Nullable BinaryRow read(RowId rowId, HybridTimestamp timestamp) throws StorageException {
+ public @Nullable ReadResult read(RowId rowId, HybridTimestamp timestamp) throws StorageException {
return read(rowId, timestamp, null);
}
- private @Nullable BinaryRow read(RowId rowId, @Nullable HybridTimestamp timestamp, @Nullable UUID txId)
+ private @Nullable ReadResult read(RowId rowId, @Nullable HybridTimestamp timestamp, @Nullable UUID txId)
throws TxIdMismatchException, StorageException {
assert timestamp == null ^ txId == null;
@@ -430,8 +472,6 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
// We can read data outside of consistency closure. Batch is not required.
WriteBatchWithIndex writeBatch = WRITE_BATCH.get();
- ByteBuffer keyBuf = prepareHeapKeyBuf(rowId);
-
try (
// Set next partition as an upper bound.
var readOpts = new ReadOptions().setIterateUpperBound(upperBound);
@@ -443,56 +483,166 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
: baseIterator
) {
if (timestamp == null) {
+ ByteBuffer keyBuf = prepareHeapKeyBuf(rowId);
+
// Seek to the first appearance of row id if timestamp isn't set.
- // Since timestamps are sorted from newest to oldest, first occurance will always be the latest version.
+ // Since timestamps are sorted from newest to oldest, first occurrence will always be the latest version.
// Unfortunately, copy here is unavoidable with current API.
- seekIterator.seek(copyOf(keyBuf.array(), keyBuf.position()));
- } else {
- // Put timestamp restriction according to N2O timestamps order.
- putTimestamp(keyBuf, timestamp);
+ assert keyBuf.position() == ROW_PREFIX_SIZE;
+ seekIterator.seek(copyOf(keyBuf.array(), ROW_PREFIX_SIZE));
+
+ if (invalid(seekIterator)) {
+ // No data at all.
+ return null;
+ }
+
+ ByteBuffer readKeyBuf = MV_KEY_BUFFER.get().position(0).limit(MAX_KEY_SIZE);
+
+ int keyLength = seekIterator.key(readKeyBuf);
+
+ if (!matches(rowId, readKeyBuf)) {
+ // Wrong row id.
+ return null;
+ }
+
+ boolean isWriteIntent = keyLength == ROW_PREFIX_SIZE;
+
+ byte[] valueBytes = seekIterator.value();
+
+ if (!isWriteIntent) {
+ // There is no write-intent, return latest committed row.
+ return wrapCommittedValue(valueBytes);
+ }
- // This seek will either find a key with timestamp that's less or equal than required value, or a different key whatsoever.
- // It is guaranteed by descending order of timestamps.
- seekIterator.seek(keyBuf.array());
+ assert valueBytes != null;
+
+ validateTxId(valueBytes, txId);
+
+ return wrapUncommittedValue(valueBytes, null);
+ } else {
+ return readByTimestamp(seekIterator, rowId, timestamp);
}
+ }
+ }
+
+ private @Nullable ReadResult readByTimestamp(RocksIterator seekIterator, RowId rowId, HybridTimestamp timestamp) {
+ ByteBuffer keyBuf = prepareHeapKeyBuf(rowId);
+
+ // Put timestamp restriction according to N2O timestamps order.
+ putTimestamp(keyBuf, timestamp);
+
+ // This seek will either find a key with timestamp that's less or equal than required value, or a different key whatsoever.
+ // It is guaranteed by descending order of timestamps.
+ seekIterator.seek(keyBuf.array());
+
+ // There's no guarantee that required key even exists. If it doesn't, then "seek" will point to a different key.
+ // To avoid returning its value, we have to check that actual key matches what we need.
+ // Here we prepare direct buffer to read key without timestamp. Shared direct buffer is used to avoid extra memory allocations.
+ ByteBuffer foundKeyBuf = MV_KEY_BUFFER.get().position(0).limit(MAX_KEY_SIZE);
+
+ int keyLength = 0;
+
+ if (!invalid(seekIterator)) {
+ keyLength = seekIterator.key(foundKeyBuf);
+ }
+
+ if (invalid(seekIterator) || !matches(rowId, foundKeyBuf)) {
+ // There is no record older than timestamp.
+ // There might be a write-intent which we should return.
+ // Seek to *just* row id.
+ seekIterator.seek(copyOf(keyBuf.array(), ROW_PREFIX_SIZE));
- // Return null if nothing was found.
if (invalid(seekIterator)) {
+ // There are no writes with row id.
+ return null;
+ }
+
+ foundKeyBuf.position(0).limit(MAX_KEY_SIZE);
+ keyLength = seekIterator.key(foundKeyBuf);
+
+ if (!matches(rowId, foundKeyBuf)) {
+ // There are no writes with row id.
return null;
}
- // There's no guarantee that required key even exists. If it doesn't, then "seek" will point to a different key, obviously.
- // To avoid returning its value, we have to check that actual key matches what we need.
- // Here we prepare direct buffer to read key without timestamp. Shared direct buffer is used to avoid extra memory allocations.
- ByteBuffer directBuffer = MV_KEY_BUFFER.get().position(0).limit(MAX_KEY_SIZE);
+ byte[] valueBytes = seekIterator.value();
+
+ boolean isWriteIntent = keyLength == ROW_PREFIX_SIZE;
- int keyLength = seekIterator.key(directBuffer);
+ if (isWriteIntent) {
+ // Let's check if there is a committed write.
+ seekIterator.next();
- boolean valueHasTxId = keyLength == ROW_PREFIX_SIZE;
+ if (invalid(seekIterator)) {
+ // There are no committed writes, we can safely return write-intent.
+ return wrapUncommittedValue(valueBytes, null);
+ }
- // Comparison starts from the position of the row id.
- directBuffer.position(ROW_ID_OFFSET);
+ foundKeyBuf.position(0).limit(MAX_KEY_SIZE);
+ seekIterator.key(foundKeyBuf);
- // Return null if seek found a wrong key.
- if (!matches(rowId, directBuffer)) {
- return null;
+ if (!matches(rowId, foundKeyBuf)) {
+ // There are no committed writes, we can safely return write-intent.
+ return wrapUncommittedValue(valueBytes, null);
+ }
}
- // Get binary row from the iterator. It has the exact payload that we need.
+ // There is a committed write, but it's more recent than our timestamp (because we didn't find it with first seek).
+ return null;
+ } else {
+ // Should not be write-intent, as we seek'd with the timestamp.
+ assert keyLength == MAX_KEY_SIZE;
+
+ HybridTimestamp rowTimestamp = readTimestamp(foundKeyBuf, ROW_PREFIX_SIZE);
+
byte[] valueBytes = seekIterator.value();
- assert valueBytes != null;
+ if (rowTimestamp.equals(timestamp)) {
+ // This is exactly the row we are looking for.
+ return wrapCommittedValue(valueBytes);
+ }
- if (txId != null && valueHasTxId) {
- validateTxId(valueBytes, txId);
+ // Let's check if there is more recent write. If it is a write-intent, then return write-intent.
+ // If it is a committed write, then we are already in a right range.
+ seekIterator.prev();
+
+ if (invalid(seekIterator)) {
+ // There is no more recent commits or write-intents.
+ return wrapCommittedValue(valueBytes);
+ }
+
+ foundKeyBuf.position(0).limit(MAX_KEY_SIZE);
+ keyLength = seekIterator.key(foundKeyBuf);
+
+ if (!matches(rowId, foundKeyBuf)) {
+ // There is no more recent commits or write-intents under this row id.
+ return wrapCommittedValue(valueBytes);
+ }
+
+ boolean isWriteIntent = keyLength == ROW_PREFIX_SIZE;
+
+ if (isWriteIntent) {
+ valueBytes = seekIterator.value();
+
+ return wrapUncommittedValue(valueBytes, rowTimestamp);
}
- return wrapValueIntoBinaryRow(valueBytes, valueHasTxId);
+ return wrapCommittedValue(valueBytes);
}
}
- private static boolean matches(RowId rowId, ByteBuffer buf) {
- return rowId.mostSignificantBits() == buf.getLong() && rowId.leastSignificantBits() == buf.getLong();
+ /**
+ * Checks if row id matches the one written in the key buffer. Note: this operation changes the position in the buffer.
+ *
+ * @param rowId Row id.
+ * @param keyByf Key buffer.
+ * @return {@code true} if row id matches the key buffer, {@code false} otherwise.
+ */
+ private static boolean matches(RowId rowId, ByteBuffer keyByf) {
+ // Comparison starts from the position of the row id.
+ keyByf.position(ROW_ID_OFFSET);
+
+ return rowId.mostSignificantBits() == keyByf.getLong() && rowId.leastSignificantBits() == keyByf.getLong();
}
//TODO IGNITE-16914 Play with prefix settings and benchmark results.
@@ -529,7 +679,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
// Byte buffer from a thread-local field can't be used here, because of two reasons:
// - no one guarantees that there will only be a single cursor;
// - no one guarantees that returned cursor will not be used by other threads.
- // The thing is, we need this buffer to preserve its content between invocactions of "hasNext" method.
+ // The thing is, we need this buffer to preserve its content between invocations of "hasNext" method.
ByteBuffer seekKeyBuf = ByteBuffer.allocate(seekKeyBufSize).order(BIG_ENDIAN).putShort((short) partitionId);
if (timestamp != null) {
@@ -548,7 +698,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
/** {@inheritDoc} */
@Override
public boolean hasNext() {
- // Fastpath for consecutive invocations.
+ // Fast-path for consecutive invocations.
if (next != null) {
return true;
}
@@ -591,7 +741,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
// Read the actual key into a direct buffer.
int keyLength = it.key(directBuffer.limit(MAX_KEY_SIZE));
- boolean valueHasTxId = keyLength == ROW_PREFIX_SIZE;
+ boolean isWriteIntent = keyLength == ROW_PREFIX_SIZE;
directBuffer.limit(ROW_PREFIX_SIZE);
@@ -621,7 +771,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
// itself will be different, and we must check it.
keyLength = it.key(directBuffer.limit(MAX_KEY_SIZE));
- valueHasTxId = keyLength == ROW_PREFIX_SIZE;
+ isWriteIntent = keyLength == ROW_PREFIX_SIZE;
directBuffer.limit(ROW_PREFIX_SIZE);
@@ -649,10 +799,10 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
if (found) {
byte[] valueBytes = it.value();
- BinaryRow binaryRow = wrapValueIntoBinaryRow(valueBytes, valueHasTxId);
+ BinaryRow binaryRow = wrapValueIntoBinaryRow(valueBytes, isWriteIntent);
if (binaryRow != null && keyFilter.test(binaryRow)) {
- if (txId != null && valueHasTxId) {
+ if (txId != null && isWriteIntent) {
validateTxId(valueBytes, txId);
}
@@ -822,11 +972,24 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
buf.putInt(~ts.getLogical());
}
- private static void putTransactionId(byte[] array, int off, UUID txId) {
+ private static HybridTimestamp readTimestamp(ByteBuffer buf, int off) {
+ assert buf.order() == BIG_ENDIAN;
+
+ long physical = ~buf.getLong(off);
+ int logical = ~buf.getInt(off + Long.BYTES);
+
+ return new HybridTimestamp(physical, logical);
+ }
+
+ private static void putUuid(byte[] array, int off, UUID txId) {
GridUnsafe.putLong(array, GridUnsafe.BYTE_ARR_OFF + off, txId.getMostSignificantBits());
GridUnsafe.putLong(array, GridUnsafe.BYTE_ARR_OFF + off + Long.BYTES, txId.getLeastSignificantBits());
}
+ private static void putShort(byte[] array, int off, short value) {
+ GridUnsafe.putShort(array, GridUnsafe.BYTE_ARR_OFF + off, value);
+ }
+
private static void validateTxId(byte[] valueBytes, UUID txId) {
long msb = GridUnsafe.getLong(valueBytes, GridUnsafe.BYTE_ARR_OFF);
long lsb = GridUnsafe.getLong(valueBytes, GridUnsafe.BYTE_ARR_OFF + Long.BYTES);
@@ -859,19 +1022,60 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
* Converts raw byte array representation of the value into a binary row.
*
* @param valueBytes Value bytes as read from the storage.
- * @param valueHasTxId Whether the value has a transaction id prefix in it.
+ * @param valueHasTxData Whether the value has a transaction id prefix in it.
* @return Binary row instance or {@code null} if value is a tombstone.
*/
- private static @Nullable BinaryRow wrapValueIntoBinaryRow(byte[] valueBytes, boolean valueHasTxId) {
- if (isTombstone(valueBytes, valueHasTxId)) {
+ private static @Nullable BinaryRow wrapValueIntoBinaryRow(byte[] valueBytes, boolean valueHasTxData) {
+ if (isTombstone(valueBytes, valueHasTxData)) {
return null;
}
- return valueHasTxId
- ? new ByteBufferRow(ByteBuffer.wrap(valueBytes).position(TX_ID_SIZE).slice().order(LITTLE_ENDIAN))
+ return valueHasTxData
+ ? new ByteBufferRow(ByteBuffer.wrap(valueBytes).position(VALUE_OFFSET).slice().order(LITTLE_ENDIAN))
: new ByteBufferRow(valueBytes);
}
+ /**
+ * Converts raw byte array representation of the write-intent value into a read result adding newest commit timestamp if
+ * it is not {@code null}.
+ *
+ * @param valueBytes Value bytes as read from the storage.
+ * @param newestCommitTs Commit timestamp of the most recent committed write of this value.
+ * @return Read result instance or {@code null} if value is a tombstone.
+ */
+ private static @Nullable ReadResult wrapUncommittedValue(byte[] valueBytes, @Nullable HybridTimestamp newestCommitTs) {
+ if (isTombstone(valueBytes, true)) {
+ return null;
+ }
+
+ long txIdMsb = GridUnsafe.getLong(valueBytes, GridUnsafe.BYTE_ARR_OFF + TX_ID_OFFSET);
+ long txIdLsb = GridUnsafe.getLong(valueBytes, GridUnsafe.BYTE_ARR_OFF + TX_ID_OFFSET + Long.BYTES);
+
+ long commitTableIdMsb = GridUnsafe.getLong(valueBytes, GridUnsafe.BYTE_ARR_OFF + TABLE_ID_OFFSET);
+ long commitTableIdLsb = GridUnsafe.getLong(valueBytes, GridUnsafe.BYTE_ARR_OFF + TABLE_ID_OFFSET + Long.BYTES);
+
+ int commitPartitionId = GridUnsafe.getShort(valueBytes, GridUnsafe.BYTE_ARR_OFF + PARTITION_ID_OFFSET) & 0xFFFF;
+
+ BinaryRow row = new ByteBufferRow(ByteBuffer.wrap(valueBytes).position(VALUE_OFFSET).slice().order(LITTLE_ENDIAN));
+
+ return ReadResult.createFromWriteIntent(row, new UUID(txIdMsb, txIdLsb), new UUID(commitTableIdMsb, commitTableIdLsb),
+ newestCommitTs, commitPartitionId);
+ }
+
+ /**
+ * Converts raw byte array representation of the value into a read result.
+ *
+ * @param valueBytes Value bytes as read from the storage.
+ * @return Read result instance or {@code null} if value is a tombstone.
+ */
+ private static @Nullable ReadResult wrapCommittedValue(byte[] valueBytes) {
+ if (isTombstone(valueBytes, false)) {
+ return null;
+ }
+
+ return ReadResult.createFromCommitted(new ByteBufferRow(valueBytes));
+ }
+
/**
* Creates a prefix of all keys in the given partition.
*/
@@ -894,6 +1098,6 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
* Returns {@code true} if value payload represents a tombstone.
*/
private static boolean isTombstone(byte[] valueBytes, boolean hasTxId) {
- return valueBytes.length == (hasTxId ? TX_ID_SIZE : 0);
+ return valueBytes.length == (hasTxId ? VALUE_HEADER_SIZE : 0);
}
}
diff --git a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java
index e51591470b..19e935122d 100644
--- a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java
+++ b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java
@@ -122,7 +122,7 @@ public class RocksDbMvTableStorageTest extends AbstractMvTableStorageTest {
assertThat(tableStorage.getMvPartition(42), is(nullValue()));
assertThat(tableStorage.getOrCreateMvPartition(42).read(rowId0, txId), is(nullValue()));
- assertThat(unwrap(tableStorage.getMvPartition(1 << 8).read(rowId1, txId)), is(equalTo(unwrap(testData))));
+ assertThat(unwrap(tableStorage.getMvPartition(1 << 8).read(rowId1, txId).binaryRow()), is(equalTo(unwrap(testData))));
}
/**
@@ -146,7 +146,7 @@ public class RocksDbMvTableStorageTest extends AbstractMvTableStorageTest {
assertThat(tableStorage.getMvPartition(0), is(notNullValue()));
assertThat(tableStorage.getMvPartition(1), is(nullValue()));
- assertThat(unwrap(tableStorage.getMvPartition(0).read(rowId0, txId)), is(equalTo(unwrap(testData))));
+ assertThat(unwrap(tableStorage.getMvPartition(0).read(rowId0, txId).binaryRow()), is(equalTo(unwrap(testData))));
}
@Test
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/VersionedRowStore.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/VersionedRowStore.java
index e65ebeb739..6427e6ca51 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/VersionedRowStore.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/VersionedRowStore.java
@@ -33,6 +33,7 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.ReadResult;
import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.tx.Timestamp;
import org.apache.ignite.internal.tx.TxManager;
@@ -111,9 +112,9 @@ public class VersionedRowStore {
return null;
}
- BinaryRow result = storage.read(rowId, txId);
+ ReadResult result = storage.read(rowId, txId);
- return result;
+ return result != null ? result.binaryRow() : null;
}
/**
@@ -155,7 +156,7 @@ public class VersionedRowStore {
txsInsertedKeys.computeIfAbsent(txId, entry -> new CopyOnWriteArrayList<ByteBuffer>()).add(key);
} else {
- storage.addWrite(rowId, row, txId);
+ storage.addWrite(rowId, row, txId, UUID.randomUUID(), 0);
}
}
@@ -193,13 +194,13 @@ public class VersionedRowStore {
return false;
}
- BinaryRow prevRow = storage.read(primaryIndex.get(row.keySlice()), txId);
+ ReadResult prevRow = storage.read(primaryIndex.get(row.keySlice()), txId);
if (prevRow == null) {
return false;
}
- storage.addWrite(primaryIndex.get(row.keySlice()), null, txId);
+ storage.addWrite(primaryIndex.get(row.keySlice()), null, txId, UUID.randomUUID(), 0);
txsRemovedKeys.computeIfAbsent(txId, entry -> new CopyOnWriteArrayList<>()).add(row.keySlice());