You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by tk...@apache.org on 2022/09/20 15:24:29 UTC
[ignite-3] branch main updated: IGNITE-17318 Implement RocksDB based sorted index storage (#1076)
This is an automated email from the ASF dual-hosted git repository.
tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new dec967219a IGNITE-17318 Implement RocksDB based sorted index storage (#1076)
dec967219a is described below
commit dec967219a1a5a1c6a8de25586effd841a8eec73
Author: Alexander Polovtcev <al...@gmail.com>
AuthorDate: Tue Sep 20 18:24:23 2022 +0300
IGNITE-17318 Implement RocksDB based sorted index storage (#1076)
---
.../internal/binarytuple/BinaryTupleBuilder.java | 76 ++--
.../internal/binarytuple/BinaryTupleCommon.java | 21 +
.../internal/binarytuple/BinaryTupleParser.java | 8 +-
.../binarytuple/BinaryTuplePrefixBuilder.java | 78 ++++
.../internal/binarytuple/BinaryTupleTest.java | 68 ++--
.../handler/requests/table/ClientTableCommon.java | 2 +-
.../internal/client/table/ClientKeyValueView.java | 2 +-
.../client/table/ClientRecordSerializer.java | 2 +-
.../client/table/ClientTupleSerializer.java | 4 +-
.../raft/RocksDbClusterStateStorage.java | 2 +-
.../raft/AbstractClusterStateStorageTest.java | 2 +-
.../dotnet/Apache.Ignite.Tests/Sql/SqlTests.cs | 2 +-
.../apache/ignite/internal/rocksdb/RocksUtils.java | 18 +-
.../rocksdb/flush/RocksDbFlushListener.java | 9 +-
.../internal/rocksdb/flush/RocksDbFlusher.java | 25 +-
.../ignite/internal/schema/BinaryConverter.java | 3 +-
.../apache/ignite/internal/schema/BinaryRow.java | 5 +
.../ignite/internal/schema/BinaryTuplePrefix.java | 71 ++++
.../ignite/internal/schema/ByteBufferRow.java | 5 +
.../ignite/internal/schema/row/InternalTuple.java | 6 +
.../org/apache/ignite/internal/schema/row/Row.java | 5 +
.../internal/schema/BinaryTuplePrefixTest.java | 99 +++++
.../storage/index}/BinaryTupleComparator.java | 78 ++--
.../storage/index/HashIndexDescriptor.java | 1 -
.../storage/index/SortedIndexDescriptor.java | 70 +++-
.../internal/storage/index/SortedIndexStorage.java | 6 +-
.../index/AbstractSortedIndexStorageTest.java | 41 +-
.../storage/index/BinaryTupleComparatorTest.java | 443 +++++++++++++++++++++
.../storage/index/TestSortedIndexStorageTest.java | 4 +-
.../internal/storage/index/impl/TestIndexRow.java | 3 +-
.../storage/AbstractMvTableStorageTest.java | 12 +-
.../index/AbstractHashIndexStorageTest.java | 7 +-
.../index/impl/BinaryTupleRowSerializer.java | 54 ++-
.../internal/storage/index/impl/IndexRowImpl.java | 46 ---
.../storage/index/impl/TestSortedIndexStorage.java | 59 ++-
.../storage/rocksdb/ColumnFamilyUtils.java | 34 +-
.../rocksdb/{HashIndexes.java => HashIndex.java} | 18 +-
.../storage/rocksdb/RocksDbMetaStorage.java | 2 +-
.../storage/rocksdb/RocksDbTableStorage.java | 92 ++++-
.../internal/storage/rocksdb/SortedIndex.java | 69 ++++
.../storage/rocksdb/index/CursorUtils.java | 259 ++++++++++++
.../index/RocksDbBinaryTupleComparator.java | 102 +++++
.../rocksdb/index/RocksDbHashIndexStorage.java | 6 +-
.../rocksdb/index/RocksDbSortedIndexStorage.java | 240 +++++++++++
.../storage/rocksdb/RocksDbMvTableStorageTest.java | 19 -
.../storage/rocksdb/index/CursorUtilsTest.java | 90 +++++
.../rocksdb/index/RocksDbHashIndexStorageTest.java | 25 +-
...est.java => RocksDbSortedIndexStorageTest.java} | 35 +-
48 files changed, 1981 insertions(+), 347 deletions(-)
diff --git a/modules/binary-tuple/src/main/java/org/apache/ignite/internal/binarytuple/BinaryTupleBuilder.java b/modules/binary-tuple/src/main/java/org/apache/ignite/internal/binarytuple/BinaryTupleBuilder.java
index 7f09077556..12c7527f68 100644
--- a/modules/binary-tuple/src/main/java/org/apache/ignite/internal/binarytuple/BinaryTupleBuilder.java
+++ b/modules/binary-tuple/src/main/java/org/apache/ignite/internal/binarytuple/BinaryTupleBuilder.java
@@ -33,7 +33,7 @@ import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.BitSet;
import java.util.UUID;
-import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
/**
* Utility to construct a binary tuple.
@@ -67,13 +67,23 @@ public class BinaryTupleBuilder {
private boolean hasNullValues = false;
/**
- * Constructor.
+ * Creates a builder.
+ *
+ * @param numElements Number of tuple elements.
+ * @param allowNulls True if NULL values are possible, false otherwise.
+ */
+ public BinaryTupleBuilder(int numElements, boolean allowNulls) {
+ this(numElements, allowNulls, -1);
+ }
+
+ /**
+ * Creates a builder.
*
* @param numElements Number of tuple elements.
* @param allowNulls True if NULL values are possible, false otherwise.
* @param totalValueSize Total estimated length of non-NULL values, -1 if not known.
*/
- private BinaryTupleBuilder(int numElements, boolean allowNulls, int totalValueSize) {
+ public BinaryTupleBuilder(int numElements, boolean allowNulls, int totalValueSize) {
this.numElements = numElements;
int base = BinaryTupleCommon.HEADER_SIZE;
@@ -94,29 +104,6 @@ public class BinaryTupleBuilder {
allocate(totalValueSize);
}
- /**
- * Creates a builder.
- *
- * @param numElements Number of tuple elements.
- * @param allowNulls True if NULL values are possible, false otherwise.
- * @return Tuple builder.
- */
- public static BinaryTupleBuilder create(int numElements, boolean allowNulls) {
- return create(numElements, allowNulls, -1);
- }
-
- /**
- * Creates a builder.
- *
- * @param numElements Number of tuple elements.
- * @param allowNulls True if NULL values are possible, false otherwise.
- * @param totalValueSize Total estimated length of non-NULL values, -1 if not known.
- * @return Tuple builder.
- */
- public static BinaryTupleBuilder create(int numElements, boolean allowNulls, int totalValueSize) {
- return new BinaryTupleBuilder(numElements, allowNulls, totalValueSize);
- }
-
/**
* Check if the binary tuple contains a null map.
*/
@@ -223,7 +210,7 @@ public class BinaryTupleBuilder {
* @param value Element value.
* @return {@code this} for chaining.
*/
- public BinaryTupleBuilder appendInt(Integer value) {
+ public BinaryTupleBuilder appendInt(@Nullable Integer value) {
return value == null ? appendNull() : appendInt(value.intValue());
}
@@ -308,7 +295,7 @@ public class BinaryTupleBuilder {
* @param value Element value.
* @return {@code this} for chaining.
*/
- public BinaryTupleBuilder appendNumberNotNull(@NotNull BigInteger value) {
+ public BinaryTupleBuilder appendNumberNotNull(BigInteger value) {
putBytes(value.toByteArray());
return proceed();
}
@@ -330,7 +317,7 @@ public class BinaryTupleBuilder {
* @param scale Decimal scale.
* @return {@code this} for chaining.
*/
- public BinaryTupleBuilder appendDecimalNotNull(@NotNull BigDecimal value, int scale) {
+ public BinaryTupleBuilder appendDecimalNotNull(BigDecimal value, int scale) {
putBytes(value.setScale(scale, RoundingMode.HALF_UP).unscaledValue().toByteArray());
return proceed();
}
@@ -352,7 +339,7 @@ public class BinaryTupleBuilder {
* @param value Element value.
* @return {@code this} for chaining.
*/
- public BinaryTupleBuilder appendStringNotNull(@NotNull String value) {
+ public BinaryTupleBuilder appendStringNotNull(String value) {
try {
putString(value);
} catch (CharacterCodingException e) {
@@ -367,7 +354,7 @@ public class BinaryTupleBuilder {
* @param value Element value.
* @return {@code this} for chaining.
*/
- public BinaryTupleBuilder appendString(String value) {
+ public BinaryTupleBuilder appendString(@Nullable String value) {
return value == null ? appendNull() : appendStringNotNull(value);
}
@@ -377,7 +364,7 @@ public class BinaryTupleBuilder {
* @param value Element value.
* @return {@code this} for chaining.
*/
- public BinaryTupleBuilder appendBytesNotNull(@NotNull byte[] value) {
+ public BinaryTupleBuilder appendBytesNotNull(byte[] value) {
putBytes(value);
return proceed();
}
@@ -398,7 +385,7 @@ public class BinaryTupleBuilder {
* @param value Element value.
* @return {@code this} for chaining.
*/
- public BinaryTupleBuilder appendUuidNotNull(@NotNull UUID value) {
+ public BinaryTupleBuilder appendUuidNotNull(UUID value) {
long lsb = value.getLeastSignificantBits();
long msb = value.getMostSignificantBits();
if ((lsb | msb) != 0L) {
@@ -424,7 +411,7 @@ public class BinaryTupleBuilder {
* @param value Element value.
* @return {@code this} for chaining.
*/
- public BinaryTupleBuilder appendBitmaskNotNull(@NotNull BitSet value) {
+ public BinaryTupleBuilder appendBitmaskNotNull(BitSet value) {
putBytes(value.toByteArray());
return proceed();
}
@@ -445,7 +432,7 @@ public class BinaryTupleBuilder {
* @param value Element value.
* @return {@code this} for chaining.
*/
- public BinaryTupleBuilder appendDateNotNull(@NotNull LocalDate value) {
+ public BinaryTupleBuilder appendDateNotNull(LocalDate value) {
if (value != BinaryTupleCommon.DEFAULT_DATE) {
putDate(value);
}
@@ -468,7 +455,7 @@ public class BinaryTupleBuilder {
* @param value Element value.
* @return {@code this} for chaining.
*/
- public BinaryTupleBuilder appendTimeNotNull(@NotNull LocalTime value) {
+ public BinaryTupleBuilder appendTimeNotNull(LocalTime value) {
if (value != BinaryTupleCommon.DEFAULT_TIME) {
putTime(value);
}
@@ -491,7 +478,7 @@ public class BinaryTupleBuilder {
* @param value Element value.
* @return {@code this} for chaining.
*/
- public BinaryTupleBuilder appendDateTimeNotNull(@NotNull LocalDateTime value) {
+ public BinaryTupleBuilder appendDateTimeNotNull(LocalDateTime value) {
if (value != BinaryTupleCommon.DEFAULT_DATE_TIME) {
putDate(value.toLocalDate());
putTime(value.toLocalTime());
@@ -515,7 +502,7 @@ public class BinaryTupleBuilder {
* @param value Element value.
* @return {@code this} for chaining.
*/
- public BinaryTupleBuilder appendTimestampNotNull(@NotNull Instant value) {
+ public BinaryTupleBuilder appendTimestampNotNull(Instant value) {
if (value != BinaryTupleCommon.DEFAULT_TIMESTAMP) {
long seconds = value.getEpochSecond();
int nanos = value.getNano();
@@ -543,7 +530,7 @@ public class BinaryTupleBuilder {
* @param bytes Buffer with element raw bytes.
* @return {@code this} for chaining.
*/
- public BinaryTupleBuilder appendElementBytes(@NotNull ByteBuffer bytes) {
+ public BinaryTupleBuilder appendElementBytes(ByteBuffer bytes) {
putElement(bytes);
return proceed();
}
@@ -556,7 +543,7 @@ public class BinaryTupleBuilder {
* @param length Length of the element in the buffer.
* @return {@code this} for chaining.
*/
- public BinaryTupleBuilder appendElementBytes(@NotNull ByteBuffer bytes, int offset, int length) {
+ public BinaryTupleBuilder appendElementBytes(ByteBuffer bytes, int offset, int length) {
putElement(bytes, offset, length);
return proceed();
}
@@ -570,6 +557,15 @@ public class BinaryTupleBuilder {
return elementIndex;
}
+ /**
+ * Gets the expected number of tuple elements.
+ *
+ * @return Expected number of tuple elements.
+ */
+ public int numElements() {
+ return numElements;
+ }
+
/**
* Finalize tuple building.
*
diff --git a/modules/binary-tuple/src/main/java/org/apache/ignite/internal/binarytuple/BinaryTupleCommon.java b/modules/binary-tuple/src/main/java/org/apache/ignite/internal/binarytuple/BinaryTupleCommon.java
index 5d5e3eb409..43e719db6d 100644
--- a/modules/binary-tuple/src/main/java/org/apache/ignite/internal/binarytuple/BinaryTupleCommon.java
+++ b/modules/binary-tuple/src/main/java/org/apache/ignite/internal/binarytuple/BinaryTupleCommon.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.binarytuple;
+import java.nio.ByteBuffer;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
@@ -37,6 +38,13 @@ public class BinaryTupleCommon {
/** Flag that indicates null map presence. */
public static final int NULLMAP_FLAG = 0b100;
+ /**
+ * Flag that indicates that a Binary Tuple is instead a Binary Tuple Prefix.
+ *
+ * @see BinaryTuplePrefixBuilder
+ */
+ public static final int PREFIX_FLAG = 0b1000;
+
/** Default value for UUID elements. */
public static final UUID DEFAULT_UUID = new UUID(0, 0);
@@ -110,4 +118,17 @@ public class BinaryTupleCommon {
public static byte nullMask(int index) {
return (byte) (1 << (index % 8));
}
+
+ /**
+ * Returns {@code true} if the given {@code buffer} represents a Binary Tuple Prefix.
+ *
+ * @param buffer Buffer containing a serialized Binary Tuple or Binary Tuple Prefix.
+ * @return {@code true} if the given {@code buffer} represents a Binary Tuple Prefix or {@code false} otherwise.
+ * @see BinaryTuplePrefixBuilder
+ */
+ public static boolean isPrefix(ByteBuffer buffer) {
+ byte flags = buffer.get(0);
+
+ return (flags & PREFIX_FLAG) != 0;
+ }
}
diff --git a/modules/binary-tuple/src/main/java/org/apache/ignite/internal/binarytuple/BinaryTupleParser.java b/modules/binary-tuple/src/main/java/org/apache/ignite/internal/binarytuple/BinaryTupleParser.java
index f96d4451ec..47e8e4ca87 100644
--- a/modules/binary-tuple/src/main/java/org/apache/ignite/internal/binarytuple/BinaryTupleParser.java
+++ b/modules/binary-tuple/src/main/java/org/apache/ignite/internal/binarytuple/BinaryTupleParser.java
@@ -49,6 +49,9 @@ public class BinaryTupleParser {
/** UUID size in bytes. */
private static final int UUID_SIZE = 16;
+ /** Byte order of ByteBuffers that contain the tuple. */
+ private static final ByteOrder ORDER = ByteOrder.LITTLE_ENDIAN;
+
/** Number of elements in the tuple. */
private final int numElements;
@@ -73,13 +76,14 @@ public class BinaryTupleParser {
public BinaryTupleParser(int numElements, ByteBuffer buffer) {
this.numElements = numElements;
- assert buffer.order() == ByteOrder.LITTLE_ENDIAN;
+ assert buffer.order() == ORDER;
assert buffer.position() == 0;
this.buffer = buffer;
byte flags = buffer.get(0);
int base = BinaryTupleCommon.HEADER_SIZE;
+
if ((flags & BinaryTupleCommon.NULLMAP_FLAG) != 0) {
base += BinaryTupleCommon.nullMapSize(numElements);
}
@@ -114,7 +118,7 @@ public class BinaryTupleParser {
* Returns the content of this tuple as a byte buffer.
*/
public ByteBuffer byteBuffer() {
- return buffer.slice();
+ return buffer.slice().order(ORDER);
}
/**
diff --git a/modules/binary-tuple/src/main/java/org/apache/ignite/internal/binarytuple/BinaryTuplePrefixBuilder.java b/modules/binary-tuple/src/main/java/org/apache/ignite/internal/binarytuple/BinaryTuplePrefixBuilder.java
new file mode 100644
index 0000000000..173765b39f
--- /dev/null
+++ b/modules/binary-tuple/src/main/java/org/apache/ignite/internal/binarytuple/BinaryTuplePrefixBuilder.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.binarytuple;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Class for building Binary Tuple Prefixes.
+ *
+ * <p>A Binary Tuple Prefix is an extension of the Binary Tuple format, that is, it adds an additional field, containing the number of
+ * elements in the prefix, at the end of the serialized tuple representation. This is helpful in cases when it is required to de-serialize
+ * such a prefix while only having the full tuple schema.
+ *
+ * <p>The builder also sets the {@link BinaryTupleCommon#PREFIX_FLAG} in the flags region in order to be able to distinguish such prefixes
+ * from regular tuples.
+ */
+public class BinaryTuplePrefixBuilder extends BinaryTupleBuilder {
+ private final int prefixNumElements;
+
+ /**
+ * Creates a new builder.
+ *
+ * @param prefixNumElements Number of elements in the prefix.
+ * @param fullNumElements Number of elements in the Binary Tuple Schema.
+ */
+ public BinaryTuplePrefixBuilder(int prefixNumElements, int fullNumElements) {
+ super(fullNumElements + 1, true, -1);
+
+ this.prefixNumElements = prefixNumElements;
+ }
+
+ @Override
+ public ByteBuffer build() {
+ int elementIndex = elementIndex();
+
+ if (elementIndex != prefixNumElements) {
+ throw new IllegalStateException(String.format(
+ "Unexpected amount of elements in a BinaryTuple prefix. Expected: %d, actual %d",
+ prefixNumElements, elementIndex
+ ));
+ }
+
+ int numElements = numElements();
+
+ // Use nulls instead of the missing elements.
+ while (elementIndex() < numElements - 1) {
+ appendNull();
+ }
+
+ appendInt(prefixNumElements);
+
+ ByteBuffer tuple = super.build();
+
+ // Set the flag indicating that this tuple is a prefix.
+ byte flags = tuple.get(0);
+
+ flags |= BinaryTupleCommon.PREFIX_FLAG;
+
+ tuple.put(0, flags);
+
+ return tuple;
+ }
+}
diff --git a/modules/binary-tuple/src/test/java/org/apache/ignite/internal/binarytuple/BinaryTupleTest.java b/modules/binary-tuple/src/test/java/org/apache/ignite/internal/binarytuple/BinaryTupleTest.java
index 969ba0acf6..43e66977ed 100644
--- a/modules/binary-tuple/src/test/java/org/apache/ignite/internal/binarytuple/BinaryTupleTest.java
+++ b/modules/binary-tuple/src/test/java/org/apache/ignite/internal/binarytuple/BinaryTupleTest.java
@@ -136,7 +136,7 @@ public class BinaryTupleTest {
public void byteTest() {
byte[] values = {Byte.MIN_VALUE, -1, 0, 1, Byte.MAX_VALUE};
for (byte value : values) {
- BinaryTupleBuilder builder = BinaryTupleBuilder.create(1, false, 1);
+ BinaryTupleBuilder builder = new BinaryTupleBuilder(1, false, 1);
ByteBuffer bytes = builder.appendByte(value).build();
assertEquals(value != 0 ? 1 : 0, bytes.get(1));
assertEquals(value != 0 ? 3 : 2, bytes.limit());
@@ -153,7 +153,7 @@ public class BinaryTupleTest {
public void shortTest() {
short[] values = {Byte.MIN_VALUE, -1, 0, 1, Byte.MAX_VALUE};
for (short value : values) {
- BinaryTupleBuilder builder = BinaryTupleBuilder.create(1, false, 1);
+ BinaryTupleBuilder builder = new BinaryTupleBuilder(1, false, 1);
ByteBuffer bytes = builder.appendShort(value).build();
assertEquals(value != 0 ? 1 : 0, bytes.get(1));
assertEquals(value != 0 ? 3 : 2, bytes.limit());
@@ -164,7 +164,7 @@ public class BinaryTupleTest {
values = new short[]{Short.MIN_VALUE, Byte.MIN_VALUE - 1, Byte.MAX_VALUE + 1, Short.MAX_VALUE};
for (short value : values) {
- BinaryTupleBuilder builder = BinaryTupleBuilder.create(1, false, 2);
+ BinaryTupleBuilder builder = new BinaryTupleBuilder(1, false, 2);
ByteBuffer bytes = builder.appendShort(value).build();
assertEquals(2, bytes.get(1));
assertEquals(4, bytes.limit());
@@ -181,7 +181,7 @@ public class BinaryTupleTest {
public void intTest() {
int[] values = {Byte.MIN_VALUE, -1, 0, 1, Byte.MAX_VALUE};
for (int value : values) {
- BinaryTupleBuilder builder = BinaryTupleBuilder.create(1, false, 1);
+ BinaryTupleBuilder builder = new BinaryTupleBuilder(1, false, 1);
ByteBuffer bytes = builder.appendInt(value).build();
assertEquals(value != 0 ? 1 : 0, bytes.get(1));
assertEquals(value != 0 ? 3 : 2, bytes.limit());
@@ -192,7 +192,7 @@ public class BinaryTupleTest {
values = new int[]{Short.MIN_VALUE, Byte.MIN_VALUE - 1, Byte.MAX_VALUE + 1, Short.MAX_VALUE};
for (int value : values) {
- BinaryTupleBuilder builder = BinaryTupleBuilder.create(1, false, 2);
+ BinaryTupleBuilder builder = new BinaryTupleBuilder(1, false, 2);
ByteBuffer bytes = builder.appendInt(value).build();
assertEquals(2, bytes.get(1));
assertEquals(4, bytes.limit());
@@ -203,7 +203,7 @@ public class BinaryTupleTest {
values = new int[]{Integer.MIN_VALUE, Short.MIN_VALUE - 1, Short.MAX_VALUE + 1, Integer.MAX_VALUE};
for (int value : values) {
- BinaryTupleBuilder builder = BinaryTupleBuilder.create(1, false, 4);
+ BinaryTupleBuilder builder = new BinaryTupleBuilder(1, false, 4);
ByteBuffer bytes = builder.appendInt(value).build();
assertEquals(4, bytes.get(1));
assertEquals(6, bytes.limit());
@@ -220,7 +220,7 @@ public class BinaryTupleTest {
public void longTest() {
long[] values = {Byte.MIN_VALUE, -1, 0, 1, Byte.MAX_VALUE};
for (long value : values) {
- BinaryTupleBuilder builder = BinaryTupleBuilder.create(1, false, 1);
+ BinaryTupleBuilder builder = new BinaryTupleBuilder(1, false, 1);
ByteBuffer bytes = builder.appendLong(value).build();
assertEquals(value != 0 ? 1 : 0, bytes.get(1));
assertEquals(value != 0 ? 3 : 2, bytes.limit());
@@ -231,7 +231,7 @@ public class BinaryTupleTest {
values = new long[]{Short.MIN_VALUE, Byte.MIN_VALUE - 1, Byte.MAX_VALUE + 1, Short.MAX_VALUE};
for (long value : values) {
- BinaryTupleBuilder builder = BinaryTupleBuilder.create(1, false, 2);
+ BinaryTupleBuilder builder = new BinaryTupleBuilder(1, false, 2);
ByteBuffer bytes = builder.appendLong(value).build();
assertEquals(2, bytes.get(1));
assertEquals(4, bytes.limit());
@@ -242,7 +242,7 @@ public class BinaryTupleTest {
values = new long[]{Integer.MIN_VALUE, Short.MIN_VALUE - 1, Short.MAX_VALUE + 1, Integer.MAX_VALUE};
for (long value : values) {
- BinaryTupleBuilder builder = BinaryTupleBuilder.create(1, false, 4);
+ BinaryTupleBuilder builder = new BinaryTupleBuilder(1, false, 4);
ByteBuffer bytes = builder.appendLong(value).build();
assertEquals(4, bytes.get(1));
assertEquals(6, bytes.limit());
@@ -253,7 +253,7 @@ public class BinaryTupleTest {
values = new long[]{Long.MIN_VALUE, Integer.MIN_VALUE - 1L, Integer.MAX_VALUE + 1L, Long.MAX_VALUE};
for (long value : values) {
- BinaryTupleBuilder builder = BinaryTupleBuilder.create(1, false, 8);
+ BinaryTupleBuilder builder = new BinaryTupleBuilder(1, false, 8);
ByteBuffer bytes = builder.appendLong(value).build();
assertEquals(8, bytes.get(1));
assertEquals(10, bytes.limit());
@@ -271,7 +271,7 @@ public class BinaryTupleTest {
{
float value = 0.0F;
- BinaryTupleBuilder builder = BinaryTupleBuilder.create(1, false, 0);
+ BinaryTupleBuilder builder = new BinaryTupleBuilder(1, false, 0);
ByteBuffer bytes = builder.appendFloat(value).build();
assertEquals(0, bytes.get(1));
assertEquals(2, bytes.limit());
@@ -282,7 +282,7 @@ public class BinaryTupleTest {
{
float value = 0.5F;
- BinaryTupleBuilder builder = BinaryTupleBuilder.create(1, false, 4);
+ BinaryTupleBuilder builder = new BinaryTupleBuilder(1, false, 4);
ByteBuffer bytes = builder.appendFloat(value).build();
assertEquals(4, bytes.get(1));
assertEquals(6, bytes.limit());
@@ -300,7 +300,7 @@ public class BinaryTupleTest {
{
double value = 0.0;
- BinaryTupleBuilder builder = BinaryTupleBuilder.create(1, false, 0);
+ BinaryTupleBuilder builder = new BinaryTupleBuilder(1, false, 0);
ByteBuffer bytes = builder.appendDouble(value).build();
assertEquals(0, bytes.get(1));
assertEquals(2, bytes.limit());
@@ -311,7 +311,7 @@ public class BinaryTupleTest {
{
double value = 0.5;
- BinaryTupleBuilder builder = BinaryTupleBuilder.create(1, false, 4);
+ BinaryTupleBuilder builder = new BinaryTupleBuilder(1, false, 4);
ByteBuffer bytes = builder.appendDouble(value).build();
assertEquals(4, bytes.get(1));
assertEquals(6, bytes.limit());
@@ -322,7 +322,7 @@ public class BinaryTupleTest {
{
double value = 0.1;
- BinaryTupleBuilder builder = BinaryTupleBuilder.create(1, false, 8);
+ BinaryTupleBuilder builder = new BinaryTupleBuilder(1, false, 8);
ByteBuffer bytes = builder.appendDouble(value).build();
assertEquals(8, bytes.get(1));
assertEquals(10, bytes.limit());
@@ -339,7 +339,7 @@ public class BinaryTupleTest {
public void numberTest() {
BigInteger value = BigInteger.valueOf(12345);
- BinaryTupleBuilder builder = BinaryTupleBuilder.create(1, false);
+ BinaryTupleBuilder builder = new BinaryTupleBuilder(1, false);
ByteBuffer bytes = builder.appendNumber(value).build();
BinaryTupleReader reader = new BinaryTupleReader(1, bytes);
@@ -353,7 +353,7 @@ public class BinaryTupleTest {
public void decimalTest() {
BigDecimal value = new BigDecimal(BigInteger.valueOf(12345), 100);
- BinaryTupleBuilder builder = BinaryTupleBuilder.create(1, false);
+ BinaryTupleBuilder builder = new BinaryTupleBuilder(1, false);
ByteBuffer bytes = builder.appendDecimal(value, 100).build();
BinaryTupleReader reader = new BinaryTupleReader(1, bytes);
@@ -369,7 +369,7 @@ public class BinaryTupleTest {
int valueScale = 3;
BigDecimal value = BigDecimal.valueOf(123456, valueScale);
- BinaryTupleBuilder builder = BinaryTupleBuilder.create(1, false);
+ BinaryTupleBuilder builder = new BinaryTupleBuilder(1, false);
ByteBuffer bytes = builder.appendDecimal(value, schemaScale).build();
BinaryTupleReader reader = new BinaryTupleReader(1, bytes);
@@ -385,7 +385,7 @@ public class BinaryTupleTest {
public void stringTest() {
String[] values = {"ascii", "我愛Java", "", "a string with a bit more characters"};
- BinaryTupleBuilder builder = BinaryTupleBuilder.create(values.length, false);
+ BinaryTupleBuilder builder = new BinaryTupleBuilder(values.length, false);
for (String value : values) {
builder.appendString(value);
}
@@ -411,7 +411,7 @@ public class BinaryTupleTest {
values[i] = generateBytes(rnd);
}
- BinaryTupleBuilder builder = BinaryTupleBuilder.create(values.length, true);
+ BinaryTupleBuilder builder = new BinaryTupleBuilder(values.length, true);
for (byte[] value : values) {
builder.appendBytes(value);
}
@@ -439,7 +439,7 @@ public class BinaryTupleTest {
public void uuidTest() {
UUID value = UUID.randomUUID();
- BinaryTupleBuilder builder = BinaryTupleBuilder.create(1, false);
+ BinaryTupleBuilder builder = new BinaryTupleBuilder(1, false);
ByteBuffer bytes = builder.appendUuid(value).build();
BinaryTupleReader reader = new BinaryTupleReader(1, bytes);
@@ -458,7 +458,7 @@ public class BinaryTupleTest {
if (valueBytes != null) {
BitSet value = BitSet.valueOf(valueBytes);
- BinaryTupleBuilder builder = BinaryTupleBuilder.create(1, false);
+ BinaryTupleBuilder builder = new BinaryTupleBuilder(1, false);
ByteBuffer bytes = builder.appendBitmask(value).build();
BinaryTupleReader reader = new BinaryTupleReader(1, bytes);
@@ -474,7 +474,7 @@ public class BinaryTupleTest {
public void dateTest() {
LocalDate value = LocalDate.now();
- BinaryTupleBuilder builder = BinaryTupleBuilder.create(1, false);
+ BinaryTupleBuilder builder = new BinaryTupleBuilder(1, false);
ByteBuffer bytes = builder.appendDate(value).build();
assertEquals(3, bytes.get(1));
assertEquals(5, bytes.limit());
@@ -491,7 +491,7 @@ public class BinaryTupleTest {
LocalTime value = LocalTime.now();
{
- BinaryTupleBuilder builder = BinaryTupleBuilder.create(1, false);
+ BinaryTupleBuilder builder = new BinaryTupleBuilder(1, false);
ByteBuffer bytes = builder.appendTime(value).build();
BinaryTupleReader reader = new BinaryTupleReader(1, bytes);
@@ -501,7 +501,7 @@ public class BinaryTupleTest {
value = LocalTime.of(value.getHour(), value.getMinute(), value.getSecond(), 1_000_000);
{
- BinaryTupleBuilder builder = BinaryTupleBuilder.create(1, false);
+ BinaryTupleBuilder builder = new BinaryTupleBuilder(1, false);
ByteBuffer bytes = builder.appendTime(value).build();
assertEquals(4, bytes.get(1));
assertEquals(6, bytes.limit());
@@ -513,7 +513,7 @@ public class BinaryTupleTest {
value = LocalTime.of(value.getHour(), value.getMinute(), value.getSecond(), 1_001_000);
{
- BinaryTupleBuilder builder = BinaryTupleBuilder.create(1, false);
+ BinaryTupleBuilder builder = new BinaryTupleBuilder(1, false);
ByteBuffer bytes = builder.appendTime(value).build();
assertEquals(5, bytes.get(1));
assertEquals(7, bytes.limit());
@@ -525,7 +525,7 @@ public class BinaryTupleTest {
value = LocalTime.of(value.getHour(), value.getMinute(), value.getSecond(), 1_001_001);
{
- BinaryTupleBuilder builder = BinaryTupleBuilder.create(1, false);
+ BinaryTupleBuilder builder = new BinaryTupleBuilder(1, false);
ByteBuffer bytes = builder.appendTime(value).build();
assertEquals(6, bytes.get(1));
assertEquals(8, bytes.limit());
@@ -543,7 +543,7 @@ public class BinaryTupleTest {
LocalDateTime value = LocalDateTime.now();
{
- BinaryTupleBuilder builder = BinaryTupleBuilder.create(1, false);
+ BinaryTupleBuilder builder = new BinaryTupleBuilder(1, false);
ByteBuffer bytes = builder.appendDateTime(value).build();
BinaryTupleReader reader = new BinaryTupleReader(1, bytes);
@@ -554,7 +554,7 @@ public class BinaryTupleTest {
LocalTime.of(value.getHour(), value.getMinute(), value.getSecond(), 1_000_000));
{
- BinaryTupleBuilder builder = BinaryTupleBuilder.create(1, false);
+ BinaryTupleBuilder builder = new BinaryTupleBuilder(1, false);
ByteBuffer bytes = builder.appendDateTime(value).build();
assertEquals(7, bytes.get(1));
assertEquals(9, bytes.limit());
@@ -567,7 +567,7 @@ public class BinaryTupleTest {
LocalTime.of(value.getHour(), value.getMinute(), value.getSecond(), 1_001_000));
{
- BinaryTupleBuilder builder = BinaryTupleBuilder.create(1, false);
+ BinaryTupleBuilder builder = new BinaryTupleBuilder(1, false);
ByteBuffer bytes = builder.appendDateTime(value).build();
assertEquals(8, bytes.get(1));
assertEquals(10, bytes.limit());
@@ -580,7 +580,7 @@ public class BinaryTupleTest {
LocalTime.of(value.getHour(), value.getMinute(), value.getSecond(), 1_001_001));
{
- BinaryTupleBuilder builder = BinaryTupleBuilder.create(1, false);
+ BinaryTupleBuilder builder = new BinaryTupleBuilder(1, false);
ByteBuffer bytes = builder.appendDateTime(value).build();
assertEquals(9, bytes.get(1));
assertEquals(11, bytes.limit());
@@ -599,7 +599,7 @@ public class BinaryTupleTest {
Instant value = Instant.now();
{
- BinaryTupleBuilder builder = BinaryTupleBuilder.create(1, false);
+ BinaryTupleBuilder builder = new BinaryTupleBuilder(1, false);
ByteBuffer bytes = builder.appendTimestamp(value).build();
BinaryTupleReader reader = new BinaryTupleReader(1, bytes);
@@ -609,7 +609,7 @@ public class BinaryTupleTest {
value = Instant.ofEpochSecond(value.getEpochSecond());
{
- BinaryTupleBuilder builder = BinaryTupleBuilder.create(1, false);
+ BinaryTupleBuilder builder = new BinaryTupleBuilder(1, false);
ByteBuffer bytes = builder.appendTimestamp(value).build();
assertEquals(8, bytes.get(1));
assertEquals(10, bytes.limit());
@@ -621,7 +621,7 @@ public class BinaryTupleTest {
value = Instant.ofEpochSecond(value.getEpochSecond(), 1);
{
- BinaryTupleBuilder builder = BinaryTupleBuilder.create(1, false);
+ BinaryTupleBuilder builder = new BinaryTupleBuilder(1, false);
ByteBuffer bytes = builder.appendTimestamp(value).build();
assertEquals(12, bytes.get(1));
assertEquals(14, bytes.limit());
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java
index 1ab66e4521..dd45979d68 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java
@@ -133,7 +133,7 @@ public class ClientTableCommon {
packer.packInt(schema.version());
}
- var builder = BinaryTupleBuilder.create(columnCount(schema, part), true);
+ var builder = new BinaryTupleBuilder(columnCount(schema, part), true);
if (part != TuplePart.VAL) {
for (var col : schema.keyColumns().columns()) {
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientKeyValueView.java b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientKeyValueView.java
index 03daff5b6f..224ad509c3 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientKeyValueView.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientKeyValueView.java
@@ -458,7 +458,7 @@ public class ClientKeyValueView<K, V> implements KeyValueView<K, V> {
}
private void writeKeyValueRaw(ClientSchema s, PayloadOutputChannel w, @NotNull K key, V val) {
- var builder = BinaryTupleBuilder.create(s.columns().length, true);
+ var builder = new BinaryTupleBuilder(s.columns().length, true);
var noValueSet = new BitSet();
ClientMarshallerWriter writer = new ClientMarshallerWriter(builder, noValueSet);
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientRecordSerializer.java b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientRecordSerializer.java
index faadcdef21..2b133dab70 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientRecordSerializer.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientRecordSerializer.java
@@ -105,7 +105,7 @@ public class ClientRecordSerializer<R> {
*/
static <R> void writeRecRaw(@Nullable R rec, ClientMessagePacker out, Marshaller marshaller, int columnCount) {
try {
- var builder = BinaryTupleBuilder.create(columnCount, true);
+ var builder = new BinaryTupleBuilder(columnCount, true);
var noValueSet = new BitSet();
var writer = new ClientMarshallerWriter(builder, noValueSet);
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTupleSerializer.java b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTupleSerializer.java
index 79b30172d9..0bf002c89f 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTupleSerializer.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTupleSerializer.java
@@ -134,7 +134,7 @@ public class ClientTupleSerializer {
var columns = schema.columns();
var count = keyOnly ? schema.keyColumnCount() : columns.length;
- var builder = BinaryTupleBuilder.create(count, true);
+ var builder = new BinaryTupleBuilder(count, true);
var noValueSet = new BitSet(count);
for (var i = 0; i < count; i++) {
@@ -172,7 +172,7 @@ public class ClientTupleSerializer {
var columns = schema.columns();
var noValueSet = new BitSet(columns.length);
- var builder = BinaryTupleBuilder.create(columns.length, true);
+ var builder = new BinaryTupleBuilder(columns.length, true);
for (var i = 0; i < columns.length; i++) {
var col = columns[i];
diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/RocksDbClusterStateStorage.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/RocksDbClusterStateStorage.java
index dd2bd9e1af..f9b281bc2f 100644
--- a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/RocksDbClusterStateStorage.java
+++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/RocksDbClusterStateStorage.java
@@ -135,7 +135,7 @@ public class RocksDbClusterStateStorage implements ClusterStateStorage {
@Override
public <T> Cursor<T> getWithPrefix(byte[] prefix, BiFunction<byte[], byte[], T> entryTransformer) {
- byte[] upperBound = RocksUtils.rangeEnd(prefix);
+ byte[] upperBound = RocksUtils.incrementArray(prefix);
Slice upperBoundSlice = upperBound == null ? null : new Slice(upperBound);
diff --git a/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/AbstractClusterStateStorageTest.java b/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/AbstractClusterStateStorageTest.java
index d9c84f9064..768618a560 100644
--- a/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/AbstractClusterStateStorageTest.java
+++ b/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/AbstractClusterStateStorageTest.java
@@ -186,7 +186,7 @@ public abstract class AbstractClusterStateStorageTest {
@Test
void testGetWithPrefixBorder() throws Exception {
byte[] key1 = "key1".getBytes(UTF_8);
- byte[] key2 = RocksUtils.rangeEnd(key1);
+ byte[] key2 = RocksUtils.incrementArray(key1);
storage.put(key1, "value1".getBytes(UTF_8));
storage.put(key2, "value2".getBytes(UTF_8));
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/Sql/SqlTests.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/Sql/SqlTests.cs
index 4517a52ae3..ff8ee2b987 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/Sql/SqlTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Sql/SqlTests.cs
@@ -263,7 +263,7 @@ namespace Apache.Ignite.Tests.Sql
var ex = Assert.ThrowsAsync<TableNotFoundException>(
async () => await Client.Sql.ExecuteAsync(null, "ALTER TABLE NOT_EXISTS_TABLE ADD COLUMN VAL1 VARCHAR"));
- StringAssert.EndsWith("Table does not exist [name=PUBLIC.NOT_EXISTS_TABLE]", ex!.Message);
+ StringAssert.EndsWith("The table does not exist [name=PUBLIC.NOT_EXISTS_TABLE]", ex!.Message);
StringAssert.StartsWith("IGN-TBL-2", ex.Message);
StringAssert.StartsWith("IGN-TBL-2", ex.CodeAsString);
StringAssert.StartsWith("TBL", ex.GroupName);
diff --git a/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/RocksUtils.java b/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/RocksUtils.java
index 5c27d8ff57..14b8877f26 100644
--- a/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/RocksUtils.java
+++ b/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/RocksUtils.java
@@ -83,28 +83,28 @@ public class RocksUtils {
* <p>This method tries to increment the least significant byte (in BE order) that is not equal to 0xFF (bytes are treated as
* unsigned values).
*
- * @param rangeStart Start of a range of keys (prefix) in RocksDB.
+ * @param array Start of a range of keys (prefix) in RocksDB.
* @return End of a range of keys in RocksDB or {@code null} if all bytes of the prefix are equal to 0xFF.
*/
- public static byte @Nullable [] rangeEnd(byte[] rangeStart) {
- byte[] rangeEnd = rangeStart.clone();
+ public static byte @Nullable [] incrementArray(byte[] array) {
+ byte[] result = array.clone();
- int i = rangeStart.length - 1;
+ int i = array.length - 1;
// Cycle through all bytes that are equal to 0xFF
- while (i >= 0 && rangeStart[i] == -1) {
- rangeEnd[i] = 0;
+ while (i >= 0 && array[i] == -1) {
+ result[i] = 0;
i--;
}
if (i == -1) {
- // All bytes are equal to 0xFF, no upper bound should be used
+ // All bytes are equal to 0xFF, increment is not possible
return null;
} else {
- rangeEnd[i] += 1;
+ result[i] += 1;
- return rangeEnd;
+ return result;
}
}
}
diff --git a/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/flush/RocksDbFlushListener.java b/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/flush/RocksDbFlushListener.java
index f72a045473..d709581827 100644
--- a/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/flush/RocksDbFlushListener.java
+++ b/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/flush/RocksDbFlushListener.java
@@ -21,7 +21,6 @@ import static org.rocksdb.AbstractEventListener.EnabledEventCallback.ON_FLUSH_BE
import static org.rocksdb.AbstractEventListener.EnabledEventCallback.ON_FLUSH_COMPLETED;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import org.rocksdb.AbstractEventListener;
import org.rocksdb.FlushJobInfo;
@@ -51,7 +50,7 @@ class RocksDbFlushListener extends AbstractEventListener {
* @param flusher Flusher instance to delegate events processing to.
*/
RocksDbFlushListener(RocksDbFlusher flusher) {
- super(EnabledEventCallback.ON_FLUSH_BEGIN, EnabledEventCallback.ON_FLUSH_COMPLETED);
+ super(ON_FLUSH_BEGIN, ON_FLUSH_COMPLETED);
this.flusher = flusher;
}
@@ -67,13 +66,11 @@ class RocksDbFlushListener extends AbstractEventListener {
/** {@inheritDoc} */
@Override
public void onFlushCompleted(RocksDB db, FlushJobInfo flushJobInfo) {
- ExecutorService threadPool = flusher.threadPool;
-
if (lastEventType.compareAndSet(ON_FLUSH_BEGIN, ON_FLUSH_COMPLETED)) {
- lastFlushProcessed = CompletableFuture.runAsync(flusher.onFlushCompleted, threadPool);
+ lastFlushProcessed = flusher.onFlushCompleted();
}
// Do it for every column family, there's no way to tell in advance which one has the latest sequence number.
- lastFlushProcessed.whenCompleteAsync((o, throwable) -> flusher.completeFutures(flushJobInfo.getLargestSeqno()), threadPool);
+ lastFlushProcessed.whenCompleteAsync((o, throwable) -> flusher.completeFutures(flushJobInfo.getLargestSeqno()), flusher.threadPool);
}
}
diff --git a/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/flush/RocksDbFlusher.java b/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/flush/RocksDbFlusher.java
index 9c907d079f..63ea3ea4c3 100644
--- a/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/flush/RocksDbFlusher.java
+++ b/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/flush/RocksDbFlusher.java
@@ -21,6 +21,7 @@ import java.util.List;
import java.util.SortedMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -47,7 +48,7 @@ public class RocksDbFlusher {
private volatile RocksDB db;
/** List of all column families. */
- private volatile List<ColumnFamilyHandle> columnFamilyHandles;
+ private final List<ColumnFamilyHandle> columnFamilyHandles = new CopyOnWriteArrayList<>();
/** Scheduled pool to schedule flushes. */
private final ScheduledExecutorService scheduledPool;
@@ -59,7 +60,7 @@ public class RocksDbFlusher {
private final IntSupplier delaySupplier;
/** Flush completion callback. */
- final Runnable onFlushCompleted;
+ private final Runnable onFlushCompleted;
/**
* Flush options to be used to asynchronously flush the Rocks DB memtable. It needs to be cached, because
@@ -68,7 +69,7 @@ public class RocksDbFlusher {
private final FlushOptions flushOptions = new FlushOptions().setWaitForFlush(false);
/** Map with flush futures by sequence number at the time of the {@link #awaitFlush(boolean)} call. */
- final SortedMap<Long, CompletableFuture<Void>> flushFuturesBySequenceNumber = new ConcurrentSkipListMap<>();
+ private final SortedMap<Long, CompletableFuture<Void>> flushFuturesBySequenceNumber = new ConcurrentSkipListMap<>();
/** Latest known sequence number for persisted data. Not volatile, protected by explicit synchronization. */
private long latestPersistedSequenceNumber;
@@ -132,13 +133,20 @@ public class RocksDbFlusher {
@SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
public void init(RocksDB db, List<ColumnFamilyHandle> columnFamilyHandles) {
this.db = db;
- this.columnFamilyHandles = columnFamilyHandles;
+ this.columnFamilyHandles.addAll(columnFamilyHandles);
synchronized (latestPersistedSequenceNumberMux) {
latestPersistedSequenceNumber = db.getLatestSequenceNumber();
}
}
+ /**
+ * Appends the given handle to the list of CF handles, the same list that {@code init} fills from its argument.
+ */
+ public void addColumnFamily(ColumnFamilyHandle handle) {
+ columnFamilyHandles.add(handle);
+ }
+
/**
* Returns a future to wait next flush operation from the current point in time. Uses {@link RocksDB#getLatestSequenceNumber()} to
* achieve this, by fixing its value at the time of invocation. Storage is considered flushed when at least one persisted column
@@ -235,4 +243,13 @@ public class RocksDbFlusher {
flushOptions.close();
}
+
+ /**
+ * Submits the {@code onFlushCompleted} callback for asynchronous execution on {@code threadPool}.
+ *
+ * @return Future that completes when the {@code onFlushCompleted} callback finishes.
+ */
+ CompletableFuture<Void> onFlushCompleted() {
+ return CompletableFuture.runAsync(onFlushCompleted, threadPool);
+ }
}
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryConverter.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryConverter.java
index 259b82f2a3..7633d7aef7 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryConverter.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryConverter.java
@@ -118,8 +118,7 @@ public class BinaryConverter {
}
// Now compose the tuple.
- BinaryTupleBuilder builder = BinaryTupleBuilder.create(
- tupleSchema.elementCount(), hasNulls, estimatedValueSize);
+ BinaryTupleBuilder builder = new BinaryTupleBuilder(tupleSchema.elementCount(), hasNulls, estimatedValueSize);
for (int elementIndex = 0; elementIndex < tupleSchema.elementCount(); elementIndex++) {
BinaryTupleSchema.Element elt = tupleSchema.element(elementIndex);
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryRow.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryRow.java
index c49e83353d..af762964f9 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryRow.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryRow.java
@@ -94,4 +94,9 @@ public interface BinaryRow {
* Get byte array of the row.
*/
byte[] bytes();
+
+ /**
+ * Returns the representation of this row as a Byte Buffer.
+ */
+ ByteBuffer byteBuffer();
}
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryTuplePrefix.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryTuplePrefix.java
new file mode 100644
index 0000000000..32e23dda2f
--- /dev/null
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryTuplePrefix.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.schema;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.binarytuple.BinaryTuplePrefixBuilder;
+import org.apache.ignite.internal.binarytuple.BinaryTupleReader;
+import org.apache.ignite.internal.schema.row.InternalTuple;
+
+/**
+ * Reader for a Binary Tuple Prefix: parsed with one element more than the full schema, with {@link #elementCount()} read from the data.
+ *
+ * @see BinaryTuplePrefixBuilder BinaryTuplePrefixBuilder for information about the Binary Tuple Prefix format.
+ */
+public class BinaryTuplePrefix extends BinaryTupleReader implements InternalTuple {
+ /** Full tuple schema; supplies the decimal scale used by {@link #decimalValue(int)}. */
+ private final BinaryTupleSchema schema;
+
+ /**
+ * Creates a prefix reader over a byte array, passing {@code schema.elementCount() + 1} as the element count to the parent reader.
+ *
+ * @param schema Full Tuple schema.
+ * @param bytes Serialized representation of a Binary Tuple Prefix.
+ */
+ public BinaryTuplePrefix(BinaryTupleSchema schema, byte[] bytes) {
+ super(schema.elementCount() + 1, bytes);
+ this.schema = schema;
+ }
+
+ /**
+ * Creates a prefix reader over a byte buffer, passing {@code schema.elementCount() + 1} as the element count to the parent reader.
+ *
+ * @param schema Full Tuple schema.
+ * @param buffer Serialized representation of a Binary Tuple Prefix.
+ */
+ public BinaryTuplePrefix(BinaryTupleSchema schema, ByteBuffer buffer) {
+ super(schema.elementCount() + 1, buffer);
+ this.schema = schema;
+ }
+
+ @Override
+ public int count() {
+ return elementCount();
+ }
+
+ @Override
+ public BigDecimal decimalValue(int index) {
+ return decimalValue(index, schema.element(index).decimalScale);
+ }
+
+ @Override
+ public int elementCount() {
+ return intValue(super.elementCount() - 1);
+ }
+}
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/ByteBufferRow.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/ByteBufferRow.java
index 614be90a74..a994c33551 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/ByteBufferRow.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/ByteBufferRow.java
@@ -124,4 +124,9 @@ public class ByteBufferRow implements BinaryRow {
return tmp;
}
+
+ @Override
+ public ByteBuffer byteBuffer() {
+ return buf.slice().order(ORDER);
+ }
}
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/InternalTuple.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/InternalTuple.java
index 67c712d726..63c2196206 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/InternalTuple.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/InternalTuple.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.schema.row;
import java.math.BigDecimal;
import java.math.BigInteger;
+import java.nio.ByteBuffer;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
@@ -220,4 +221,9 @@ public interface InternalTuple {
* @return Column value.
*/
Instant timestampValue(int col);
+
+ /**
+ * Returns the representation of this tuple as a Byte Buffer.
+ */
+ ByteBuffer byteBuffer();
}
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/Row.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/Row.java
index 4da400eaf9..b63deff5ff 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/Row.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/Row.java
@@ -809,6 +809,11 @@ public class Row implements BinaryRowEx, SchemaAware, InternalTuple {
return row.bytes();
}
+ @Override
+ public ByteBuffer byteBuffer() {
+ return row.byteBuffer();
+ }
+
/** {@inheritDoc} */
@Override
public int colocationHash() {
diff --git a/modules/schema/src/test/java/org/apache/ignite/internal/schema/BinaryTuplePrefixTest.java b/modules/schema/src/test/java/org/apache/ignite/internal/schema/BinaryTuplePrefixTest.java
new file mode 100644
index 0000000000..72574b7ced
--- /dev/null
+++ b/modules/schema/src/test/java/org/apache/ignite/internal/schema/BinaryTuplePrefixTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.schema;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.nio.ByteBuffer;
+import java.time.LocalDate;
+import org.apache.ignite.internal.binarytuple.BinaryTupleCommon;
+import org.apache.ignite.internal.binarytuple.BinaryTuplePrefixBuilder;
+import org.apache.ignite.internal.schema.BinaryTupleSchema.Element;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests for the {@link BinaryTuplePrefix} class.
+ */
+public class BinaryTuplePrefixTest {
+
+ /**
+ * Builds a 3-element prefix for a 5-element schema and checks its count, the three written values, and that elements 3 and 4 read as null.
+ */
+ @Test
+ public void testPrefix() {
+ BinaryTupleSchema schema = BinaryTupleSchema.create(new Element[]{
+ new Element(NativeTypes.INT32, false),
+ new Element(NativeTypes.STRING, false),
+ new Element(NativeTypes.DATE, false),
+ new Element(NativeTypes.UUID, false),
+ new Element(NativeTypes.DOUBLE, false)
+ });
+
+ var builder = new BinaryTuplePrefixBuilder(3, 5);
+
+ LocalDate date = LocalDate.now();
+
+ ByteBuffer tuple = builder.appendInt(42)
+ .appendString("foobar")
+ .appendDate(date)
+ .build();
+
+ assertTrue(BinaryTupleCommon.isPrefix(tuple));
+
+ var prefix = new BinaryTuplePrefix(schema, tuple);
+
+ assertThat(prefix.count(), is(3));
+ assertThat(prefix.elementCount(), is(3));
+
+ assertThat(prefix.intValue(0), is(42));
+ assertThat(prefix.stringValue(1), is("foobar"));
+ assertThat(prefix.dateValue(2), is(date));
+ assertThat(prefix.uuidValue(3), is(nullValue()));
+ assertThat(prefix.doubleValueBoxed(4), is(nullValue()));
+ }
+
+ /**
+ * Checks that building a prefix declared as 3 elements with 1 or with 4 appended elements fails with {@link IllegalStateException}.
+ */
+ @Test
+ public void testInvalidPrefix() {
+ Exception e = assertThrows(IllegalStateException.class, () -> {
+ var builder = new BinaryTuplePrefixBuilder(3, 5);
+
+ builder.appendInt(42).build();
+ });
+
+ assertThat(e.getMessage(), is("Unexpected amount of elements in a BinaryTuple prefix. Expected: 3, actual 1"));
+
+ e = assertThrows(IllegalStateException.class, () -> {
+ var builder = new BinaryTuplePrefixBuilder(3, 5);
+
+ builder.appendInt(42)
+ .appendInt(42)
+ .appendInt(42)
+ .appendInt(42)
+ .build();
+ });
+
+ assertThat(e.getMessage(), is("Unexpected amount of elements in a BinaryTuple prefix. Expected: 3, actual 4"));
+ }
+}
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/BinaryTupleComparator.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/BinaryTupleComparator.java
similarity index 60%
rename from modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/BinaryTupleComparator.java
rename to modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/BinaryTupleComparator.java
index e36dd9671f..4fdc6eb4ed 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/BinaryTupleComparator.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/BinaryTupleComparator.java
@@ -15,80 +15,70 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.storage.index.impl;
+package org.apache.ignite.internal.storage.index;
+import static org.apache.ignite.internal.binarytuple.BinaryTupleCommon.isPrefix;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
import java.util.Arrays;
import java.util.Comparator;
import org.apache.ignite.internal.schema.BinaryTuple;
+import org.apache.ignite.internal.schema.BinaryTuplePrefix;
+import org.apache.ignite.internal.schema.BinaryTupleSchema;
import org.apache.ignite.internal.schema.NativeTypeSpec;
-import org.apache.ignite.internal.storage.index.SortedIndexDescriptor;
+import org.apache.ignite.internal.schema.row.InternalTuple;
import org.apache.ignite.internal.storage.index.SortedIndexDescriptor.ColumnDescriptor;
/**
- * Comparator implementation for comparting {@link BinaryTuple}s on a per-column basis.
+ * Comparator implementation for comparing {@link BinaryTuple}s on a per-column basis.
*/
-class BinaryTupleComparator implements Comparator<BinaryTuple> {
+public class BinaryTupleComparator implements Comparator<ByteBuffer> {
private final SortedIndexDescriptor descriptor;
- private final int prefixLength;
-
- private BinaryTupleComparator(SortedIndexDescriptor descriptor, int prefixLength) {
- if (prefixLength > descriptor.indexColumns().size()) {
- throw new IllegalArgumentException("Invalid prefix length: " + prefixLength);
- }
-
- this.descriptor = descriptor;
- this.prefixLength = prefixLength;
- }
-
/**
* Creates a comparator for a Sorted Index identified by the given descriptor.
*/
- static BinaryTupleComparator newComparator(SortedIndexDescriptor descriptor) {
- return new BinaryTupleComparator(descriptor, descriptor.indexColumns().size());
- }
-
- /**
- * Similar to {@link #newComparator} but creates a comparator that only compares first {@code prefixLength} index columns.
- */
- static BinaryTupleComparator newPrefixComparator(SortedIndexDescriptor descriptor, int prefixLength) {
- return new BinaryTupleComparator(descriptor, prefixLength);
+ public BinaryTupleComparator(SortedIndexDescriptor descriptor) {
+ this.descriptor = descriptor;
}
@Override
- public int compare(BinaryTuple tuple1, BinaryTuple tuple2) {
- return compare(tuple1, tuple2, 1, 0);
- }
+ public int compare(ByteBuffer buffer1, ByteBuffer buffer2) {
+ assert buffer1.order() == ByteOrder.LITTLE_ENDIAN;
+ assert buffer2.order() == ByteOrder.LITTLE_ENDIAN;
- /**
- * Compares a given tuple with the configured prefix.
- *
- * @param tuple1 Tuple to compare.
- * @param tuple2 Tuple to compare.
- * @param direction Sort direction: {@code -1} means sorting in reversed order, {@code 1} means sorting in the natural order.
- * @param equals Value that should be returned if the provided tuple exactly matches the prefix.
- * @return the value {@code 0} if the given row starts with the configured prefix;
- * a value less than {@code 0} if the row's prefix is smaller than the prefix; and
- * a value greater than {@code 0} if the row's prefix is larger than the prefix.
- */
- public int compare(BinaryTuple tuple1, BinaryTuple tuple2, int direction, int equals) {
- for (int i = 0; i < prefixLength; i++) {
+ boolean isBuffer1Prefix = isPrefix(buffer1);
+ boolean isBuffer2Prefix = isPrefix(buffer2);
+
+ assert !(isBuffer1Prefix && isBuffer2Prefix);
+
+ BinaryTupleSchema schema = descriptor.binaryTupleSchema();
+
+ InternalTuple tuple1 = isBuffer1Prefix ? new BinaryTuplePrefix(schema, buffer1) : new BinaryTuple(schema, buffer1);
+ InternalTuple tuple2 = isBuffer2Prefix ? new BinaryTuplePrefix(schema, buffer2) : new BinaryTuple(schema, buffer2);
+
+ int columnsToCompare = Math.min(tuple1.count(), tuple2.count());
+
+ assert columnsToCompare <= descriptor.indexColumns().size();
+
+ for (int i = 0; i < columnsToCompare; i++) {
ColumnDescriptor columnDescriptor = descriptor.indexColumns().get(i);
int compare = compareField(tuple1, tuple2, i);
if (compare != 0) {
- return direction * (columnDescriptor.asc() ? compare : -compare);
+ return columnDescriptor.asc() ? compare : -compare;
}
}
- return equals;
+ return 0;
}
/**
* Compares individual fields of two tuples.
*/
- private int compareField(BinaryTuple tuple1, BinaryTuple tuple2, int index) {
+ private int compareField(InternalTuple tuple1, InternalTuple tuple2, int index) {
boolean tuple1HasNull = tuple1.hasNullValue(index);
boolean tuple2HasNull = tuple2.hasNullValue(index);
@@ -142,7 +132,7 @@ class BinaryTupleComparator implements Comparator<BinaryTuple> {
default:
throw new IllegalArgumentException(String.format(
- "Unsupported column schema for creating a sorted index. Column name: %s, column type: %s",
+ "Unsupported column type in binary tuple comparator. Column name: %s, column type: %s",
columnDescriptor.name(), columnDescriptor.type()
));
}
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/HashIndexDescriptor.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/HashIndexDescriptor.java
index b547a30543..d72f24406e 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/HashIndexDescriptor.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/HashIndexDescriptor.java
@@ -93,7 +93,6 @@ public class HashIndexDescriptor {
* @param tableConfig Table configuration.
* @param tablesConfig Tables and indexes configuration.
* @param indexId Index id.
- *
*/
// TODO: IGNITE-17727 Fix redundant param.
public HashIndexDescriptor(UUID indexId, TableView tableConfig, TablesView tablesConfig) {
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/SortedIndexDescriptor.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/SortedIndexDescriptor.java
index 59a1c220e9..6e2361118b 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/SortedIndexDescriptor.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/SortedIndexDescriptor.java
@@ -29,6 +29,8 @@ import org.apache.ignite.configuration.schemas.table.TableIndexView;
import org.apache.ignite.configuration.schemas.table.TableView;
import org.apache.ignite.configuration.schemas.table.TablesView;
import org.apache.ignite.internal.configuration.util.ConfigurationUtil;
+import org.apache.ignite.internal.schema.BinaryTupleSchema;
+import org.apache.ignite.internal.schema.BinaryTupleSchema.Element;
import org.apache.ignite.internal.schema.NativeType;
import org.apache.ignite.internal.schema.configuration.SchemaConfigurationConverter;
import org.apache.ignite.internal.schema.configuration.SchemaDescriptorConverter;
@@ -53,7 +55,28 @@ public class SortedIndexDescriptor {
private final boolean asc;
- ColumnDescriptor(ColumnView tableColumnView, IndexColumnView indexColumnView) {
+ /**
+ * Creates a Column Descriptor.
+ *
+ * @param name Name of the column.
+ * @param type Type of the column.
+ * @param nullable Flag indicating that the column may contain {@code null}s.
+ * @param asc Sort order of the column.
+ */
+ public ColumnDescriptor(String name, NativeType type, boolean nullable, boolean asc) {
+ this.name = name;
+ this.type = type;
+ this.nullable = nullable;
+ this.asc = asc;
+ }
+
+ /**
+ * Creates a Column Descriptor.
+ *
+ * @param tableColumnView Table column configuration.
+ * @param indexColumnView Index column configuration.
+ */
+ public ColumnDescriptor(ColumnView tableColumnView, IndexColumnView indexColumnView) {
this.name = tableColumnView.name();
this.type = SchemaDescriptorConverter.convert(SchemaConfigurationConverter.convert(tableColumnView.type()));
this.nullable = tableColumnView.nullable();
@@ -98,14 +121,36 @@ public class SortedIndexDescriptor {
private final List<ColumnDescriptor> columns;
+ private final BinaryTupleSchema binaryTupleSchema;
+
/**
* Creates an Index Descriptor from a given Table Configuration.
*
- * @param indexId index ID.
- * @param tableConfig table configuration.
+ * @param indexId Index ID.
+ * @param tableConfig Table configuration.
*/
// TODO: IGNITE-17727 Fix redundant param.
public SortedIndexDescriptor(UUID indexId, TableView tableConfig, TablesView tablesConfig) {
+ this(indexId, extractIndexColumnsConfiguration(indexId, tableConfig, tablesConfig));
+ }
+
+ /**
+ * Creates an Index Descriptor from a given set of column descriptors.
+ *
+ * @param indexId Index ID.
+ * @param columnDescriptors Column descriptors.
+ */
+ public SortedIndexDescriptor(UUID indexId, List<ColumnDescriptor> columnDescriptors) {
+ this.id = indexId;
+ this.columns = List.copyOf(columnDescriptors);
+ this.binaryTupleSchema = createSchema(columns);
+ }
+
+ private static List<ColumnDescriptor> extractIndexColumnsConfiguration(
+ UUID indexId,
+ TableView tableConfig,
+ TablesView tablesConfig
+ ) {
TableIndexView indexConfig = ConfigurationUtil.getByInternalId(tablesConfig.indexes(), indexId);
if (indexConfig == null) {
@@ -119,11 +164,9 @@ public class SortedIndexDescriptor {
));
}
- this.id = indexId;
-
NamedListView<? extends IndexColumnView> indexColumns = ((SortedIndexView) indexConfig).columns();
- columns = indexColumns.namedListKeys().stream()
+ return indexColumns.namedListKeys().stream()
.map(columnName -> {
ColumnView columnView = tableConfig.columns().get(columnName);
@@ -136,6 +179,14 @@ public class SortedIndexDescriptor {
.collect(toUnmodifiableList());
}
+    /**
+     * Builds a {@code BinaryTupleSchema} with one element per index column, preserving the column order.
+     *
+     * @param columns Index column descriptors.
+     * @return Schema whose elements carry the type and nullability of the corresponding columns.
+     */
+    private static BinaryTupleSchema createSchema(List<ColumnDescriptor> columns) {
+        Element[] schemaElements = new Element[columns.size()];
+
+        int position = 0;
+
+        for (ColumnDescriptor column : columns) {
+            schemaElements[position++] = new Element(column.type(), column.nullable());
+        }
+
+        return BinaryTupleSchema.create(schemaElements);
+    }
+
/**
* Returns this index' ID.
*/
@@ -149,4 +200,11 @@ public class SortedIndexDescriptor {
public List<ColumnDescriptor> indexColumns() {
return columns;
}
+
+    /**
+     * Returns the {@code BinaryTupleSchema} that was built from this index' column descriptors when the descriptor was created.
+     */
+    public BinaryTupleSchema binaryTupleSchema() {
+        return this.binaryTupleSchema;
+    }
}
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/SortedIndexStorage.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/SortedIndexStorage.java
index 2919759202..6e615b3729 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/SortedIndexStorage.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/SortedIndexStorage.java
@@ -17,7 +17,7 @@
package org.apache.ignite.internal.storage.index;
-import org.apache.ignite.internal.schema.BinaryTuple;
+import org.apache.ignite.internal.schema.BinaryTuplePrefix;
import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.util.Cursor;
import org.intellij.lang.annotations.MagicConstant;
@@ -74,8 +74,8 @@ public interface SortedIndexStorage {
* @throws IllegalArgumentException If backwards flag is passed and backwards iteration is not supported by the storage.
*/
Cursor<IndexRow> scan(
- @Nullable BinaryTuple lowerBound,
- @Nullable BinaryTuple upperBound,
+ @Nullable BinaryTuplePrefix lowerBound,
+ @Nullable BinaryTuplePrefix upperBound,
@MagicConstant(flagsFromClass = SortedIndexStorage.class) int flags
);
}
diff --git a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/index/AbstractSortedIndexStorageTest.java b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/index/AbstractSortedIndexStorageTest.java
index 3aceb2da46..70a1983bf5 100644
--- a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/index/AbstractSortedIndexStorageTest.java
+++ b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/index/AbstractSortedIndexStorageTest.java
@@ -52,7 +52,7 @@ import org.apache.ignite.configuration.schemas.table.TableIndexView;
import org.apache.ignite.configuration.schemas.table.TablesConfiguration;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
-import org.apache.ignite.internal.schema.BinaryTuple;
+import org.apache.ignite.internal.schema.BinaryTuplePrefix;
import org.apache.ignite.internal.schema.SchemaTestUtils;
import org.apache.ignite.internal.schema.testutils.builder.SchemaBuilders;
import org.apache.ignite.internal.schema.testutils.builder.SortedIndexDefinitionBuilder;
@@ -133,17 +133,12 @@ public abstract class AbstractSortedIndexStorageTest {
*
* <p>This method *MUST* always be called in either subclass' constructor or setUp method.
*/
- protected final void initialize(TableConfiguration tableCfg, TablesConfiguration tablesCfg) {
+ protected final void initialize(MvTableStorage tableStorage, TablesConfiguration tablesCfg) {
this.tablesCfg = tablesCfg;
-
- createTestTable(tableCfg);
- }
-
- /** Set up internal storage implementation. */
- protected final void initializeStorage(MvTableStorage tableStorage) {
this.tableStorage = tableStorage;
-
this.partitionStorage = tableStorage.getOrCreateMvPartition(TEST_PARTITION);
+
+ createTestTable(tableStorage.configuration());
}
/**
@@ -266,7 +261,7 @@ public abstract class AbstractSortedIndexStorageTest {
put(index, row);
put(index, row);
- IndexRow actualRow = getSingle(index, row.indexColumns());
+ IndexRow actualRow = getSingle(index, serializer.serializeRowPrefix(columnValues));
assertThat(actualRow.rowId(), is(equalTo(row.rowId())));
}
@@ -402,8 +397,8 @@ public abstract class AbstractSortedIndexStorageTest {
.limit(lastIndex - firstIndex + 1)
.collect(toList());
- BinaryTuple first = entries.get(firstIndex).prefix(3);
- BinaryTuple last = entries.get(lastIndex).prefix(5);
+ BinaryTuplePrefix first = entries.get(firstIndex).prefix(3);
+ BinaryTuplePrefix last = entries.get(lastIndex).prefix(5);
try (Cursor<IndexRow> cursor = indexStorage.scan(first, last, GREATER_OR_EQUAL | LESS_OR_EQUAL)) {
List<IndexRow> actual = cursor.stream().collect(toList());
@@ -527,7 +522,7 @@ public abstract class AbstractSortedIndexStorageTest {
put(indexStorage, entry1);
put(indexStorage, entry2);
- try (Cursor<IndexRow> cursor = indexStorage.scan(entry2.indexColumns(), entry1.indexColumns(), 0)) {
+ try (Cursor<IndexRow> cursor = indexStorage.scan(entry2.prefix(indexSchema.size()), entry1.prefix(indexSchema.size()), 0)) {
assertThat(cursor.stream().collect(toList()), is(empty()));
}
}
@@ -556,10 +551,10 @@ public abstract class AbstractSortedIndexStorageTest {
entry1 = t;
}
- try (Cursor<IndexRow> cursor = storage.scan(entry1.indexColumns(), entry2.indexColumns(), GREATER_OR_EQUAL | LESS_OR_EQUAL)) {
+ try (Cursor<IndexRow> cursor = storage.scan(entry1.prefix(1), entry2.prefix(1), GREATER_OR_EQUAL | LESS_OR_EQUAL)) {
assertThat(
- cursor.stream().map(IndexRow::indexColumns).collect(toList()),
- contains(entry1.indexColumns(), entry2.indexColumns())
+ cursor.stream().map(row -> row.indexColumns().byteBuffer()).collect(toList()),
+ contains(entry1.indexColumns().byteBuffer(), entry2.indexColumns().byteBuffer())
);
}
}
@@ -611,25 +606,25 @@ public abstract class AbstractSortedIndexStorageTest {
put(indexStorage, entry2);
assertThat(
- getSingle(indexStorage, entry1.indexColumns()).rowId(),
+ getSingle(indexStorage, entry1.prefix(indexSchema.size())).rowId(),
is(equalTo(entry1.rowId()))
);
assertThat(
- getSingle(indexStorage, entry2.indexColumns()).rowId(),
+ getSingle(indexStorage, entry2.prefix(indexSchema.size())).rowId(),
is(equalTo(entry2.rowId()))
);
remove(indexStorage, entry1);
- assertThat(getSingle(indexStorage, entry1.indexColumns()), is(nullValue()));
+ assertThat(getSingle(indexStorage, entry1.prefix(indexSchema.size())), is(nullValue()));
}
/**
* Extracts a single value by a given key or {@code null} if it does not exist.
*/
@Nullable
- private static IndexRow getSingle(SortedIndexStorage indexStorage, BinaryTuple fullPrefix) throws Exception {
+ private static IndexRow getSingle(SortedIndexStorage indexStorage, BinaryTuplePrefix fullPrefix) throws Exception {
try (Cursor<IndexRow> cursor = indexStorage.scan(fullPrefix, fullPrefix, GREATER_OR_EQUAL | LESS_OR_EQUAL)) {
List<IndexRow> values = cursor.stream().collect(toList());
@@ -639,7 +634,7 @@ public abstract class AbstractSortedIndexStorageTest {
}
}
- private static BinaryTuple prefix(SortedIndexStorage index, Object... vals) {
+ private static BinaryTuplePrefix prefix(SortedIndexStorage index, Object... vals) {
var serializer = new BinaryTupleRowSerializer(index.indexDescriptor());
return serializer.serializeRowPrefix(vals);
@@ -647,8 +642,8 @@ public abstract class AbstractSortedIndexStorageTest {
private static List<Object[]> scan(
SortedIndexStorage index,
- @Nullable BinaryTuple lowerBound,
- @Nullable BinaryTuple upperBound,
+ @Nullable BinaryTuplePrefix lowerBound,
+ @Nullable BinaryTuplePrefix upperBound,
@MagicConstant(flagsFromClass = SortedIndexStorage.class) int flags
) throws Exception {
var serializer = new BinaryTupleRowSerializer(index.indexDescriptor());
diff --git a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/index/BinaryTupleComparatorTest.java b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/index/BinaryTupleComparatorTest.java
new file mode 100644
index 0000000000..14e70d71f5
--- /dev/null
+++ b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/index/BinaryTupleComparatorTest.java
@@ -0,0 +1,443 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.index;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.BitSet;
+import java.util.List;
+import java.util.UUID;
+import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder;
+import org.apache.ignite.internal.binarytuple.BinaryTuplePrefixBuilder;
+import org.apache.ignite.internal.schema.NativeType;
+import org.apache.ignite.internal.schema.NativeTypes;
+import org.apache.ignite.internal.storage.index.SortedIndexDescriptor.ColumnDescriptor;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+/**
+ * Tests for the {@link BinaryTupleComparator} class.
+ */
+public class BinaryTupleComparatorTest {
+
+    /**
+     * Checks, for every type provided by {@code allTypes()}, that the first test value sorts before the second one, that the reverse
+     * comparison has the opposite sign and that each value is equal to itself.
+     */
+    @ParameterizedTest
+    @MethodSource("allTypes")
+    public void testCompareSingleColumnTuples(NativeType type) {
+        List<ColumnDescriptor> indexColumns = List.of(new ColumnDescriptor("column", type, false, true));
+
+        var comparator = new BinaryTupleComparator(new SortedIndexDescriptor(UUID.randomUUID(), indexColumns));
+
+        IgniteBiTuple<ByteBuffer, ByteBuffer> values = createTestValues(type);
+
+        ByteBuffer smaller = values.get1();
+        ByteBuffer larger = values.get2();
+
+        assertThat(comparator.compare(smaller, larger), is(lessThanOrEqualTo(-1)));
+        assertThat(comparator.compare(smaller, smaller), is(0));
+        assertThat(comparator.compare(larger, larger), is(0));
+        assertThat(comparator.compare(larger, smaller), is(greaterThanOrEqualTo(1)));
+    }
+
+    /**
+     * Provides the column types for {@link #testCompareSingleColumnTuples}.
+     *
+     * <p>Every type listed here must have a matching case in {@code createTestValues}, and vice versa: a case without a corresponding
+     * entry in this list is never executed.
+     */
+    private static List<NativeType> allTypes() {
+        return List.of(
+                NativeTypes.INT8,
+                NativeTypes.INT16,
+                NativeTypes.INT32,
+                NativeTypes.INT64,
+                NativeTypes.FLOAT,
+                NativeTypes.DOUBLE,
+                NativeTypes.BYTES,
+                NativeTypes.bitmaskOf(42),
+                NativeTypes.decimalOf(20, 3),
+                NativeTypes.UUID,
+                NativeTypes.STRING,
+                NativeTypes.numberOf(20),
+                NativeTypes.timestamp(),
+                NativeTypes.DATE,
+                // TIME has test values in createTestValues but used to be missing here, so it was never compared.
+                NativeTypes.time(),
+                NativeTypes.datetime()
+        );
+    }
+
+    /**
+     * Builds a pair of serialized single-column, non-nullable tuples for the given type.
+     *
+     * <p>{@link #testCompareSingleColumnTuples} expects the first tuple of the pair to sort before the second one.
+     *
+     * @param type Type of the only tuple column.
+     * @return Pair of tuple buffers: the value expected to be smaller first, the value expected to be larger second.
+     * @throws AssertionError If no test values are defined for the given type.
+     */
+    private static IgniteBiTuple<ByteBuffer, ByteBuffer> createTestValues(NativeType type) {
+        switch (type.spec()) {
+            case INT8:
+                return new IgniteBiTuple<>(
+                        new BinaryTupleBuilder(1, false).appendByte((byte) -1).build(),
+                        new BinaryTupleBuilder(1, false).appendByte(Byte.MAX_VALUE).build()
+                );
+
+            case INT16:
+                return new IgniteBiTuple<>(
+                        new BinaryTupleBuilder(1, false).appendShort((short) -1).build(),
+                        new BinaryTupleBuilder(1, false).appendShort(Short.MAX_VALUE).build()
+                );
+
+            case INT32:
+                return new IgniteBiTuple<>(
+                        new BinaryTupleBuilder(1, false).appendInt(-1).build(),
+                        new BinaryTupleBuilder(1, false).appendInt(Integer.MAX_VALUE).build()
+                );
+
+            case INT64:
+                return new IgniteBiTuple<>(
+                        new BinaryTupleBuilder(1, false).appendLong(-1).build(),
+                        new BinaryTupleBuilder(1, false).appendLong(Long.MAX_VALUE).build()
+                );
+
+            case FLOAT:
+                return new IgniteBiTuple<>(
+                        new BinaryTupleBuilder(1, false).appendFloat(-1.69f).build(),
+                        new BinaryTupleBuilder(1, false).appendFloat(Float.MAX_VALUE).build()
+                );
+
+            case DOUBLE:
+                return new IgniteBiTuple<>(
+                        new BinaryTupleBuilder(1, false).appendDouble(-1.69).build(),
+                        new BinaryTupleBuilder(1, false).appendDouble(Double.MAX_VALUE).build()
+                );
+
+            case BYTES:
+                return new IgniteBiTuple<>(
+                        new BinaryTupleBuilder(1, false).appendBytes(new byte[] {1, 2, 3}).build(),
+                        new BinaryTupleBuilder(1, false).appendBytes(new byte[] {1, 2, 6}).build()
+                );
+
+            case BITMASK:
+                return new IgniteBiTuple<>(
+                        new BinaryTupleBuilder(1, false).appendBitmask(BitSet.valueOf(new byte[] {1, 2, 3})).build(),
+                        new BinaryTupleBuilder(1, false).appendBitmask(BitSet.valueOf(new byte[] {-1, -1, -1})).build()
+                );
+
+            case DECIMAL:
+                return new IgniteBiTuple<>(
+                        new BinaryTupleBuilder(1, false).appendDecimal(BigDecimal.valueOf(-1), 4).build(),
+                        new BinaryTupleBuilder(1, false).appendDecimal(BigDecimal.valueOf(123456789.1234), 4).build()
+                );
+
+            case UUID: {
+                UUID first = UUID.randomUUID();
+                UUID second = UUID.randomUUID();
+
+                // Random values come in arbitrary order: put the smaller one (by UUID#compareTo) first.
+                boolean alreadyOrdered = first.compareTo(second) <= 0;
+
+                UUID smaller = alreadyOrdered ? first : second;
+                UUID larger = alreadyOrdered ? second : first;
+
+                return new IgniteBiTuple<>(
+                        new BinaryTupleBuilder(1, false).appendUuid(smaller).build(),
+                        new BinaryTupleBuilder(1, false).appendUuid(larger).build()
+                );
+            }
+
+            case STRING:
+                return new IgniteBiTuple<>(
+                        new BinaryTupleBuilder(1, false).appendString("foobar").build(),
+                        new BinaryTupleBuilder(1, false).appendString("foobaz").build()
+                );
+
+            case NUMBER:
+                return new IgniteBiTuple<>(
+                        new BinaryTupleBuilder(1, false).appendNumber(BigInteger.valueOf(-1)).build(),
+                        new BinaryTupleBuilder(1, false).appendNumber(BigInteger.TEN).build()
+                );
+
+            case TIMESTAMP:
+                return new IgniteBiTuple<>(
+                        new BinaryTupleBuilder(1, false).appendTimestamp(Instant.ofEpochSecond(1)).build(),
+                        new BinaryTupleBuilder(1, false).appendTimestamp(Instant.ofEpochSecond(42)).build()
+                );
+
+            case DATE:
+                return new IgniteBiTuple<>(
+                        new BinaryTupleBuilder(1, false).appendDate(LocalDate.of(2000, 4, 10)).build(),
+                        new BinaryTupleBuilder(1, false).appendDate(LocalDate.of(2000, 4, 15)).build()
+                );
+
+            case TIME:
+                return new IgniteBiTuple<>(
+                        new BinaryTupleBuilder(1, false).appendTime(LocalTime.of(10, 0, 0)).build(),
+                        new BinaryTupleBuilder(1, false).appendTime(LocalTime.of(10, 0, 1)).build()
+                );
+
+            case DATETIME:
+                return new IgniteBiTuple<>(
+                        new BinaryTupleBuilder(1, false).appendDateTime(LocalDateTime.of(2000, 4, 10, 10, 0, 0)).build(),
+                        new BinaryTupleBuilder(1, false).appendDateTime(LocalDateTime.of(2000, 4, 10, 10, 0, 1)).build()
+                );
+
+            default:
+                throw new AssertionError(type.toString());
+        }
+    }
+
+    /**
+     * Checks comparison of two-column tuples: first when the tuples differ in the leading INT32 column, then when they share it and
+     * differ only in the STRING column.
+     */
+    @Test
+    public void testCompareMultipleColumnTuples() {
+        var comparator = new BinaryTupleComparator(new SortedIndexDescriptor(
+                UUID.randomUUID(),
+                List.of(
+                        new ColumnDescriptor("column", NativeTypes.INT32, false, true),
+                        new ColumnDescriptor("column", NativeTypes.STRING, false, false)
+                )
+        ));
+
+        ByteBuffer base = new BinaryTupleBuilder(2, false)
+                .appendInt(0)
+                .appendString("foobar")
+                .build();
+
+        ByteBuffer withLargerInt = new BinaryTupleBuilder(2, false)
+                .appendInt(1)
+                .appendString("foobar")
+                .build();
+
+        assertThat(comparator.compare(base, withLargerInt), is(lessThanOrEqualTo(-1)));
+        assertThat(comparator.compare(base, base), is(0));
+        assertThat(comparator.compare(withLargerInt, base), is(greaterThanOrEqualTo(1)));
+
+        // NOTE(review): the STRING column is declared with the last flag set to false, presumably meaning descending order,
+        // which would explain why "foobar" is expected to sort before "foobaa" - confirm against ColumnDescriptor.
+        ByteBuffer withOtherString = new BinaryTupleBuilder(2, false)
+                .appendInt(0)
+                .appendString("foobaa")
+                .build();
+
+        assertThat(comparator.compare(base, withOtherString), is(lessThanOrEqualTo(-1)));
+        assertThat(comparator.compare(withOtherString, base), is(greaterThanOrEqualTo(1)));
+    }
+
+    /**
+     * Checks comparison of two-column tuples with nullable columns: a {@code null} in the leading column against a value, equal
+     * {@code null} leading columns with different strings, and a tuple consisting only of {@code null}s.
+     */
+    @Test
+    public void testCompareMultipleColumnTuplesWithNulls() {
+        var comparator = new BinaryTupleComparator(new SortedIndexDescriptor(
+                UUID.randomUUID(),
+                List.of(
+                        new ColumnDescriptor("column", NativeTypes.INT32, true, true),
+                        new ColumnDescriptor("column", NativeTypes.STRING, true, false)
+                )
+        ));
+
+        ByteBuffer nullIntTuple = new BinaryTupleBuilder(2, true)
+                .appendInt(null)
+                .appendString("foobar")
+                .build();
+
+        ByteBuffer nonNullIntTuple = new BinaryTupleBuilder(2, true)
+                .appendInt(1)
+                .appendString("foobar")
+                .build();
+
+        assertThat(comparator.compare(nullIntTuple, nonNullIntTuple), is(lessThanOrEqualTo(-1)));
+        assertThat(comparator.compare(nullIntTuple, nullIntTuple), is(0));
+        assertThat(comparator.compare(nonNullIntTuple, nullIntTuple), is(greaterThanOrEqualTo(1)));
+
+        ByteBuffer nullIntOtherStringTuple = new BinaryTupleBuilder(2, true)
+                .appendInt(null)
+                .appendString("foobaa")
+                .build();
+
+        assertThat(comparator.compare(nullIntTuple, nullIntOtherStringTuple), is(lessThanOrEqualTo(-1)));
+        assertThat(comparator.compare(nullIntOtherStringTuple, nullIntTuple), is(greaterThanOrEqualTo(1)));
+
+        ByteBuffer allNullsTuple = new BinaryTupleBuilder(2, true)
+                .appendInt(null)
+                .appendString(null)
+                .build();
+
+        assertThat(comparator.compare(nullIntTuple, allNullsTuple), is(lessThanOrEqualTo(-1)));
+        assertThat(comparator.compare(allNullsTuple, nullIntTuple), is(greaterThanOrEqualTo(1)));
+        assertThat(comparator.compare(allNullsTuple, allNullsTuple), is(0));
+    }
+
+    /**
+     * Checks comparison of a full two-column tuple against one-column prefixes whose value is larger than, smaller than and equal to
+     * the tuple's leading column.
+     */
+    @Test
+    public void testCompareWithPrefix() {
+        var comparator = new BinaryTupleComparator(new SortedIndexDescriptor(
+                UUID.randomUUID(),
+                List.of(
+                        new ColumnDescriptor("column", NativeTypes.INT32, false, true),
+                        new ColumnDescriptor("column", NativeTypes.STRING, false, false)
+                )
+        ));
+
+        ByteBuffer fullTuple = new BinaryTupleBuilder(2, false)
+                .appendInt(1)
+                .appendString("foobar")
+                .build();
+
+        ByteBuffer largerPrefix = new BinaryTuplePrefixBuilder(1, 2)
+                .appendInt(2)
+                .build();
+
+        assertThat(comparator.compare(fullTuple, largerPrefix), is(lessThanOrEqualTo(-1)));
+        assertThat(comparator.compare(largerPrefix, fullTuple), is(greaterThanOrEqualTo(1)));
+
+        ByteBuffer smallerPrefix = new BinaryTuplePrefixBuilder(1, 2)
+                .appendInt(0)
+                .build();
+
+        assertThat(comparator.compare(smallerPrefix, fullTuple), is(lessThanOrEqualTo(-1)));
+        assertThat(comparator.compare(fullTuple, smallerPrefix), is(greaterThanOrEqualTo(1)));
+
+        // A prefix that matches the leading column is expected to compare as equal to the full tuple.
+        ByteBuffer matchingPrefix = new BinaryTuplePrefixBuilder(1, 2)
+                .appendInt(1)
+                .build();
+
+        assertThat(comparator.compare(fullTuple, matchingPrefix), is(0));
+    }
+
+    /**
+     * Checks comparison of a full tuple with a {@code null} leading column against one-column prefixes holding a non-null value and
+     * a {@code null}.
+     */
+    @Test
+    public void testCompareWithPrefixWithNulls() {
+        var comparator = new BinaryTupleComparator(new SortedIndexDescriptor(
+                UUID.randomUUID(),
+                List.of(
+                        new ColumnDescriptor("column", NativeTypes.INT32, true, true),
+                        new ColumnDescriptor("column", NativeTypes.STRING, false, false)
+                )
+        ));
+
+        ByteBuffer nullLeadingTuple = new BinaryTupleBuilder(2, true)
+                .appendInt(null)
+                .appendString("foobar")
+                .build();
+
+        ByteBuffer nonNullPrefix = new BinaryTuplePrefixBuilder(1, 2)
+                .appendInt(0)
+                .build();
+
+        assertThat(comparator.compare(nullLeadingTuple, nonNullPrefix), is(lessThanOrEqualTo(-1)));
+        assertThat(comparator.compare(nonNullPrefix, nullLeadingTuple), is(greaterThanOrEqualTo(1)));
+
+        // A null prefix is expected to compare as equal to a tuple whose leading column is null.
+        ByteBuffer nullPrefix = new BinaryTuplePrefixBuilder(1, 2)
+                .appendInt(null)
+                .build();
+
+        assertThat(comparator.compare(nullLeadingTuple, nullPrefix), is(0));
+    }
+}
diff --git a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/index/TestSortedIndexStorageTest.java b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/index/TestSortedIndexStorageTest.java
index 46589ded86..fcb2b3a4bb 100644
--- a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/index/TestSortedIndexStorageTest.java
+++ b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/index/TestSortedIndexStorageTest.java
@@ -66,8 +66,8 @@ public class TestSortedIndexStorageTest extends AbstractSortedIndexStorageTest {
EntryCountBudgetConfigurationSchema.class
})
TablesConfiguration tablesConfig) {
- initialize(tableCfg, tablesConfig);
+ var storage = new TestConcurrentHashMapMvTableStorage(tableCfg, tablesConfig);
- initializeStorage(new TestConcurrentHashMapMvTableStorage(tableCfg, tablesConfig));
+ initialize(storage, tablesConfig);
}
}
diff --git a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/index/impl/TestIndexRow.java b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/index/impl/TestIndexRow.java
index 530db7c811..6c5951e6b2 100644
--- a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/index/impl/TestIndexRow.java
+++ b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/index/impl/TestIndexRow.java
@@ -29,6 +29,7 @@ import java.util.Random;
import java.util.function.Function;
import java.util.stream.IntStream;
import org.apache.ignite.internal.schema.BinaryTuple;
+import org.apache.ignite.internal.schema.BinaryTuplePrefix;
import org.apache.ignite.internal.schema.SchemaTestUtils;
import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.storage.index.IndexRow;
@@ -81,7 +82,7 @@ public class TestIndexRow implements IndexRow, Comparable<TestIndexRow> {
/**
* Creates an Index Key prefix of the given length.
*/
- public BinaryTuple prefix(int length) {
+ public BinaryTuplePrefix prefix(int length) {
return serializer.serializeRowPrefix(Arrays.copyOf(columns, length));
}
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
index e8deddc46f..2642701dee 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
@@ -191,13 +191,17 @@ public abstract class AbstractMvTableStorageTest extends BaseMvStoragesTest {
*/
@Test
public void testDestroyIndex() {
- tableStorage.getOrCreateMvPartition(PARTITION_ID);
+ MvPartitionStorage partitionStorage = tableStorage.getOrCreateMvPartition(PARTITION_ID);
assertThat(tableStorage.getOrCreateSortedIndex(PARTITION_ID, sortedIdx.id()), is(notNullValue()));
assertThat(tableStorage.getOrCreateHashIndex(PARTITION_ID, hashIdx.id()), is(notNullValue()));
- assertThat(tableStorage.destroyIndex(sortedIdx.id()), willCompleteSuccessfully());
- assertThat(tableStorage.destroyIndex(hashIdx.id()), willCompleteSuccessfully());
+ CompletableFuture<Void> destroySortedIndexFuture = tableStorage.destroyIndex(sortedIdx.id());
+ CompletableFuture<Void> destroyHashIndexFuture = tableStorage.destroyIndex(hashIdx.id());
+
+ assertThat(partitionStorage.flush(), willCompleteSuccessfully());
+ assertThat(destroySortedIndexFuture, willCompleteSuccessfully());
+ assertThat(destroyHashIndexFuture, willCompleteSuccessfully());
}
@Test
@@ -223,7 +227,7 @@ public abstract class AbstractMvTableStorageTest extends BaseMvStoragesTest {
new Element(NativeTypes.INT32, false)
});
- ByteBuffer buffer = BinaryTupleBuilder.create(schema.elementCount(), schema.hasNullableElements())
+ ByteBuffer buffer = new BinaryTupleBuilder(schema.elementCount(), schema.hasNullableElements())
.appendInt(1)
.appendInt(2)
.build();
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/AbstractHashIndexStorageTest.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/AbstractHashIndexStorageTest.java
index e248511584..09d966abba 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/AbstractHashIndexStorageTest.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/AbstractHashIndexStorageTest.java
@@ -64,7 +64,12 @@ public abstract class AbstractHashIndexStorageTest {
private BinaryTupleRowSerializer serializer;
- protected void initialize(MvTableStorage tableStorage, TablesConfiguration tablesCfg) {
+ /**
+ * Initializes the internal structures needed for tests.
+ *
+ * <p>This method *MUST* always be called in either subclass' constructor or setUp method.
+ */
+ protected final void initialize(MvTableStorage tableStorage, TablesConfiguration tablesCfg) {
this.tableStorage = tableStorage;
createTestTable(tableStorage.configuration());
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/BinaryTupleRowSerializer.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/BinaryTupleRowSerializer.java
index 798d70bd22..9be9372135 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/BinaryTupleRowSerializer.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/BinaryTupleRowSerializer.java
@@ -29,7 +29,9 @@ import java.util.BitSet;
import java.util.List;
import java.util.UUID;
import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder;
+import org.apache.ignite.internal.binarytuple.BinaryTuplePrefixBuilder;
import org.apache.ignite.internal.schema.BinaryTuple;
+import org.apache.ignite.internal.schema.BinaryTuplePrefix;
import org.apache.ignite.internal.schema.BinaryTupleSchema;
import org.apache.ignite.internal.schema.BinaryTupleSchema.Element;
import org.apache.ignite.internal.schema.InvalidTypeException;
@@ -39,6 +41,7 @@ import org.apache.ignite.internal.schema.SchemaMismatchException;
import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.storage.index.HashIndexDescriptor;
import org.apache.ignite.internal.storage.index.IndexRow;
+import org.apache.ignite.internal.storage.index.IndexRowImpl;
import org.apache.ignite.internal.storage.index.SortedIndexDescriptor;
/**
@@ -58,22 +61,34 @@ public class BinaryTupleRowSerializer {
private final List<ColumnDescriptor> schema;
+ private final BinaryTupleSchema tupleSchema;
+
/**
* Creates a new instance for a Sorted Index.
*/
public BinaryTupleRowSerializer(SortedIndexDescriptor descriptor) {
- this.schema = descriptor.indexColumns().stream()
+ this(descriptor.indexColumns().stream()
.map(colDesc -> new ColumnDescriptor(colDesc.type(), colDesc.nullable()))
- .collect(toUnmodifiableList());
+ .collect(toUnmodifiableList()));
}
/**
* Creates a new instance for a Hash Index.
*/
public BinaryTupleRowSerializer(HashIndexDescriptor descriptor) {
- this.schema = descriptor.indexColumns().stream()
+ this(descriptor.indexColumns().stream()
.map(colDesc -> new ColumnDescriptor(colDesc.type(), colDesc.nullable()))
- .collect(toUnmodifiableList());
+ .collect(toUnmodifiableList()));
+ }
+
+ private BinaryTupleRowSerializer(List<ColumnDescriptor> schema) {
+ this.schema = schema;
+
+ Element[] elements = schema.stream()
+ .map(columnDescriptor -> new Element(columnDescriptor.type, columnDescriptor.nullable))
+ .toArray(Element[]::new);
+
+ tupleSchema = BinaryTupleSchema.create(elements);
}
/**
@@ -88,13 +103,21 @@ public class BinaryTupleRowSerializer {
));
}
- return new IndexRowImpl(serializeRowPrefix(columnValues), rowId);
+ var builder = new BinaryTupleBuilder(tupleSchema.elementCount(), tupleSchema.hasNullableElements());
+
+ for (Object value : columnValues) {
+ appendValue(builder, value);
+ }
+
+ var tuple = new BinaryTuple(tupleSchema, builder.build());
+
+ return new IndexRowImpl(tuple, rowId);
}
/**
* Creates a prefix of an {@link IndexRow} using the provided columns.
*/
- public BinaryTuple serializeRowPrefix(Object[] prefixColumnValues) {
+ public BinaryTuplePrefix serializeRowPrefix(Object[] prefixColumnValues) {
if (prefixColumnValues.length > schema.size()) {
throw new IllegalArgumentException(String.format(
"Incorrect number of column values passed. Expected not more than %d, got %d",
@@ -103,21 +126,13 @@ public class BinaryTupleRowSerializer {
));
}
- Element[] prefixElements = schema.stream()
- .limit(prefixColumnValues.length)
- .map(columnDescriptor -> new Element(columnDescriptor.type, columnDescriptor.nullable))
- .toArray(Element[]::new);
-
- BinaryTupleSchema prefixSchema = BinaryTupleSchema.create(prefixElements);
-
- BinaryTupleBuilder builder = BinaryTupleBuilder.create(
- prefixSchema.elementCount(), prefixSchema.hasNullableElements());
+ var builder = new BinaryTuplePrefixBuilder(prefixColumnValues.length, schema.size());
for (Object value : prefixColumnValues) {
- appendValue(builder, prefixSchema, value);
+ appendValue(builder, value);
}
- return new BinaryTuple(prefixSchema, builder.build());
+ return new BinaryTuplePrefix(tupleSchema, builder.build());
}
/**
@@ -143,12 +158,11 @@ public class BinaryTupleRowSerializer {
* Append a value for the current element.
*
* @param builder Builder.
- * @param schema Tuple schema.
* @param value Element value.
* @return Builder for chaining.
*/
- private static BinaryTupleBuilder appendValue(BinaryTupleBuilder builder, BinaryTupleSchema schema, Object value) {
- Element element = schema.element(builder.elementIndex());
+ private BinaryTupleBuilder appendValue(BinaryTupleBuilder builder, Object value) {
+ Element element = tupleSchema.element(builder.elementIndex());
if (value == null) {
if (!element.nullable()) {
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/IndexRowImpl.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/IndexRowImpl.java
deleted file mode 100644
index db8be5b72a..0000000000
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/IndexRowImpl.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.storage.index.impl;
-
-import org.apache.ignite.internal.schema.BinaryTuple;
-import org.apache.ignite.internal.storage.RowId;
-import org.apache.ignite.internal.storage.index.IndexRow;
-
-/**
- * {@link IndexRow} implementation that simply stores the provided parameters.
- */
-class IndexRowImpl implements IndexRow {
- private final BinaryTuple indexColumns;
-
- private final RowId rowId;
-
- IndexRowImpl(BinaryTuple indexColumns, RowId rowId) {
- this.indexColumns = indexColumns;
- this.rowId = rowId;
- }
-
- @Override
- public BinaryTuple indexColumns() {
- return indexColumns;
- }
-
- @Override
- public RowId rowId() {
- return rowId;
- }
-}
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestSortedIndexStorage.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestSortedIndexStorage.java
index d054204b5a..f0c45d67f2 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestSortedIndexStorage.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestSortedIndexStorage.java
@@ -19,16 +19,22 @@ package org.apache.ignite.internal.storage.index.impl;
import static org.apache.ignite.internal.util.IgniteUtils.capacity;
+import java.nio.ByteBuffer;
+import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
-import java.util.NavigableMap;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.function.ToIntFunction;
+import java.util.stream.Stream;
import org.apache.ignite.internal.schema.BinaryTuple;
+import org.apache.ignite.internal.schema.BinaryTuplePrefix;
import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.index.BinaryTupleComparator;
import org.apache.ignite.internal.storage.index.IndexRow;
+import org.apache.ignite.internal.storage.index.IndexRowImpl;
import org.apache.ignite.internal.storage.index.SortedIndexDescriptor;
import org.apache.ignite.internal.storage.index.SortedIndexStorage;
import org.apache.ignite.internal.util.Cursor;
@@ -38,7 +44,9 @@ import org.jetbrains.annotations.Nullable;
* Test implementation of MV sorted index storage.
*/
public class TestSortedIndexStorage implements SortedIndexStorage {
- private final ConcurrentNavigableMap<BinaryTuple, Set<RowId>> index;
+ private final ConcurrentNavigableMap<ByteBuffer, Set<RowId>> index;
+
+ private final Comparator<ByteBuffer> binaryTupleComparator;
private final SortedIndexDescriptor descriptor;
@@ -47,7 +55,8 @@ public class TestSortedIndexStorage implements SortedIndexStorage {
*/
public TestSortedIndexStorage(SortedIndexDescriptor descriptor) {
this.descriptor = descriptor;
- this.index = new ConcurrentSkipListMap<>(BinaryTupleComparator.newComparator(descriptor));
+ this.binaryTupleComparator = new BinaryTupleComparator(descriptor);
+ this.index = new ConcurrentSkipListMap<>(binaryTupleComparator);
}
@Override
@@ -57,7 +66,7 @@ public class TestSortedIndexStorage implements SortedIndexStorage {
@Override
public void put(IndexRow row) {
- index.compute(row.indexColumns(), (k, v) -> {
+ index.compute(row.indexColumns().byteBuffer(), (k, v) -> {
if (v == null) {
return Set.of(row.rowId());
} else if (v.contains(row.rowId())) {
@@ -75,7 +84,7 @@ public class TestSortedIndexStorage implements SortedIndexStorage {
@Override
public void remove(IndexRow row) {
- index.computeIfPresent(row.indexColumns(), (k, v) -> {
+ index.computeIfPresent(row.indexColumns().byteBuffer(), (k, v) -> {
if (v.contains(row.rowId())) {
if (v.size() == 1) {
return null;
@@ -95,31 +104,45 @@ public class TestSortedIndexStorage implements SortedIndexStorage {
/** {@inheritDoc} */
@Override
public Cursor<IndexRow> scan(
- @Nullable BinaryTuple lowerBound,
- @Nullable BinaryTuple upperBound,
+ @Nullable BinaryTuplePrefix lowerBound,
+ @Nullable BinaryTuplePrefix upperBound,
int flags
) {
boolean includeLower = (flags & GREATER_OR_EQUAL) != 0;
boolean includeUpper = (flags & LESS_OR_EQUAL) != 0;
- NavigableMap<BinaryTuple, Set<RowId>> index = this.index;
- int direction = 1;
+ Stream<Map.Entry<ByteBuffer, Set<RowId>>> data = index.entrySet().stream();
+
+ if (lowerBound != null) {
+ ToIntFunction<ByteBuffer> lowerCmp = boundComparator(lowerBound, includeLower ? 0 : -1);
+
+ data = data.dropWhile(e -> lowerCmp.applyAsInt(e.getKey()) < 0);
+ }
- ToIntFunction<BinaryTuple> lowerCmp = lowerBound == null ? row -> 1 : boundComparator(lowerBound, direction, includeLower ? 0 : -1);
- ToIntFunction<BinaryTuple> upperCmp = upperBound == null ? row -> -1 : boundComparator(upperBound, direction, includeUpper ? 0 : 1);
+ if (upperBound != null) {
+ ToIntFunction<ByteBuffer> upperCmp = boundComparator(upperBound, includeUpper ? 0 : 1);
- Iterator<? extends IndexRow> iterator = index.entrySet().stream()
- .dropWhile(e -> lowerCmp.applyAsInt(e.getKey()) < 0)
- .takeWhile(e -> upperCmp.applyAsInt(e.getKey()) <= 0)
- .flatMap(e -> e.getValue().stream().map(rowId -> new IndexRowImpl(e.getKey(), rowId)))
+ data = data.takeWhile(e -> upperCmp.applyAsInt(e.getKey()) <= 0);
+ }
+
+ Iterator<? extends IndexRow> iterator = data
+ .flatMap(e -> {
+ var tuple = new BinaryTuple(descriptor.binaryTupleSchema(), e.getKey());
+
+ return e.getValue().stream().map(rowId -> new IndexRowImpl(tuple, rowId));
+ })
.iterator();
return Cursor.fromIterator(iterator);
}
- private ToIntFunction<BinaryTuple> boundComparator(BinaryTuple bound, int direction, int equals) {
- BinaryTupleComparator comparator = BinaryTupleComparator.newPrefixComparator(descriptor, bound.count());
+ private ToIntFunction<ByteBuffer> boundComparator(BinaryTuplePrefix bound, int equals) {
+ ByteBuffer boundBuffer = bound.byteBuffer();
+
+ return tuple -> {
+ int compare = binaryTupleComparator.compare(tuple, boundBuffer);
- return tuple -> comparator.compare(tuple, bound, direction, equals);
+ return compare == 0 ? equals : compare;
+ };
}
}
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/ColumnFamilyUtils.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/ColumnFamilyUtils.java
index 6a4e33c0e1..c49a1ebe49 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/ColumnFamilyUtils.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/ColumnFamilyUtils.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.storage.rocksdb;
import java.nio.charset.StandardCharsets;
+import java.util.UUID;
import org.rocksdb.RocksDB;
/**
@@ -39,11 +40,16 @@ class ColumnFamilyUtils {
*/
static final String HASH_INDEX_CF_NAME = "cf-hash";
+ /**
+ * Prefix for SQL indexes column family names.
+ */
+ static final String SORTED_INDEX_CF_PREFIX = "cf-sorted-";
+
/**
* Utility enum to describe a type of the column family - meta or partition.
*/
enum ColumnFamilyType {
- META, PARTITION, HASH_INDEX, UNKNOWN;
+ META, PARTITION, HASH_INDEX, SORTED_INDEX, UNKNOWN;
/**
* Determines column family type by its name.
@@ -58,9 +64,35 @@ class ColumnFamilyUtils {
return PARTITION;
} else if (HASH_INDEX_CF_NAME.equals(cfName)) {
return HASH_INDEX;
+ } else if (cfName.startsWith(SORTED_INDEX_CF_PREFIX)) {
+ return SORTED_INDEX;
} else {
return UNKNOWN;
}
}
}
+
+    /**
+     * Builds the name of the column family that stores the sorted index with the given ID.
+     *
+     * <p>The name is {@link #SORTED_INDEX_CF_PREFIX} followed by the string form of the ID.
+     *
+     * @param indexId Index ID.
+     * @return Column family name.
+     *
+     * @see #sortedIndexId
+     */
+    static String sortedIndexCfName(UUID indexId) {
+        String idSuffix = String.valueOf(indexId);
+
+        return SORTED_INDEX_CF_PREFIX + idSuffix;
+    }
+
+    /**
+     * Parses a Sorted Index ID out of the given Column Family name by dropping {@link #SORTED_INDEX_CF_PREFIX} from its start.
+     *
+     * <p>The name is not checked to actually start with the prefix: the leading characters are dropped unconditionally, so the name is
+     * expected to have been produced by {@link #sortedIndexCfName}.
+     *
+     * @param cfName Column Family name.
+     * @return Sorted Index ID.
+     *
+     * @see #sortedIndexCfName
+     */
+    static UUID sortedIndexId(String cfName) {
+        int idOffset = SORTED_INDEX_CF_PREFIX.length();
+
+        String rawId = cfName.substring(idOffset);
+
+        return UUID.fromString(rawId);
+    }
}
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/HashIndexes.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/HashIndex.java
similarity index 77%
rename from modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/HashIndexes.java
rename to modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/HashIndex.java
index ea6294f6cb..5a61b7c992 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/HashIndexes.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/HashIndex.java
@@ -24,22 +24,34 @@ import org.apache.ignite.internal.storage.index.HashIndexDescriptor;
import org.apache.ignite.internal.storage.index.HashIndexStorage;
import org.apache.ignite.internal.storage.rocksdb.index.RocksDbHashIndexStorage;
-class HashIndexes {
+/**
+ * Class that represents a Hash Index defined for all partitions of a Table.
+ */
+class HashIndex {
+ private final ColumnFamily indexCf;
+
private final HashIndexDescriptor descriptor;
private final ConcurrentMap<Integer, HashIndexStorage> storages = new ConcurrentHashMap<>();
- HashIndexes(HashIndexDescriptor descriptor) {
+ HashIndex(ColumnFamily indexCf, HashIndexDescriptor descriptor) {
+ this.indexCf = indexCf;
this.descriptor = descriptor;
}
- HashIndexStorage getOrCreateStorage(ColumnFamily indexCf, RocksDbMvPartitionStorage partitionStorage) {
+ /**
+ * Creates a new Hash Index storage or returns an existing one.
+ */
+ HashIndexStorage getOrCreateStorage(RocksDbMvPartitionStorage partitionStorage) {
return storages.computeIfAbsent(
partitionStorage.partitionId(),
partId -> new RocksDbHashIndexStorage(descriptor, indexCf, partitionStorage)
);
}
+ /**
+ * Removes all data associated with the index.
+ */
void destroy() {
storages.forEach((partitionId, storage) -> storage.destroy());
}
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMetaStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMetaStorage.java
index 370298ca9c..fd3857e348 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMetaStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMetaStorage.java
@@ -44,7 +44,7 @@ class RocksDbMetaStorage {
/**
* Name of the key that is out of range of the partition ID key prefix, used as an exclusive bound.
*/
- private static final byte[] PARTITION_ID_PREFIX_END = RocksUtils.rangeEnd(PARTITION_ID_PREFIX);
+ private static final byte[] PARTITION_ID_PREFIX_END = RocksUtils.incrementArray(PARTITION_ID_PREFIX);
private final ColumnFamily metaColumnFamily;
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
index bded4516ee..65bae91ba3 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
@@ -21,6 +21,8 @@ import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.ignite.internal.storage.rocksdb.ColumnFamilyUtils.HASH_INDEX_CF_NAME;
import static org.apache.ignite.internal.storage.rocksdb.ColumnFamilyUtils.META_CF_NAME;
import static org.apache.ignite.internal.storage.rocksdb.ColumnFamilyUtils.PARTITION_CF_NAME;
+import static org.apache.ignite.internal.storage.rocksdb.ColumnFamilyUtils.sortedIndexCfName;
+import static org.apache.ignite.internal.storage.rocksdb.ColumnFamilyUtils.sortedIndexId;
import java.io.IOException;
import java.nio.file.Files;
@@ -47,8 +49,10 @@ import org.apache.ignite.internal.storage.StorageException;
import org.apache.ignite.internal.storage.engine.MvTableStorage;
import org.apache.ignite.internal.storage.index.HashIndexDescriptor;
import org.apache.ignite.internal.storage.index.HashIndexStorage;
+import org.apache.ignite.internal.storage.index.SortedIndexDescriptor;
import org.apache.ignite.internal.storage.index.SortedIndexStorage;
import org.apache.ignite.internal.storage.rocksdb.ColumnFamilyUtils.ColumnFamilyType;
+import org.apache.ignite.internal.storage.rocksdb.index.RocksDbBinaryTupleComparator;
import org.apache.ignite.internal.storage.rocksdb.index.RocksDbHashIndexStorage;
import org.apache.ignite.internal.tostring.S;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
@@ -108,10 +112,13 @@ public class RocksDbTableStorage implements MvTableStorage {
private volatile AtomicReferenceArray<RocksDbMvPartitionStorage> partitions;
/** Hash Index storages by Index IDs. */
- private final ConcurrentMap<UUID, HashIndexes> hashIndices = new ConcurrentHashMap<>();
+ private final ConcurrentMap<UUID, HashIndex> hashIndices = new ConcurrentHashMap<>();
+
+ /** Sorted Index storages by Index IDs. */
+ private final ConcurrentMap<UUID, SortedIndex> sortedIndices = new ConcurrentHashMap<>();
/** Busy lock to stop synchronously. */
- final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+ private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
/** Prevents double stopping the component. */
private final AtomicBoolean stopGuard = new AtomicBoolean();
@@ -224,6 +231,15 @@ public class RocksDbTableStorage implements MvTableStorage {
break;
+ case SORTED_INDEX:
+ UUID indexId = sortedIndexId(cf.name());
+
+ var indexDescriptor = new SortedIndexDescriptor(indexId, tableCfg.value(), tablesCfg.value());
+
+ sortedIndices.put(indexId, new SortedIndex(cf, indexDescriptor));
+
+ break;
+
default:
throw new StorageException("Unidentified column family [name=" + cf.name() + ", table="
+ tableCfg.value().name() + ']');
@@ -301,6 +317,7 @@ public class RocksDbTableStorage implements MvTableStorage {
resources.add(meta.columnFamily().handle());
resources.add(partitionCf.handle());
resources.add(hashIndexCf.handle());
+ resources.addAll(sortedIndices.values());
resources.add(db);
@@ -385,38 +402,71 @@ public class RocksDbTableStorage implements MvTableStorage {
/** {@inheritDoc} */
@Override
public SortedIndexStorage getOrCreateSortedIndex(int partitionId, UUID indexId) {
- throw new UnsupportedOperationException("Not implemented yet");
+ SortedIndex storages = sortedIndices.computeIfAbsent(indexId, this::createSortedIndex);
+
+ RocksDbMvPartitionStorage partitionStorage = getMvPartition(partitionId);
+
+ if (partitionStorage == null) {
+ throw new StorageException(String.format("Partition ID %d does not exist", partitionId));
+ }
+
+ return storages.getOrCreateStorage(partitionStorage);
+ }
+
+    /**
+     * Creates a Sorted Index with the given ID: builds its descriptor from the current table configuration, creates a dedicated
+     * column family for it and registers that column family with the flusher.
+     *
+     * @param indexId Index ID.
+     * @return New Sorted Index.
+     * @throws StorageException If the column family could not be created.
+     */
+    private SortedIndex createSortedIndex(UUID indexId) {
+        SortedIndexDescriptor indexDescriptor = new SortedIndexDescriptor(indexId, tableCfg.value(), tablesCfg.value());
+
+        String cfName = sortedIndexCfName(indexId);
+
+        ColumnFamilyDescriptor cfDescriptor = sortedIndexCfDescriptor(cfName, indexDescriptor);
+
+        try {
+            ColumnFamily indexCf = ColumnFamily.create(db, cfDescriptor);
+
+            flusher.addColumnFamily(indexCf.handle());
+
+            return new SortedIndex(indexCf, indexDescriptor);
+        } catch (RocksDBException e) {
+            throw new StorageException("Failed to create new RocksDB column family: " + new String(cfDescriptor.getName(), UTF_8), e);
+        }
+    }
@Override
public HashIndexStorage getOrCreateHashIndex(int partitionId, UUID indexId) {
- HashIndexes storages = hashIndices.computeIfAbsent(indexId, id -> {
- var indexDescriptor = new HashIndexDescriptor(id, tableCfg.value(), tablesCfg.value());
+ HashIndex storages = hashIndices.computeIfAbsent(indexId, id -> {
+ var indexDescriptor = new HashIndexDescriptor(indexId, tableCfg.value(), tablesCfg.value());
- return new HashIndexes(indexDescriptor);
+ return new HashIndex(hashIndexCf, indexDescriptor);
});
RocksDbMvPartitionStorage partitionStorage = getMvPartition(partitionId);
if (partitionStorage == null) {
- throw new StorageException(String.format("Partition %d has not been created yet", partitionId));
+ throw new StorageException(String.format("Partition ID %d does not exist", partitionId));
}
- return storages.getOrCreateStorage(hashIndexCf, partitionStorage);
+ return storages.getOrCreateStorage(partitionStorage);
}
/** {@inheritDoc} */
@Override
public CompletableFuture<Void> destroyIndex(UUID indexId) {
- HashIndexes storages = hashIndices.remove(indexId);
+ HashIndex hashIdx = hashIndices.remove(indexId);
- if (storages == null) {
- return CompletableFuture.completedFuture(null);
+ if (hashIdx != null) {
+ hashIdx.destroy();
}
- storages.destroy();
+ SortedIndex sortedIdx = sortedIndices.remove(indexId);
- return awaitFlush(false);
+ if (sortedIdx != null) {
+ sortedIdx.destroy();
+ }
+
+ if (hashIdx == null && sortedIdx == null) {
+ return CompletableFuture.completedFuture(null);
+ } else {
+ return awaitFlush(false);
+ }
}
/** {@inheritDoc} */
@@ -495,8 +545,24 @@ public class RocksDbTableStorage implements MvTableStorage {
new ColumnFamilyOptions().useFixedLengthPrefixExtractor(RocksDbHashIndexStorage.FIXED_PREFIX_LENGTH)
);
+ case SORTED_INDEX:
+ var indexDescriptor = new SortedIndexDescriptor(sortedIndexId(cfName), tableCfg.value(), tablesCfg.value());
+
+ return sortedIndexCfDescriptor(cfName, indexDescriptor);
+
default:
throw new StorageException("Unidentified column family [name=" + cfName + ", table=" + tableCfg.value().name() + ']');
}
}
+
+    /**
+     * Creates a Column Family descriptor for a Sorted Index. The column family is configured with a
+     * {@link RocksDbBinaryTupleComparator} built from the index descriptor, so its keys are ordered by that comparator.
+     *
+     * @param cfName Column Family name, encoded as UTF-8 in the resulting descriptor.
+     * @param descriptor Sorted Index descriptor.
+     * @return Column Family descriptor.
+     */
+    private static ColumnFamilyDescriptor sortedIndexCfDescriptor(String cfName, SortedIndexDescriptor descriptor) {
+        // NOTE(review): neither the comparator nor the options are closed in this method; confirm who owns them afterwards.
+        RocksDbBinaryTupleComparator tupleComparator = new RocksDbBinaryTupleComparator(descriptor);
+
+        ColumnFamilyOptions cfOptions = new ColumnFamilyOptions();
+
+        cfOptions.setComparator(tupleComparator);
+
+        return new ColumnFamilyDescriptor(cfName.getBytes(UTF_8), cfOptions);
+    }
}
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/SortedIndex.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/SortedIndex.java
new file mode 100644
index 0000000000..f78447374e
--- /dev/null
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/SortedIndex.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.ignite.internal.rocksdb.ColumnFamily;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.index.SortedIndexDescriptor;
+import org.apache.ignite.internal.storage.index.SortedIndexStorage;
+import org.apache.ignite.internal.storage.rocksdb.index.RocksDbSortedIndexStorage;
+import org.rocksdb.RocksDBException;
+
+/**
+ * Sorted Index of a Table: holds the index Column Family and hands out per-partition index storages built on top of it.
+ */
+class SortedIndex implements AutoCloseable {
+    private final ColumnFamily indexCf;
+
+    private final SortedIndexDescriptor descriptor;
+
+    /** Index storages by partition IDs. */
+    private final ConcurrentMap<Integer, SortedIndexStorage> storages = new ConcurrentHashMap<>();
+
+    SortedIndex(ColumnFamily indexCf, SortedIndexDescriptor descriptor) {
+        this.indexCf = indexCf;
+        this.descriptor = descriptor;
+    }
+
+    /**
+     * Returns the index storage registered for the partition of the given partition storage, creating it on first access.
+     */
+    SortedIndexStorage getOrCreateStorage(RocksDbMvPartitionStorage partitionStorage) {
+        int partitionId = partitionStorage.partitionId();
+
+        return storages.computeIfAbsent(
+                partitionId,
+                id -> new RocksDbSortedIndexStorage(descriptor, indexCf, partitionStorage)
+        );
+    }
+
+    /**
+     * Removes all data associated with the index by destroying its Column Family.
+     *
+     * @throws StorageException If the Column Family could not be destroyed.
+     */
+    void destroy() {
+        try {
+            indexCf.destroy();
+        } catch (RocksDBException e) {
+            throw new StorageException("Unable to destroy index " + descriptor.id(), e);
+        }
+    }
+
+    /** Closes the handle of the index Column Family. */
+    @Override
+    public void close() throws Exception {
+        indexCf.handle().close();
+    }
+}
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/CursorUtils.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/CursorUtils.java
new file mode 100644
index 0000000000..050023d63d
--- /dev/null
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/CursorUtils.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb.index;
+
+import java.util.NoSuchElementException;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Utility class for working with cursors.
+ */
+class CursorUtils {
+    /**
+     * Cursor decorator that skips the leading run of elements accepted by a predicate. The first rejected element and everything after
+     * it are returned as is, even if some later elements would be accepted by the predicate again.
+     *
+     * @param <T> Cursor element type.
+     */
+    private static class DropWhileCursor<T> implements Cursor<T> {
+        private final Cursor<T> delegate;
+
+        /** Predicate for the elements to skip; reset to {@code null} as soon as the first rejected element has been found. */
+        @Nullable
+        private Predicate<T> skipCondition;
+
+        /**
+         * First element rejected by the predicate, buffered until {@link #next()} hands it out. While skipping is still in progress,
+         * this field temporarily holds the element that is currently being tested.
+         */
+        @Nullable
+        private T pending;
+
+        DropWhileCursor(Cursor<T> cursor, Predicate<T> predicate) {
+            this.delegate = cursor;
+            this.skipCondition = predicate;
+        }
+
+        @Override
+        public void close() throws Exception {
+            delegate.close();
+        }
+
+        @Override
+        public boolean hasNext() {
+            if (skipCondition != null) {
+                return skipLeadingMatches();
+            }
+
+            return pending != null || delegate.hasNext();
+        }
+
+        /**
+         * Consumes elements of the underlying cursor until one of them is rejected by the predicate and buffers that element.
+         *
+         * @return {@code true} if a rejected element has been found, {@code false} if the underlying cursor ran out of elements first.
+         */
+        private boolean skipLeadingMatches() {
+            while (delegate.hasNext()) {
+                pending = delegate.next();
+
+                if (!skipCondition.test(pending)) {
+                    skipCondition = null;
+
+                    return true;
+                }
+            }
+
+            return false;
+        }
+
+        @Override
+        public T next() {
+            if (!hasNext()) {
+                throw new NoSuchElementException();
+            }
+
+            T buffered = pending;
+
+            if (buffered == null) {
+                return delegate.next();
+            }
+
+            pending = null;
+
+            return buffered;
+        }
+    }
+
+    /**
+     * Wraps the given cursor so that its leading elements are skipped for as long as they match the predicate. Once an element does
+     * not match, it and all elements after it are returned without further checks.
+     *
+     * @param cursor Underlying cursor with data.
+     * @param predicate Predicate for elements to be discarded.
+     * @param <T> Cursor element type.
+     * @return Cursor wrapper.
+     */
+    static <T> Cursor<T> dropWhile(Cursor<T> cursor, Predicate<T> predicate) {
+        return new DropWhileCursor<>(cursor, predicate);
+    }
+
+    /**
+     * Cursor decorator that returns elements only for as long as they match a predicate. The first element that does not match ends
+     * the iteration: it is not returned and neither is anything after it.
+     *
+     * @param <T> Cursor element type.
+     */
+    private static class TakeWhileCursor<T> implements Cursor<T> {
+        private final Cursor<T> delegate;
+
+        /** Predicate for the elements to keep; reset to {@code null} once an element has been rejected. */
+        @Nullable
+        private Predicate<T> keepCondition;
+
+        /** Element that has already been read from the underlying cursor and accepted, but not yet returned. */
+        @Nullable
+        private T lookahead;
+
+        TakeWhileCursor(Cursor<T> cursor, Predicate<T> predicate) {
+            this.delegate = cursor;
+            this.keepCondition = predicate;
+        }
+
+        @Override
+        public void close() throws Exception {
+            delegate.close();
+        }
+
+        @Override
+        public boolean hasNext() {
+            if (lookahead != null) {
+                return true;
+            }
+
+            if (keepCondition == null || !delegate.hasNext()) {
+                return false;
+            }
+
+            lookahead = delegate.next();
+
+            boolean accepted = keepCondition.test(lookahead);
+
+            if (!accepted) {
+                // The first rejected element terminates the cursor for good.
+                keepCondition = null;
+                lookahead = null;
+            }
+
+            return accepted;
+        }
+
+        @Override
+        public T next() {
+            if (!hasNext()) {
+                throw new NoSuchElementException();
+            }
+
+            T taken = lookahead;
+
+            lookahead = null;
+
+            return taken;
+        }
+    }
+
+    /**
+     * Wraps the given cursor so that its elements are returned only while they match the predicate. As soon as an element does not
+     * match, that element and all following elements are discarded.
+     *
+     * @param cursor Underlying cursor with data.
+     * @param predicate Predicate for elements to be kept.
+     * @param <T> Cursor element type.
+     * @return Cursor wrapper.
+     */
+    static <T> Cursor<T> takeWhile(Cursor<T> cursor, Predicate<T> predicate) {
+        return new TakeWhileCursor<>(cursor, predicate);
+    }
+
+    /**
+     * Cursor decorator that applies a mapping function to every element of the underlying cursor as it is being returned.
+     *
+     * @param <T> Type of the original data.
+     * @param <U> Type of the transformed data.
+     */
+    private static class MapCursor<T, U> implements Cursor<U> {
+        private final Cursor<T> source;
+
+        private final Function<T, U> transform;
+
+        MapCursor(Cursor<T> cursor, Function<T, U> mapper) {
+            this.source = cursor;
+            this.transform = mapper;
+        }
+
+        @Override
+        public void close() throws Exception {
+            source.close();
+        }
+
+        @Override
+        public boolean hasNext() {
+            return source.hasNext();
+        }
+
+        @Override
+        public U next() {
+            T original = source.next();
+
+            return transform.apply(original);
+        }
+    }
+
+    /**
+     * Wraps the given cursor so that every element is passed through the mapping function before being returned.
+     *
+     * @param cursor Underlying cursor with data.
+     * @param mapper Function to transform the elements of the underlying cursor.
+     * @param <T> Type of the original data.
+     * @param <U> Type of the transformed data.
+     * @return Cursor wrapper.
+     */
+    static <T, U> Cursor<U> map(Cursor<T> cursor, Function<T, U> mapper) {
+        return new MapCursor<>(cursor, mapper);
+    }
+
+    /**
+     * Creates a cursor that iterates over both given cursors: first over all elements of {@code a}, then over all elements of
+     * {@code b}. Closing the returned cursor closes both underlying cursors.
+     *
+     * @param a First cursor.
+     * @param b Second cursor.
+     * @param <T> Cursor element type.
+     * @return Cursor that iterates over both given cursors.
+     */
+    static <T> Cursor<T> concat(Cursor<T> a, Cursor<T> b) {
+        return new Cursor<>() {
+            /** Cursor that is currently being drained; switched from {@code a} to {@code b} once {@code a} has no more elements. */
+            private Cursor<T> currentCursor = a;
+
+            @Override
+            public void close() throws Exception {
+                IgniteUtils.closeAll(a, b);
+            }
+
+            @Override
+            public boolean hasNext() {
+                if (currentCursor.hasNext()) {
+                    return true;
+                } else if (currentCursor == b) {
+                    return false;
+                } else {
+                    currentCursor = b;
+
+                    return currentCursor.hasNext();
+                }
+            }
+
+            @Override
+            public T next() {
+                // Always go through hasNext(): it is the only place where the switch from "a" to "b" happens, so without this check
+                // a caller iterating with bare next() calls would never reach the elements of "b".
+                if (!hasNext()) {
+                    throw new NoSuchElementException();
+                }
+
+                return currentCursor.next();
+            }
+        };
+    }
+}
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbBinaryTupleComparator.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbBinaryTupleComparator.java
new file mode 100644
index 0000000000..36a65b6f84
--- /dev/null
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbBinaryTupleComparator.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb.index;
+
+import static org.apache.ignite.internal.binarytuple.BinaryTupleCommon.isPrefix;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import org.apache.ignite.internal.storage.index.BinaryTupleComparator;
+import org.apache.ignite.internal.storage.index.SortedIndexDescriptor;
+import org.rocksdb.AbstractComparator;
+import org.rocksdb.ComparatorOptions;
+
+/**
+ * {@link AbstractComparator} implementation that compares Binary Tuples.
+ *
+ * <p>Keys are compared by the 2-byte partition ID they start with, then by the Binary Tuple that follows it and, if the tuples are
+ * equal and neither of them is a Binary Tuple Prefix, by the row ID stored in the last 16 bytes of the key.
+ */
+public class RocksDbBinaryTupleComparator extends AbstractComparator {
+    private final BinaryTupleComparator comparator;
+
+    /** Options passed to the parent class; kept in a field so that they can be closed together with this comparator. */
+    private final ComparatorOptions options;
+
+    /**
+     * Constructor.
+     *
+     * @param descriptor Sorted Index descriptor.
+     */
+    public RocksDbBinaryTupleComparator(SortedIndexDescriptor descriptor) {
+        this(descriptor, new ComparatorOptions());
+    }
+
+    private RocksDbBinaryTupleComparator(SortedIndexDescriptor descriptor, ComparatorOptions options) {
+        super(options);
+
+        this.options = options;
+        this.comparator = new BinaryTupleComparator(descriptor);
+    }
+
+    @Override
+    public String name() {
+        return getClass().getCanonicalName();
+    }
+
+    @Override
+    public int compare(ByteBuffer a, ByteBuffer b) {
+        // Relative reads: besides extracting the partition IDs they advance both buffers past them, which the slices below rely on.
+        short firstPartitionId = a.getShort();
+        short secondPartitionId = b.getShort();
+
+        if (firstPartitionId != secondPartitionId) {
+            return Short.compare(firstPartitionId, secondPartitionId);
+        }
+
+        ByteBuffer firstTuple = a.slice().order(ByteOrder.LITTLE_ENDIAN);
+        ByteBuffer secondTuple = b.slice().order(ByteOrder.LITTLE_ENDIAN);
+
+        int tupleOrder = comparator.compare(firstTuple, secondTuple);
+
+        // Binary Tuple Prefixes don't have row IDs, so row IDs are only compared for two equal tuples that are both not prefixes.
+        boolean compareByRowId = tupleOrder == 0 && !isPrefix(firstTuple) && !isPrefix(secondTuple);
+
+        return compareByRowId ? compareRowIds(a, b) : tupleOrder;
+    }
+
+    /**
+     * Compares the row IDs located in the last 16 bytes of the given keys: most significant 8 bytes first, then the least
+     * significant ones.
+     */
+    private static int compareRowIds(ByteBuffer a, ByteBuffer b) {
+        int mostSignBitsOrder = Long.compare(longFromEnd(a, Long.BYTES * 2), longFromEnd(b, Long.BYTES * 2));
+
+        if (mostSignBitsOrder != 0) {
+            return mostSignBitsOrder;
+        }
+
+        return Long.compare(longFromEnd(a, Long.BYTES), longFromEnd(b, Long.BYTES));
+    }
+
+    /** Reads a {@code long} that starts the given number of bytes before the buffer's limit, without moving the buffer's position. */
+    private static long longFromEnd(ByteBuffer buffer, int bytesFromEnd) {
+        return buffer.getLong(buffer.limit() - bytesFromEnd);
+    }
+
+    @Override
+    public void close() {
+        super.close();
+
+        options.close();
+    }
+}
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbHashIndexStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbHashIndexStorage.java
index ee204fc9cf..bdc06a8a66 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbHashIndexStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbHashIndexStorage.java
@@ -17,7 +17,7 @@
package org.apache.ignite.internal.storage.rocksdb.index;
-import static org.apache.ignite.internal.rocksdb.RocksUtils.rangeEnd;
+import static org.apache.ignite.internal.rocksdb.RocksUtils.incrementArray;
import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY;
import static org.apache.ignite.internal.util.ByteUtils.bytesToLong;
@@ -109,7 +109,7 @@ public class RocksDbHashIndexStorage implements HashIndexStorage {
public Cursor<RowId> get(BinaryTuple key) {
byte[] rangeStart = rocksPrefix(key);
- byte[] rangeEnd = rangeEnd(rangeStart);
+ byte[] rangeEnd = incrementArray(rangeStart);
Slice upperBound = rangeEnd == null ? null : new Slice(rangeEnd);
@@ -162,7 +162,7 @@ public class RocksDbHashIndexStorage implements HashIndexStorage {
@Override
public void destroy() {
- byte[] rangeEnd = rangeEnd(constantPrefix);
+ byte[] rangeEnd = incrementArray(constantPrefix);
assert rangeEnd != null;
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java
new file mode 100644
index 0000000000..36321b8b6c
--- /dev/null
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb.index;
+
+import static org.apache.ignite.internal.storage.rocksdb.index.CursorUtils.concat;
+import static org.apache.ignite.internal.storage.rocksdb.index.CursorUtils.dropWhile;
+import static org.apache.ignite.internal.storage.rocksdb.index.CursorUtils.map;
+import static org.apache.ignite.internal.storage.rocksdb.index.CursorUtils.takeWhile;
+import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.function.Predicate;
+import org.apache.ignite.internal.rocksdb.ColumnFamily;
+import org.apache.ignite.internal.rocksdb.RocksIteratorAdapter;
+import org.apache.ignite.internal.schema.BinaryTuple;
+import org.apache.ignite.internal.schema.BinaryTuplePrefix;
+import org.apache.ignite.internal.schema.row.InternalTuple;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.index.BinaryTupleComparator;
+import org.apache.ignite.internal.storage.index.IndexRow;
+import org.apache.ignite.internal.storage.index.IndexRowImpl;
+import org.apache.ignite.internal.storage.index.SortedIndexDescriptor;
+import org.apache.ignite.internal.storage.index.SortedIndexStorage;
+import org.apache.ignite.internal.storage.rocksdb.RocksDbMvPartitionStorage;
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.jetbrains.annotations.Nullable;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.Slice;
+import org.rocksdb.WriteBatchWithIndex;
+
+/**
+ * {@link SortedIndexStorage} implementation based on RocksDB.
+ *
+ * <p>This storage uses the following format for keys:
+ * <pre>
+ * Partition ID - 2 bytes
+ * Tuple value - variable length
+ * Row ID (UUID) - 16 bytes
+ * </pre>
+ *
+ * <p>We use an empty array as values, because all required information can be extracted from the key.
+ */
+public class RocksDbSortedIndexStorage implements SortedIndexStorage {
+ private final SortedIndexDescriptor descriptor; // Supplies the index ID (error messages) and the binary tuple schema (decoding).
+
+ private final ColumnFamily indexCf; // Column family whose keys are the index entries; values are always empty arrays.
+
+ private final RocksDbMvPartitionStorage partitionStorage; // Supplies the partition ID and the current write batch.
+
+ /**
+ * Creates a storage. Only the given references are stored; no RocksDB call is made here.
+ *
+ * @param descriptor Sorted index descriptor (provides the index ID and the binary tuple schema).
+ * @param indexCf Column family that stores the index data.
+ * @param partitionStorage Partition storage of the corresponding index (provides the partition ID and the write batch).
+ */
+ public RocksDbSortedIndexStorage(
+ SortedIndexDescriptor descriptor,
+ ColumnFamily indexCf,
+ RocksDbMvPartitionStorage partitionStorage
+ ) {
+ this.descriptor = descriptor;
+ this.indexCf = indexCf;
+ this.partitionStorage = partitionStorage;
+ }
+
+ @Override
+ public SortedIndexDescriptor indexDescriptor() { // Returns the descriptor that was passed to the constructor.
+ return descriptor;
+ }
+
+ @Override
+ public void put(IndexRow row) { // Adds the row's key to the partition's current write batch; nothing is written directly.
+ WriteBatchWithIndex writeBatch = partitionStorage.currentWriteBatch();
+
+ try {
+ writeBatch.put(indexCf.handle(), rocksKey(row), BYTE_EMPTY_ARRAY); // Empty value: the key alone encodes tuple and row ID.
+ } catch (RocksDBException e) {
+ throw new StorageException("Unable to insert data into sorted index. Index ID: " + descriptor.id(), e);
+ }
+ }
+
+ @Override
+ public void remove(IndexRow row) { // Adds a delete of the row's key to the partition's current write batch.
+ WriteBatchWithIndex writeBatch = partitionStorage.currentWriteBatch();
+
+ try {
+ writeBatch.delete(indexCf.handle(), rocksKey(row)); // Same key encoding as put(), so the exact entry is targeted.
+ } catch (RocksDBException e) {
+ throw new StorageException("Unable to remove data from sorted index. Index ID: " + descriptor.id(), e);
+ }
+ }
+
+ @Override // Decodes the bound-inclusivity bits from flags and delegates to the private four-argument scan.
+ public Cursor<IndexRow> scan(@Nullable BinaryTuplePrefix lowerBound, @Nullable BinaryTuplePrefix upperBound, int flags) {
+ boolean includeLower = (flags & GREATER_OR_EQUAL) != 0; // Lower bound is inclusive iff the GREATER_OR_EQUAL bit is set.
+ boolean includeUpper = (flags & LESS_OR_EQUAL) != 0; // Upper bound is inclusive iff the LESS_OR_EQUAL bit is set.
+
+ return scan(lowerBound, upperBound, includeLower, includeUpper);
+ }
+
+ private Cursor<IndexRow> scan( // Builds a cursor of decoded index rows between the two bounds.
+ @Nullable BinaryTuplePrefix lowerBound, // Start of the range; null starts at the first key.
+ @Nullable BinaryTuplePrefix upperBound, // End of the range; null means no upper limit.
+ boolean includeLower, // Whether keys matching lowerBound are returned.
+ boolean includeUpper // Whether keys matching upperBound are returned.
+ ) {
+ byte[] lowerBoundBytes = lowerBound == null ? null : rocksPrefix(lowerBound);
+ byte[] upperBoundBytes = upperBound == null ? null : rocksPrefix(upperBound);
+
+ Cursor<ByteBuffer> cursor = createScanCursor(lowerBoundBytes, upperBoundBytes); // NOTE(review): null bounds are not clamped to this partition's keys - confirm.
+
+ // Skip the lower bound, if needed (RocksDB includes the lower bound by default).
+ if (!includeLower && lowerBound != null) {
+ cursor = dropWhile(cursor, startsWith(lowerBound));
+ }
+
+ // Include the upper bound, if needed (RocksDB excludes the upper bound by default).
+ if (includeUpper && upperBound != null) {
+ Cursor<ByteBuffer> upperBoundCursor = takeWhile(createScanCursor(upperBoundBytes, null), startsWith(upperBound)); // Second iterator, opened right away.
+
+ cursor = concat(cursor, upperBoundCursor);
+ }
+
+ return map(cursor, this::decodeRow); // Raw keys are turned into IndexRow objects by decodeRow.
+ }
+
+ private Cursor<ByteBuffer> createScanCursor(byte @Nullable [] lowerBound, byte @Nullable [] upperBound) { // Opens a cursor over raw keys.
+ Slice upperBoundSlice = upperBound == null ? null : new Slice(upperBound); // Stays null when there is no upper limit.
+
+ ReadOptions options = new ReadOptions().setIterateUpperBound(upperBoundSlice); // NOTE(review): receives null when unbounded - confirm it is accepted.
+
+ RocksIterator it = indexCf.newIterator(options);
+
+ if (lowerBound == null) {
+ it.seekToFirst(); // No lower bound: start at the first key of the column family.
+ } else {
+ it.seek(lowerBound);
+ }
+
+ return new RocksIteratorAdapter<>(it) {
+ @Override
+ protected ByteBuffer decodeEntry(byte[] key, byte[] value) {
+ return ByteBuffer.wrap(key).order(ByteOrder.BIG_ENDIAN); // Only the key is used; the value is ignored.
+ }
+
+ @Override
+ public void close() throws Exception {
+ super.close();
+
+ IgniteUtils.closeAll(options, upperBoundSlice); // Closed after super.close(); upperBoundSlice may be null here.
+ }
+ };
+ }
+
+ private IndexRow decodeRow(ByteBuffer bytes) { // Rebuilds an IndexRow from a raw key laid out as: partition ID | tuple | row ID.
+ assert bytes.getShort(0) == partitionStorage.partitionId(); // Only checked when assertions are enabled.
+
+ var tuple = new BinaryTuple(descriptor.binaryTupleSchema(), binaryTupleSlice(bytes));
+
+ // RowId UUID is located at the last 16 bytes of the key
+ long mostSignificantBits = bytes.getLong(bytes.limit() - Long.BYTES * 2); // Absolute read; the buffer position is not moved.
+ long leastSignificantBits = bytes.getLong(bytes.limit() - Long.BYTES);
+
+ var rowId = new RowId(partitionStorage.partitionId(), mostSignificantBits, leastSignificantBits); // Partition ID comes from the storage.
+
+ return new IndexRowImpl(tuple, rowId);
+ }
+
+ private byte[] rocksPrefix(BinaryTuplePrefix prefix) { // Encodes a scan bound as: partition ID (2 bytes) followed by the prefix bytes.
+ return rocksPrefix(prefix, 0).array(); // Zero extra bytes: no row ID is appended to a bound.
+ }
+
+ private ByteBuffer rocksPrefix(InternalTuple prefix, int extraLength) { // Writes partition ID + tuple bytes, leaving extraLength bytes free.
+ ByteBuffer keyBytes = prefix.byteBuffer();
+
+ return ByteBuffer.allocate(Short.BYTES + keyBytes.remaining() + extraLength)
+ .order(ByteOrder.BIG_ENDIAN) // Affects the partition ID and any longs appended later; tuple bytes are copied as-is.
+ .putShort((short) partitionStorage.partitionId())
+ .put(keyBytes); // Relative put, advances keyBytes too. NOTE(review): assumes byteBuffer() returns a fresh view - confirm.
+ }
+
+ private byte[] rocksKey(IndexRow row) { // Full key: partition ID (2 bytes) | index columns tuple | row ID UUID (16 bytes).
+ RowId rowId = row.rowId();
+
+ // Only the UUID part of the Row ID is appended: the Partition ID is already written at the start of the key.
+ return rocksPrefix(row.indexColumns(), 2 * Long.BYTES)
+ .putLong(rowId.mostSignificantBits())
+ .putLong(rowId.leastSignificantBits())
+ .array();
+ }
+
+ private Predicate<ByteBuffer> startsWith(BinaryTuplePrefix prefix) { // Predicate on raw keys: same partition and tuple part compares equal to prefix.
+ var comparator = new BinaryTupleComparator(descriptor); // Created once per predicate and reused for every tested key.
+
+ return key -> {
+ // First, compare the partitionIDs.
+ boolean partitionIdCompare = key.getShort(0) == partitionStorage.partitionId();
+
+ if (!partitionIdCompare) {
+ return false;
+ }
+
+ // Finally, compare the remaining parts of the tuples.
+ // TODO: This part may be optimized by comparing binary tuple representations directly. However, currently BinaryTuple prefixes
+ // are not binary compatible with regular tuples. See https://issues.apache.org/jira/browse/IGNITE-17711.
+ return comparator.compare(prefix.byteBuffer(), binaryTupleSlice(key)) == 0; // NOTE(review): presumably compares only the prefix columns - confirm.
+ };
+ }
+
+ private static ByteBuffer binaryTupleSlice(ByteBuffer key) { // Returns a view of only the tuple bytes of a raw key.
+ return key.duplicate() // Work on a duplicate so the caller's position and limit stay untouched.
+ // Discard partition ID.
+ .position(Short.BYTES)
+ // Discard row ID.
+ .limit(key.limit() - Long.BYTES * 2)
+ .slice()
+ .order(ByteOrder.LITTLE_ENDIAN); // slice() yields a big-endian buffer; presumably tuples are little-endian - confirm.
+ }
+}
diff --git a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java
index 467d59a703..2d247e6afe 100644
--- a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java
+++ b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java
@@ -48,7 +48,6 @@ import org.apache.ignite.internal.testframework.WorkDirectory;
import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
import org.apache.ignite.internal.util.IgniteUtils;
import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -163,22 +162,4 @@ public class RocksDbMvTableStorageTest extends AbstractMvTableStorageTest {
void storageAdvertisesItIsPersistent() {
assertThat(tableStorage.isVolatile(), is(false));
}
-
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-17318")
- @Override
- public void testCreateSortedIndex() {
- super.testCreateSortedIndex();
- }
-
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-17318")
- @Override
- public void testDestroyIndex() {
- super.testDestroyIndex();
- }
-
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-17318")
- @Override
- public void testMisconfiguredIndices() {
- super.testMisconfiguredIndices();
- }
}
diff --git a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/index/CursorUtilsTest.java b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/index/CursorUtilsTest.java
new file mode 100644
index 0000000000..9e5c6be715
--- /dev/null
+++ b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/index/CursorUtilsTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb.index;
+
+import static org.apache.ignite.internal.storage.rocksdb.index.CursorUtils.concat;
+import static org.apache.ignite.internal.storage.rocksdb.index.CursorUtils.dropWhile;
+import static org.apache.ignite.internal.storage.rocksdb.index.CursorUtils.map;
+import static org.apache.ignite.internal.storage.rocksdb.index.CursorUtils.takeWhile;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.emptyIterable;
+import static org.hamcrest.Matchers.is;
+
+import java.util.Arrays;
+import org.apache.ignite.internal.util.Cursor;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Class for testing {@link CursorUtils}: checks dropWhile, takeWhile, map and concat separately and chained together.
+ */
+public class CursorUtilsTest {
+ @Test
+ public void testDropWhile() { // Only the leading run of matches is skipped: the trailing 1 is kept although 1 < 10.
+ Cursor<Integer> actual = dropWhile(cursor(1, 2, 5, 14, 20, 1), i -> i < 10);
+
+ assertThat(actual, contains(14, 20, 1));
+
+ assertThat(dropWhile(cursor(), i -> i.hashCode() < 10), is(emptyIterable())); // Empty input stays empty.
+ }
+
+ @Test
+ public void testTakeWhile() { // Iteration stops at the first non-match: the trailing 1 is not returned although 1 < 10.
+ Cursor<Integer> actual = takeWhile(cursor(1, 2, 5, 14, 20, 1), i -> i < 10);
+
+ assertThat(actual, contains(1, 2, 5));
+
+ assertThat(takeWhile(cursor(), i -> i.hashCode() < 10), is(emptyIterable())); // Empty input stays empty.
+ }
+
+ @Test
+ public void testMap() { // Every element is transformed and the original order is kept.
+ Cursor<String> actual = map(cursor(1, 2, 5, 14, 20), i -> "foo" + i);
+
+ assertThat(actual, contains("foo1", "foo2", "foo5", "foo14", "foo20"));
+
+ assertThat(map(cursor(), Object::toString), is(emptyIterable())); // Empty input stays empty.
+ }
+
+ @Test
+ public void testConcat() { // All elements of the first cursor come first, then the second; duplicates are kept.
+ Cursor<Integer> actual = concat(cursor(1, 2, 5), cursor(5, 2, 1));
+
+ assertThat(actual, contains(1, 2, 5, 5, 2, 1));
+
+ assertThat(concat(cursor(), cursor()), is(emptyIterable())); // Two empty cursors give an empty cursor.
+ }
+
+ @Test
+ public void testCombination() { // Chains all four operations and checks the final sequence.
+ Cursor<Integer> dropWhile = dropWhile(cursor(1, 5, 8, 10, 42), i -> i <= 8); // Leaves 10, 42.
+
+ Cursor<Integer> takeWhile = takeWhile(dropWhile, i -> i >= 10); // Both remaining elements match.
+
+ Cursor<Integer> concat = concat(takeWhile, cursor(1, 2, 3));
+
+ Cursor<String> map = map(concat, String::valueOf);
+
+ assertThat(map, contains("10", "42", "1", "2", "3"));
+ }
+
+ @SafeVarargs
+ private static <T> Cursor<T> cursor(T... elements) { // Test helper: wraps the given elements into a Cursor, in argument order.
+ return Cursor.fromIterator(Arrays.asList(elements).iterator());
+ }
+}
diff --git a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbHashIndexStorageTest.java b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbHashIndexStorageTest.java
index 2839e05c05..0db33a89b6 100644
--- a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbHashIndexStorageTest.java
+++ b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbHashIndexStorageTest.java
@@ -19,9 +19,6 @@ package org.apache.ignite.internal.storage.rocksdb.index;
import java.nio.file.Path;
import org.apache.ignite.configuration.schemas.store.UnknownDataStorageConfigurationSchema;
-import org.apache.ignite.configuration.schemas.table.ConstantValueDefaultConfigurationSchema;
-import org.apache.ignite.configuration.schemas.table.EntryCountBudgetConfigurationSchema;
-import org.apache.ignite.configuration.schemas.table.FunctionCallDefaultConfigurationSchema;
import org.apache.ignite.configuration.schemas.table.HashIndexConfigurationSchema;
import org.apache.ignite.configuration.schemas.table.NullValueDefaultConfigurationSchema;
import org.apache.ignite.configuration.schemas.table.TableConfiguration;
@@ -64,17 +61,17 @@ public class RocksDbHashIndexStorageTest extends AbstractHashIndexStorageTest {
UnlimitedBudgetConfigurationSchema.class
},
value = "mock.dataStorage.name = " + RocksDbStorageEngine.ENGINE_NAME
- ) TableConfiguration tableCfg,
-
- @InjectConfiguration(polymorphicExtensions = {
- HashIndexConfigurationSchema.class,
- UnknownDataStorageConfigurationSchema.class,
- ConstantValueDefaultConfigurationSchema.class,
- FunctionCallDefaultConfigurationSchema.class,
- NullValueDefaultConfigurationSchema.class,
- UnlimitedBudgetConfigurationSchema.class,
- EntryCountBudgetConfigurationSchema.class
- }) TablesConfiguration tablesConfig
+ )
+ TableConfiguration tableCfg,
+ @InjectConfiguration(
+ polymorphicExtensions = {
+ HashIndexConfigurationSchema.class,
+ UnknownDataStorageConfigurationSchema.class,
+ NullValueDefaultConfigurationSchema.class,
+ UnlimitedBudgetConfigurationSchema.class
+ }
+ )
+ TablesConfiguration tablesConfig
) {
engine = new RocksDbStorageEngine(rocksDbEngineConfig, workDir);
diff --git a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbHashIndexStorageTest.java b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorageTest.java
similarity index 73%
copy from modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbHashIndexStorageTest.java
copy to modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorageTest.java
index 2839e05c05..f46c29e5e4 100644
--- a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbHashIndexStorageTest.java
+++ b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorageTest.java
@@ -19,17 +19,14 @@ package org.apache.ignite.internal.storage.rocksdb.index;
import java.nio.file.Path;
import org.apache.ignite.configuration.schemas.store.UnknownDataStorageConfigurationSchema;
-import org.apache.ignite.configuration.schemas.table.ConstantValueDefaultConfigurationSchema;
-import org.apache.ignite.configuration.schemas.table.EntryCountBudgetConfigurationSchema;
-import org.apache.ignite.configuration.schemas.table.FunctionCallDefaultConfigurationSchema;
-import org.apache.ignite.configuration.schemas.table.HashIndexConfigurationSchema;
import org.apache.ignite.configuration.schemas.table.NullValueDefaultConfigurationSchema;
+import org.apache.ignite.configuration.schemas.table.SortedIndexConfigurationSchema;
import org.apache.ignite.configuration.schemas.table.TableConfiguration;
import org.apache.ignite.configuration.schemas.table.TablesConfiguration;
import org.apache.ignite.configuration.schemas.table.UnlimitedBudgetConfigurationSchema;
import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
-import org.apache.ignite.internal.storage.index.AbstractHashIndexStorageTest;
+import org.apache.ignite.internal.storage.index.AbstractSortedIndexStorageTest;
import org.apache.ignite.internal.storage.rocksdb.RocksDbStorageEngine;
import org.apache.ignite.internal.storage.rocksdb.RocksDbTableStorage;
import org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbDataStorageConfigurationSchema;
@@ -42,11 +39,11 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.extension.ExtendWith;
/**
- * Tests for the {@link RocksDbHashIndexStorage} class.
+ * Tests for the {@link RocksDbSortedIndexStorage} class.
*/
@ExtendWith(WorkDirectoryExtension.class)
@ExtendWith(ConfigurationExtension.class)
-public class RocksDbHashIndexStorageTest extends AbstractHashIndexStorageTest {
+public class RocksDbSortedIndexStorageTest extends AbstractSortedIndexStorageTest {
private RocksDbStorageEngine engine;
private RocksDbTableStorage tableStorage;
@@ -59,22 +56,22 @@ public class RocksDbHashIndexStorageTest extends AbstractHashIndexStorageTest {
@InjectConfiguration(
polymorphicExtensions = {
RocksDbDataStorageConfigurationSchema.class,
- HashIndexConfigurationSchema.class,
+ SortedIndexConfigurationSchema.class,
NullValueDefaultConfigurationSchema.class,
UnlimitedBudgetConfigurationSchema.class
},
value = "mock.dataStorage.name = " + RocksDbStorageEngine.ENGINE_NAME
- ) TableConfiguration tableCfg,
-
- @InjectConfiguration(polymorphicExtensions = {
- HashIndexConfigurationSchema.class,
- UnknownDataStorageConfigurationSchema.class,
- ConstantValueDefaultConfigurationSchema.class,
- FunctionCallDefaultConfigurationSchema.class,
- NullValueDefaultConfigurationSchema.class,
- UnlimitedBudgetConfigurationSchema.class,
- EntryCountBudgetConfigurationSchema.class
- }) TablesConfiguration tablesConfig
+ )
+ TableConfiguration tableCfg,
+ @InjectConfiguration(
+ polymorphicExtensions = {
+ SortedIndexConfigurationSchema.class,
+ UnknownDataStorageConfigurationSchema.class,
+ NullValueDefaultConfigurationSchema.class,
+ UnlimitedBudgetConfigurationSchema.class
+ }
+ )
+ TablesConfiguration tablesConfig
) {
engine = new RocksDbStorageEngine(rocksDbEngineConfig, workDir);