Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2021/12/14 11:14:29 UTC

[GitHub] [ignite-3] SammyVimes commented on a change in pull request #468: IGNITE-15885 Implement RocksDB-based Sorted Index Storage

SammyVimes commented on a change in pull request #468:
URL: https://github.com/apache/ignite-3/pull/468#discussion_r768533602



##########
File path: modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/TableStorage.java
##########
@@ -52,10 +52,32 @@
      *
      * @param partId Partition id.
      * @throws IllegalArgumentException If partition id is out of bounds.
-     * @throws StorageException If an error has occurred during the partition destruction.
+     * @throws StorageException         If an error has occurred during the partition destruction.
      */
     void dropPartition(int partId) throws StorageException;
 
+    /**
+     * Creates or returns an already created Sorted Index with the given name.
+     *
+     * <p>A prerequisite for calling this method is to have the index already configured under the same name in the Table Configuration

Review comment:
       <p> should have a closing tag

##########
File path: modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/BinarySearchRow.java
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.basic;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.storage.SearchRow;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Adapter that converts a {@link BinaryRow} into a {@link SearchRow}.
+ */
+public class BinarySearchRow implements SearchRow {
+    /** Key array. */
+    private final byte[] key;
+
+    /**
+     * The constructor.
+     *
+     * @param row The search row.
+     */
+    public BinarySearchRow(BinaryRow row) {
+        // TODO asch IGNITE-15934 can reuse thread local byte buffer
+        key = new byte[row.keySlice().capacity()];
+
+        row.keySlice().get(key);
+    }
+
+    @Override
+    public byte @NotNull [] keyBytes() {
+        return key;
+    }
+
+    @Override
+    public @NotNull ByteBuffer key() {

Review comment:
       Is this our new codestyle? Oh god, please no........
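
For context, a minimal sketch of what this placement means in plain Java, assuming the annotation in use is a type-use annotation. `NonNullSketch` and `TypeUseArrays` are hypothetical names that stand in for the real annotation and class; they are not part of the PR:

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Target;

/** Hypothetical type-use annotation, standing in for a nullability annotation. */
@Target(ElementType.TYPE_USE)
@interface NonNullSketch {
}

final class TypeUseArrays {
    private final byte[] key = {1, 2, 3};

    /** The annotation sits before the brackets, so it applies to the array type itself. */
    byte @NonNullSketch [] keyBytes() {
        return key;
    }

    /** The annotation sits before the element type, so it applies to {@code byte}, not to the array. */
    @NonNullSketch byte[] elements() {
        return key;
    }
}
```

With a type-use annotation the two positions are not interchangeable, which may be why the formatter produces the unusual-looking form.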

##########
File path: modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
##########
@@ -87,53 +73,42 @@
     /** Data region for the table. */
     private final RocksDbDataRegion dataRegion;
 
-    /** Comparators factory for indexes. */
-    private final BiFunction<TableView, String, Comparator<ByteBuffer>> indexComparatorFactory;
-
     /** List of closeable resources to close on {@link #stop()}. Better than having a field for each one of them. */
     private final List<AutoCloseable> autoCloseables = new ArrayList<>();
 
-    /** Rocks DB instance itself. */
+    /** Rocks DB instance. */
     private RocksDB db;
 
     /** CF handle for meta information. */
     @SuppressWarnings("unused")
-    private ColumnFamilyHandle metaCfHandle;
+    private ColumnFamily metaCf;

Review comment:
       Why is it unused, though?

##########
File path: modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/BinaryIndexRowDeserializer.java
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb.index;
+
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.ByteBufferRow;
+import org.apache.ignite.internal.schema.row.Row;
+import org.apache.ignite.internal.storage.index.IndexRow;
+import org.apache.ignite.internal.storage.index.IndexRowDeserializer;
+import org.apache.ignite.internal.storage.index.SortedIndexDescriptor;
+import org.apache.ignite.internal.storage.index.SortedIndexDescriptor.ColumnDescriptor;
+
+/**
+ * {@link IndexRowDeserializer} implementation that uses {@link BinaryRow} infrastructure for deserialization purposes.
+ */
+class BinaryIndexRowDeserializer implements IndexRowDeserializer {
+    private final SortedIndexDescriptor descriptor;
+
+    BinaryIndexRowDeserializer(SortedIndexDescriptor descriptor) {
+        this.descriptor = descriptor;
+    }
+
+    @Override
+    public Object[] indexedColumnValues(IndexRow indexRow) {
+        var row = new Row(descriptor.asSchemaDescriptor(), new ByteBufferRow(indexRow.rowBytes()));
+
+        return descriptor.indexRowColumns().stream()

Review comment:
       I wonder how hot this code will be. Should it use streams?
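
If this does turn out to be hot, a plain loop over a pre-sized array is one alternative. A minimal sketch, assuming each column value can be read through a function; `LoopColumnExtractor` and `columnReaders` are hypothetical names, not part of the PR:

```java
import java.util.List;
import java.util.function.Function;

/** Hypothetical stand-in for the deserializer; shows only the loop shape. */
final class LoopColumnExtractor {
    /** Reads every column of {@code row} into a pre-sized array without building a stream pipeline. */
    static <R> Object[] extract(R row, List<Function<R, Object>> columnReaders) {
        Object[] values = new Object[columnReaders.size()];

        for (int i = 0; i < values.length; i++) {
            values[i] = columnReaders.get(i).apply(row);
        }

        return values;
    }
}
```

Whether this is measurably faster than the stream version would need a benchmark; the loop only avoids the per-call pipeline allocation.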

##########
File path: modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/BinaryRowComparator.java
##########
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb.index;
+
+import static java.util.Comparator.comparing;
+import static java.util.Comparator.comparingDouble;
+import static java.util.Comparator.comparingInt;
+import static java.util.Comparator.comparingLong;
+import static org.apache.ignite.internal.storage.rocksdb.index.ComparatorUtils.comparingNull;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Arrays;
+import java.util.Comparator;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.ByteBufferRow;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.NativeTypeSpec;
+import org.apache.ignite.internal.schema.row.Row;
+import org.apache.ignite.internal.storage.index.SortedIndexDescriptor;
+import org.rocksdb.AbstractComparator;
+import org.rocksdb.ComparatorOptions;
+
+/**
+ * Naive RocksDB comparator implementation that fully de-serializes the passed data and compares the index columns one after the other.
+ */
+public class BinaryRowComparator extends AbstractComparator {
+    /**
+     * Actual comparator implementation.
+     */
+    private final Comparator<ByteBuffer> innerComparator;
+
+    /**
+     * Options needed for resource management.
+     */
+    private final ComparatorOptions options;
+
+    /**
+     * Creates a RocksDB comparator for a Sorted Index identified by the given descriptor.
+     */
+    public BinaryRowComparator(SortedIndexDescriptor descriptor) {
+        this(descriptor, new ComparatorOptions());
+    }
+
+    /**
+     * Internal constructor for capturing the {@code options} parameter for resource management purposes.
+     */
+    private BinaryRowComparator(SortedIndexDescriptor descriptor, ComparatorOptions options) {
+        super(options);
+
+        innerComparator = comparing(
+                byteBuffer -> new ByteBufferRow(byteBuffer.order(ByteOrder.LITTLE_ENDIAN)),
+                binaryRowComparator(descriptor)
+        );
+
+        this.options = options;
+    }
+
+    /**
+     * Creates a comparator for comparing two {@link BinaryRow}s by converting them into {@link Row}s.
+     */
+    private static Comparator<BinaryRow> binaryRowComparator(SortedIndexDescriptor descriptor) {
+        return comparing(
+                binaryRow -> new Row(descriptor.asSchemaDescriptor(), binaryRow),
+                rowComparator(descriptor)
+        );
+    }
+
+    /**
+     * Creates a comparator that compares two {@link Row}s by comparing individual columns.
+     */
+    private static Comparator<Row> rowComparator(SortedIndexDescriptor descriptor) {
+        return descriptor.indexRowColumns().stream()
+                .map(columnDescriptor -> {
+                    Column column = columnDescriptor.column();
+
+                    Comparator<Row> columnComparator = columnComparator(column);
+
+                    if (columnDescriptor.nullable()) {
+                        columnComparator = comparingNull(
+                                row -> row.hasNullValue(column.schemaIndex(), column.type().spec()) ? null : row,
+                                columnComparator
+                        );
+                    }
+
+                    return columnDescriptor.asc() ? columnComparator : columnComparator.reversed();
+                })
+                .reduce(Comparator::thenComparing)
+                .orElseThrow();
+    }
+
+    /**
+     * Creates a comparator for comparing table columns.
+     */
+    private static Comparator<Row> columnComparator(Column column) {
+        int schemaIndex = column.schemaIndex();
+
+        NativeTypeSpec typeSpec = column.type().spec();
+
+        switch (typeSpec) {
+            case INT8:
+                return (row1, row2) -> {
+                    byte value1 = row1.byteValue(schemaIndex);
+                    byte value2 = row2.byteValue(schemaIndex);
+
+                    return Byte.compare(value1, value2);
+                };
+
+            case INT16:
+                return (row1, row2) -> {
+                    short value1 = row1.shortValue(schemaIndex);
+                    short value2 = row2.shortValue(schemaIndex);
+
+                    return Short.compare(value1, value2);
+                };
+
+            case INT32:
+                return comparingInt(row -> row.intValue(schemaIndex));
+
+            case INT64:
+                return comparingLong(row -> row.longValue(schemaIndex));
+
+            case FLOAT:
+                return (row1, row2) -> {
+                    float value1 = row1.floatValue(schemaIndex);
+                    float value2 = row2.floatValue(schemaIndex);
+
+                    return Float.compare(value1, value2);
+                };
+
+            case DOUBLE:
+                return comparingDouble(row -> row.doubleValue(schemaIndex));
+
+            case BYTES:
+                return comparing(row -> row.bytesValue(schemaIndex), Arrays::compare);
+
+            case BITMASK:
+                return comparing(row -> row.bitmaskValue(schemaIndex).toLongArray(), Arrays::compare);
+
+            // all other types implement Comparable
+            case DECIMAL:
+            case UUID:
+            case STRING:
+            case NUMBER:
+            case TIMESTAMP:
+            case DATE:
+            case TIME:
+            case DATETIME:
+                return comparing(row -> (Comparable) typeSpec.objectValue(row, schemaIndex));
+
+            default:
+                throw new IllegalArgumentException(String.format(
+                        "Unsupported column schema for creating a sorted index. Column name: %s, column type: %s",
+                        column.name(), column.type()
+                ));
+        }
+    }
+
+    @Override

Review comment:
       Please add {@inheritDoc} here and on the overridden methods that follow.
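
A minimal sketch of the form I have in mind, assuming the one-line Javadoc style. `LengthComparator` is a hypothetical class used only for illustration:

```java
import java.util.Comparator;

/** Hypothetical comparator that orders strings by length. */
final class LengthComparator implements Comparator<String> {
    /** {@inheritDoc} */
    @Override
    public int compare(String a, String b) {
        return Integer.compare(a.length(), b.length());
    }
}
```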

##########
File path: modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/TableStorage.java
##########
@@ -52,10 +52,32 @@
      *
      * @param partId Partition id.
      * @throws IllegalArgumentException If partition id is out of bounds.
-     * @throws StorageException If an error has occurred during the partition destruction.
+     * @throws StorageException         If an error has occurred during the partition destruction.
      */
     void dropPartition(int partId) throws StorageException;
 
+    /**
+     * Creates or returns an already created Sorted Index with the given name.
+     *
+     * <p>A prerequisite for calling this method is to have the index already configured under the same name in the Table Configuration
+     * (see {@link #configuration()}).
+     *
+     * @param indexName Index name.
+     * @return Sorted Index storage.
+     * @throws StorageException if no index has been configured under the given name or it has been configured incorrectly (e.g. it was
+     *                          configured as a Hash Index).
+     */
+    SortedIndexStorage getOrCreateSortedIndex(String indexName);
+
+    /**
+     * Destroys the index under the given name and all data in it.
+     *
+     * <p>This method is a no-op if the index under the given name does not exist.

Review comment:
       <p> should have a closing tag

##########
File path: modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/SortedIndexDescriptor.java
##########
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.index;
+
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toUnmodifiableList;
+
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.stream.Stream;
+import org.apache.ignite.configuration.NamedListView;
+import org.apache.ignite.configuration.schemas.table.ColumnView;
+import org.apache.ignite.configuration.schemas.table.IndexColumnView;
+import org.apache.ignite.configuration.schemas.table.SortedIndexView;
+import org.apache.ignite.configuration.schemas.table.TableIndexView;
+import org.apache.ignite.configuration.schemas.table.TableView;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.configuration.SchemaConfigurationConverter;
+import org.apache.ignite.internal.schema.configuration.SchemaDescriptorConverter;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.schema.definition.ColumnDefinition;
+
+/**
+ * Descriptor for creating a Sorted Index Storage.
+ *
+ * @see SortedIndexStorage
+ */
+public class SortedIndexDescriptor {
+    /**
+     * Descriptor of a Sorted Index column (column name and column sort order).
+     */
+    public static class ColumnDescriptor {
+        private final Column column;
+
+        private final boolean asc;
+
+        private final boolean indexedColumn;
+
+        private final boolean nullable;
+
+        ColumnDescriptor(Column column, boolean asc, boolean indexedColumn, boolean nullable) {
+            this.column = column;
+            this.asc = asc;
+            this.indexedColumn = indexedColumn;
+            this.nullable = nullable;
+        }
+
+        /**
+         * Returns a column descriptor.
+         */
+        public Column column() {
+            return column;
+        }
+
+        /**
+         * Returns {@code true} if this column is sorted in ascending order or {@code false} otherwise.
+         */
+        public boolean asc() {
+            return asc;
+        }
+
+        /**
+         * Returns {@code true} if this column was explicitly marked as an indexed column or {@code false} if it is a part of a Primary Key
+         * appended for uniqueness.
+         */
+        public boolean indexedColumn() {
+            return indexedColumn;
+        }
+
+        /**
+         * Returns {@code true} if this column can contain null values or {@code false} otherwise.
+         */
+        public boolean nullable() {
+            return nullable;
+        }
+
+        @Override
+        public String toString() {
+            return S.toString(this);
+        }
+    }
+
+    private final String name;
+
+    private final List<ColumnDescriptor> columns;
+
+    private final SchemaDescriptor schemaDescriptor;
+
+    /**
+     * Creates an Index Descriptor from a given Table Configuration.
+     *
+     * @param name        index name.
+     * @param tableConfig table configuration.
+     */
+    public SortedIndexDescriptor(String name, TableView tableConfig) {
+        this.name = name;
+
+        TableIndexView indexConfig = tableConfig.indices().get(name);
+
+        if (indexConfig == null) {
+            throw new StorageException(String.format("Index configuration for \"%s\" could not be found", name));
+        }
+
+        if (!(indexConfig instanceof SortedIndexView)) {
+            throw new StorageException(String.format(
+                    "Index \"%s\" is not configured as a Sorted Index. Actual type: %s",
+                    name, indexConfig.type()
+            ));
+        }
+
+        // extract indexed column configurations from the table configuration
+        NamedListView<? extends IndexColumnView> indexColumns = ((SortedIndexView) indexConfig).columns();
+
+        Stream<String> indexColumnNames = indexColumns.namedListKeys().stream();
+
+        // append the primary key to guarantee index key uniqueness
+        Stream<String> primaryKeyColumnNames = Arrays.stream(tableConfig.primaryKey().columns());
+
+        List<ColumnView> indexKeyColumnViews = Stream.concat(indexColumnNames, primaryKeyColumnNames)
+                .distinct() // remove Primary Key columns if they are already present in the index definition
+                .map(columnName -> {
+                    ColumnView columnView = tableConfig.columns().get(columnName);
+
+                    assert columnView != null : "Incorrect index column configuration. " + columnName + " column does not exist";
+
+                    return columnView;
+                })
+                .collect(toList());
+
+        schemaDescriptor = createSchemaDescriptor(indexKeyColumnViews);
+
+        columns = Arrays.stream(schemaDescriptor.keyColumns().columns())
+                .sorted(Comparator.comparingInt(Column::columnOrder))
+                .map(column -> {
+                    IndexColumnView columnView = indexColumns.get(column.name());
+
+                    // if the index config does not contain this column - it's a column from the Primary Key
+                    boolean indexedColumn = columnView != null;
+
+                    // PK columns are always sorted in ascending order
+                    boolean asc = !indexedColumn || columnView.asc();
+
+                    return new ColumnDescriptor(column, asc, indexedColumn, column.nullable());
+                })
+                .collect(toUnmodifiableList());
+    }
+
+    /**
+     * Creates a {@link SchemaDescriptor} from a list of index key columns.
+     */
+    private static SchemaDescriptor createSchemaDescriptor(List<ColumnView> indexKeyColumnViews) {
+        Column[] keyColumns = new Column[indexKeyColumnViews.size()];
+
+        for (int i = 0; i < indexKeyColumnViews.size(); ++i) {
+            ColumnView columnView = indexKeyColumnViews.get(i);
+
+            ColumnDefinition columnDefinition = SchemaConfigurationConverter.convert(columnView);
+
+            keyColumns[i] = SchemaDescriptorConverter.convert(i, columnDefinition);
+        }
+
+        return new SchemaDescriptor(0, keyColumns, new Column[0]);
+    }
+
+    /**
+     * Returns this index' name.
+     */
+    public String name() {
+        return name;
+    }
+
+    /**
+     * Returns the Column Descriptors that comprise a row of this index (indexed columns + primary key columns).
+     */
+    public List<ColumnDescriptor> indexRowColumns() {
+        return columns;
+    }
+
+    /**
+     * Converts this Descriptor into an equivalent {@link SchemaDescriptor}.
+     *
+     * <p>The resulting {@code SchemaDescriptor} will have empty {@link SchemaDescriptor#valueColumns()} and its

Review comment:
       <p> should have a closing tag

##########
File path: modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/IndexRow.java
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.index;
+
+import org.apache.ignite.internal.storage.PartitionStorage;
+import org.apache.ignite.internal.storage.SearchRow;
+
+/**
+ * Represents an Index Row - a set of indexed columns and Primary Key columns (for key uniqueness).
+ */
+public interface IndexRow {
+    /**
+     * Returns the serialized presentation of this row as a byte array.
+     *
+     * @return Serialized byte array value.
+     */
+    byte[] rowBytes();
+
+    /**
+     * Returns the Primary Key that is a part of this row.
+     *
+     * <p>This is a convenience method for easier extraction of the Primary Key to use it for accessing the {@link PartitionStorage}.

Review comment:
       <p> should have a closing tag

##########
File path: modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorageTest.java
##########
@@ -0,0 +1,497 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb.index;
+
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toUnmodifiableList;
+import static org.apache.ignite.internal.schema.SchemaTestUtils.generateRandomValue;
+import static org.apache.ignite.internal.schema.configuration.SchemaConfigurationConverter.convert;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.randomBytes;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
+import static org.apache.ignite.schema.SchemaBuilders.column;
+import static org.apache.ignite.schema.SchemaBuilders.tableBuilder;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.anyOf;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.sameInstance;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Predicate;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+import org.apache.ignite.configuration.schemas.store.DataRegionConfiguration;
+import org.apache.ignite.configuration.schemas.table.HashIndexConfigurationSchema;
+import org.apache.ignite.configuration.schemas.table.SortedIndexConfigurationSchema;
+import org.apache.ignite.configuration.schemas.table.TableConfiguration;
+import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.storage.SearchRow;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.engine.DataRegion;
+import org.apache.ignite.internal.storage.engine.TableStorage;
+import org.apache.ignite.internal.storage.index.IndexRow;
+import org.apache.ignite.internal.storage.index.IndexRowPrefix;
+import org.apache.ignite.internal.storage.index.SortedIndexDescriptor.ColumnDescriptor;
+import org.apache.ignite.internal.storage.index.SortedIndexStorage;
+import org.apache.ignite.internal.storage.rocksdb.RocksDbStorageEngine;
+import org.apache.ignite.internal.testframework.VariableSource;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.schema.SchemaBuilders;
+import org.apache.ignite.schema.definition.ColumnDefinition;
+import org.apache.ignite.schema.definition.ColumnType;
+import org.apache.ignite.schema.definition.TableDefinition;
+import org.apache.ignite.schema.definition.builder.SortedIndexDefinitionBuilder;
+import org.apache.ignite.schema.definition.builder.SortedIndexDefinitionBuilder.SortedIndexColumnBuilder;
+import org.apache.ignite.schema.definition.index.ColumnarIndexDefinition;
+import org.apache.ignite.schema.definition.index.HashIndexDefinition;
+import org.apache.ignite.schema.definition.index.SortedIndexDefinition;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.RepeatedTest;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+
+/**
+ * Test class for the {@link RocksDbSortedIndexStorage}.
+ */
+@ExtendWith(WorkDirectoryExtension.class)
+@ExtendWith(ConfigurationExtension.class)
+public class RocksDbSortedIndexStorageTest {
+    private static final IgniteLogger log = IgniteLogger.forClass(RocksDbSortedIndexStorageTest.class);
+
+    private static final Random random = new Random();

Review comment:
       I think `random` should be seeded so that the tests are repeatable. Also, I'm not sure that the @RepeatedTest annotation will provide any information about the generated definitions. Maybe these tests should be parameterized too?
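
A minimal sketch of the seeding idea, assuming the seed is printed so that a failing run can be replayed. `SeededRandomExample` and `newLoggedRandom` are hypothetical names, not existing test utilities:

```java
import java.util.Random;

/** Hypothetical helper that makes the seed of a test Random visible. */
final class SeededRandomExample {
    /** Creates a Random from an explicit seed and prints the seed so the run can be reproduced. */
    static Random newLoggedRandom(long seed) {
        System.out.println("Using seed: " + seed);

        return new Random(seed);
    }
}
```

A test could call `newLoggedRandom(System.nanoTime())` by default and switch to a fixed seed taken from the output when a failure needs to be reproduced.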

##########
File path: modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb.index;
+
+import org.apache.ignite.internal.rocksdb.ColumnFamily;
+import org.apache.ignite.internal.rocksdb.RocksIteratorAdapter;
+import org.apache.ignite.internal.schema.ByteBufferRow;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.index.IndexRow;
+import org.apache.ignite.internal.storage.index.IndexRowDeserializer;
+import org.apache.ignite.internal.storage.index.IndexRowFactory;
+import org.apache.ignite.internal.storage.index.IndexRowPrefix;
+import org.apache.ignite.internal.storage.index.SortedIndexDescriptor;
+import org.apache.ignite.internal.storage.index.SortedIndexStorage;
+import org.apache.ignite.internal.util.Cursor;
+import org.jetbrains.annotations.Nullable;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+
+/**
+ * {@link SortedIndexStorage} implementation based on RocksDB.
+ */
+public class RocksDbSortedIndexStorage implements SortedIndexStorage {
+    private final ColumnFamily indexCf;
+
+    private final SortedIndexDescriptor descriptor;
+
+    private final IndexRowFactory indexRowFactory;
+
+    private final IndexRowDeserializer indexRowDeserializer;
+
+    /**
+     * Creates a new Index storage.
+     *
+     * @param indexCf Column Family for storing the data.
+     * @param descriptor Index descriptor.
+     */
+    public RocksDbSortedIndexStorage(ColumnFamily indexCf, SortedIndexDescriptor descriptor) {
+        this.indexCf = indexCf;
+        this.descriptor = descriptor;
+        this.indexRowFactory = new BinaryIndexRowFactory(descriptor);
+        this.indexRowDeserializer = new BinaryIndexRowDeserializer(descriptor);
+    }
+
+    @Override
+    public SortedIndexDescriptor indexDescriptor() {
+        return descriptor;
+    }
+
+    @Override
+    public IndexRowFactory indexRowFactory() {
+        return indexRowFactory;
+    }
+
+    @Override
+    public IndexRowDeserializer indexRowDeserializer() {
+        return indexRowDeserializer;
+    }
+
+    @Override
+    public void put(IndexRow row) {
+        assert row.rowBytes().length > 0;
+        assert row.primaryKey().keyBytes().length > 0;
+
+        try {
+            indexCf.put(row.rowBytes(), row.primaryKey().keyBytes());
+        } catch (RocksDBException e) {
+            throw new StorageException("Error while adding data to Rocks DB", e);
+        }
+    }
+
+    @Override
+    public void remove(IndexRow key) {
+        try {
+            indexCf.delete(key.rowBytes());
+        } catch (RocksDBException e) {
+            throw new StorageException("Error while removing data from Rocks DB", e);
+        }
+    }
+
+    @Override
+    public Cursor<IndexRow> range(IndexRowPrefix lowerBound, IndexRowPrefix upperBound) {
+        RocksIterator iter = indexCf.newIterator();
+
+        iter.seekToFirst();
+
+        return new RocksIteratorAdapter<>(iter) {
+            @Nullable
+            private PrefixComparator lowerBoundComparator = new PrefixComparator(descriptor, lowerBound);
+
+            private final PrefixComparator upperBoundComparator = new PrefixComparator(descriptor, upperBound);
+
+            @Override
+            public boolean hasNext() {
+                while (super.hasNext()) {
+                    var row = new ByteBufferRow(it.key());
+
+                    if (lowerBoundComparator != null) {
+                        // if lower comparator is not null, then the lower bound has not yet been reached
+                        if (lowerBoundComparator.compare(row) < 0) {
+                            it.next();
+                        } else {
+                            // once the lower bound is reached, we no longer need to check it
+                            lowerBoundComparator = null;
+
+                            return upperBoundComparator.compare(row) <= 0;
+                        }
+                    } else {
+                        return upperBoundComparator.compare(row) <= 0;
+                    }

Review comment:
       ```suggestion
           if (lowerBoundComparator != null) {
               // if lower comparator is not null, then the lower bound has not yet been reached
               if (lowerBoundComparator.compare(row) < 0) {
                   it.next();
                   continue;
               } else {
                   // once the lower bound is reached, we no longer need to check it
                   lowerBoundComparator = null;
               }
           }
           return upperBoundComparator.compare(row) <= 0;
   ```
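
The same control flow, reduced to a self-contained sketch over a sorted list of integers. `BoundedScan` is a hypothetical class; it assumes ascending input and inclusive bounds, and it is not the storage code itself:

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration of skipping to a lower bound and stopping at an upper bound. */
final class BoundedScan {
    /** Collects the elements of an ascending list that fall inside [lower, upper]. */
    static List<Integer> range(List<Integer> sorted, int lower, int upper) {
        List<Integer> result = new ArrayList<>();
        boolean lowerBoundReached = false;

        for (int value : sorted) {
            if (!lowerBoundReached) {
                if (value < lower) {
                    // still below the lower bound, skip this element
                    continue;
                }

                // once the lower bound is reached, it no longer needs to be checked
                lowerBoundReached = true;
            }

            if (value > upper) {
                break;
            }

            result.add(value);
        }

        return result;
    }
}
```

The `continue` keeps the upper-bound check in a single place instead of repeating it in two branches.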



