Posted to notifications@ignite.apache.org by "ibessonov (via GitHub)" <gi...@apache.org> on 2023/06/15 12:50:44 UTC

[GitHub] [ignite-3] ibessonov opened a new pull request, #2200: IGNITE-19591 Implemented usage of shared RocksDB instances for multiple tables.

ibessonov opened a new pull request, #2200:
URL: https://github.com/apache/ignite-3/pull/2200

   https://issues.apache.org/jira/browse/IGNITE-19591


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #2200: IGNITE-19591 Implemented usage of shared RocksDB instances for multiple tables.

Posted by "tkalkirill (via GitHub)" <gi...@apache.org>.
tkalkirill commented on code in PR #2200:
URL: https://github.com/apache/ignite-3/pull/2200#discussion_r1231976667


##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java:
##########
@@ -803,30 +802,25 @@ void readByTimestampWorksCorrectlyIfNoUncommittedValueExists() {
     }
 
     /**
-     * Tests that changed {@link MvPartitionStorage#lastAppliedIndex()} can be successfully read and that it's returned from
-     * {@link MvPartitionStorage#persistedIndex()} after the {@link MvPartitionStorage#flush()}.
+     * Tests that changed {@link MvPartitionStorage#lastAppliedIndex()} can be successfully read back.
      */
     @Test
     void testAppliedIndex() {
         storage.runConsistently(locker -> {
             assertEquals(0, storage.lastAppliedIndex());
             assertEquals(0, storage.lastAppliedTerm());
-            assertEquals(0, storage.persistedIndex());

Review Comment:
   Are you sure you need to remove these checks? You have only deprecated this method so far, so let it stay tested until it is actually removed.



##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java:
##########
@@ -803,30 +802,25 @@ void readByTimestampWorksCorrectlyIfNoUncommittedValueExists() {
     }
 
     /**
-     * Tests that changed {@link MvPartitionStorage#lastAppliedIndex()} can be successfully read and that it's returned from
-     * {@link MvPartitionStorage#persistedIndex()} after the {@link MvPartitionStorage#flush()}.
+     * Tests that changed {@link MvPartitionStorage#lastAppliedIndex()} can be successfully read back.
      */
     @Test
     void testAppliedIndex() {
         storage.runConsistently(locker -> {
             assertEquals(0, storage.lastAppliedIndex());
             assertEquals(0, storage.lastAppliedTerm());
-            assertEquals(0, storage.persistedIndex());
 
             storage.lastApplied(1, 1);
 
             assertEquals(1, storage.lastAppliedIndex());
             assertEquals(1, storage.lastAppliedTerm());
-            assertThat(storage.persistedIndex(), is(lessThanOrEqualTo(1L)));
 
             return null;
         });
 
         CompletableFuture<Void> flushFuture = storage.flush();
 
         assertThat(flushFuture, willCompleteSuccessfully());
-
-        assertEquals(1, storage.persistedIndex());

Review Comment:
   Same



##########
modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/AbstractTxStateStorageTest.java:
##########
@@ -521,7 +521,6 @@ protected static void checkLastApplied(
             long expLastAppliedTerm
     ) {
         assertEquals(expLastAppliedIndex, storage.lastAppliedIndex());
-        assertEquals(expPersistentIndex, storage.persistedIndex());

Review Comment:
   same



##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java:
##########
@@ -1054,7 +1053,6 @@ private static void checkLastApplied(
             long expLastAppliedTerm
     ) {
         assertEquals(expLastAppliedIndex, storage.lastAppliedIndex());
-        assertEquals(expPersistentIndex, storage.persistedIndex());

Review Comment:
   Same



##########
modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java:
##########
@@ -151,7 +150,9 @@ void testRestart() {
 
         tableStorage.start();
 
-        assertThat(tableStorage.getMvPartition(PARTITION_ID), is(notNullValue()));
+        assertThat(tableStorage.getMvPartition(PARTITION_ID), is(nullValue()));
+
+        tableStorage.createMvPartition(PARTITION_ID);

Review Comment:
   The method returns a future; please check that it completes successfully.
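   For reference, one way to address this with only the JDK, assuming `createMvPartition` returns a `CompletableFuture` (its exact return type is not visible in this hunk): wait with a timeout and fail if the future completes exceptionally or never completes. In this codebase the `willCompleteSuccessfully()` matcher, already used in `testAppliedIndex` above, serves the same purpose. `FutureChecks` and `fakeCreatePartition` are hypothetical names for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class FutureChecks {
    private FutureChecks() {
    }

    /**
     * Waits for the future and throws AssertionError if it fails or does not complete in time.
     * A rough JDK-only analogue of the willCompleteSuccessfully() matcher.
     */
    static <T> T assertCompletesSuccessfully(CompletableFuture<T> future, long timeoutMs) {
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            throw new AssertionError("Future completed exceptionally", e.getCause());
        } catch (TimeoutException e) {
            throw new AssertionError("Future did not complete within " + timeoutMs + " ms", e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new AssertionError("Interrupted while waiting for the future", e);
        }
    }

    /** Hypothetical stand-in for tableStorage.createMvPartition(PARTITION_ID). */
    static CompletableFuture<String> fakeCreatePartition(int partitionId) {
        return CompletableFuture.supplyAsync(() -> "partition-" + partitionId);
    }

    public static void main(String[] args) {
        String created = assertCompletesSuccessfully(fakeCreatePartition(0), 10_000);
        System.out.println(created); // prints "partition-0"
    }
}
```

   Without such a check, a failed partition creation would surface only later, for example as a confusing null or closed-storage error in the following assertions.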



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotStorageFactory.java:
##########
@@ -34,7 +34,7 @@
  * Snapshot storage factory for {@link MvPartitionStorage}. Utilizes the fact that every partition already stores its latest applied index
  * and thus can itself be used as its own snapshot.
  *
- * <p>Uses {@link MvPartitionStorage#persistedIndex()} and configuration, passed into constructor, to create a {@link SnapshotMeta} object

Review Comment:
   Same



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageEngine.java:
##########
@@ -70,6 +69,8 @@ public class RocksDbStorageEngine implements StorageEngine {
 
     private final Map<String, RocksDbDataRegion> regions = new ConcurrentHashMap<>();
 
+    private final Map<String, SharedRocksDbInstance> sharedInstances = new ConcurrentHashMap<>();

Review Comment:
   What is the key: a name or something else? It would be more convenient to reflect that in the variable name or in the documentation.
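   Judging by the `computeIfAbsent(tableDescriptor.getDataRegion(), ...)` call in the next hunk, the key appears to be the data region name. A minimal sketch of what a self-describing field name and javadoc could look like; `SharedInstanceRegistry`, `SharedInstance` and `sharedInstancesByDataRegion` are hypothetical stand-ins, not the PR's code.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

final class SharedInstanceRegistry {
    /** Hypothetical stand-in for SharedRocksDbInstance. */
    static final class SharedInstance {
        final String dataRegionName;

        SharedInstance(String dataRegionName) {
            this.dataRegionName = dataRegionName;
        }
    }

    /** Shared instances, keyed by data region name. One instance serves every table in that region. */
    private final Map<String, SharedInstance> sharedInstancesByDataRegion = new ConcurrentHashMap<>();

    /** Counts how many instances were actually created, for illustration only. */
    final AtomicInteger created = new AtomicInteger();

    SharedInstance instanceForTable(String dataRegionName) {
        // computeIfAbsent applies the factory at most once per key, so tables in one region share an instance.
        return sharedInstancesByDataRegion.computeIfAbsent(dataRegionName, name -> {
            created.incrementAndGet();
            return new SharedInstance(name);
        });
    }

    public static void main(String[] args) {
        SharedInstanceRegistry registry = new SharedInstanceRegistry();
        SharedInstance a = registry.instanceForTable("default");
        SharedInstance b = registry.instanceForTable("default");
        SharedInstance c = registry.instanceForTable("inMemory");
        System.out.println((a == b) + " " + (a == c) + " " + registry.created.get()); // prints "true false 2"
    }
}
```

   Naming the field after its key (`...ByDataRegion`) answers the question at the declaration site without requiring readers to find the `computeIfAbsent` call.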



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageEngine.java:
##########
@@ -170,14 +173,18 @@ public RocksDbTableStorage createMvTable(
 
         assert dataRegion != null : "tableId=" + tableId + ", dataRegion=" + tableDescriptor.getDataRegion();
 
-        Path tablePath = storagePath.resolve(TABLE_DIR_PREFIX + tableId);
-
-        try {
-            Files.createDirectories(tablePath);
-        } catch (IOException e) {
-            throw new StorageException("Failed to create table store directory for table: " + tableId, e);
-        }
+        SharedRocksDbInstance sharedInstance = sharedInstances.computeIfAbsent(tableDescriptor.getDataRegion(), name -> {

Review Comment:
   Do I understand correctly that, for RocksDB-based tables, it will now not be so easy to change the data region? If so, maybe this should be mentioned somewhere in the documentation.



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstance.java:
##########
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb.instance;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.rocksdb.RocksUtils.incrementPrefix;
+import static org.apache.ignite.internal.storage.rocksdb.instance.SharedRocksDbInstanceCreator.sortedIndexCfOptions;
+import static org.apache.ignite.lang.IgniteStringFormatter.format;
+
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.internal.rocksdb.ColumnFamily;
+import org.apache.ignite.internal.rocksdb.flush.RocksDbFlusher;
+import org.apache.ignite.internal.storage.StorageClosedException;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.rocksdb.RocksDbMetaStorage;
+import org.apache.ignite.internal.storage.rocksdb.RocksDbStorageEngine;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.lang.ByteArray;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+
+/**
+ * Shared RocksDB instance for multiple tables. Managed directly by the engine.
+ */
+public final class SharedRocksDbInstance {
+

Review Comment:
   ```suggestion
   ```



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstance.java:
##########
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb.instance;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.rocksdb.RocksUtils.incrementPrefix;
+import static org.apache.ignite.internal.storage.rocksdb.instance.SharedRocksDbInstanceCreator.sortedIndexCfOptions;
+import static org.apache.ignite.lang.IgniteStringFormatter.format;
+
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.internal.rocksdb.ColumnFamily;
+import org.apache.ignite.internal.rocksdb.flush.RocksDbFlusher;
+import org.apache.ignite.internal.storage.StorageClosedException;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.rocksdb.RocksDbMetaStorage;
+import org.apache.ignite.internal.storage.rocksdb.RocksDbStorageEngine;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.lang.ByteArray;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+
+/**
+ * Shared RocksDB instance for multiple tables. Managed directly by the engine.
+ */
+public final class SharedRocksDbInstance {
+
+    /** Write options. */
+    public static final WriteOptions DFLT_WRITE_OPTS = new WriteOptions().setDisableWAL(true);
+
+    /** RocksDB storage engine instance. */
+    public final RocksDbStorageEngine engine;
+
+    /** Path for the directory that stores the data. */
+    public final Path path;
+
+    /** RocksDB flusher instance. */
+    public final RocksDbFlusher flusher;
+
+    /** Rocks DB instance. */
+    public final RocksDB db;
+
+    /** Meta information instance that wraps {@link ColumnFamily} instance for meta column family.. */
+    public final RocksDbMetaStorage meta;
+
+    /** Column Family for partition data. */
+    public final ColumnFamily partitionCf;
+
+    /** Column Family for GC queue. */
+    public final ColumnFamily gcQueueCf;
+
+    /** Column Family for Hash Index data. */
+    public final ColumnFamily hashIndexCf;
+
+    /** Column Family instances for different types of sorted indexes, identified by the column family name. */
+    private final ConcurrentMap<ByteArray, ColumnFamily> sortedIndexCfs;
+
+    /** Column family names mapped to sets of index IDs, that use that CF. */
+    private final ConcurrentMap<ByteArray, Set<Integer>> sortedIndexIdsByCfName = new ConcurrentHashMap<>();
+
+    /** Busy lock to stop synchronously. */
+    private final IgniteSpinBusyLock busyLock;
+
+    /** Prevents double stopping of the component. */
+    private final AtomicBoolean stopGuard = new AtomicBoolean();
+
+    SharedRocksDbInstance(
+            RocksDbStorageEngine engine,
+            Path path,
+            IgniteSpinBusyLock busyLock,
+            RocksDbFlusher flusher,
+            RocksDB db,
+            RocksDbMetaStorage meta,
+            ColumnFamily partitionCf,
+            ColumnFamily gcQueueCf,
+            ColumnFamily hashIndexCf,
+            ConcurrentMap<ByteArray, ColumnFamily> sortedIndexCfs
+    ) {
+        this.engine = engine;
+        this.path = path;
+        this.busyLock = busyLock;
+
+        this.flusher = flusher;
+        this.db = db;
+
+        this.meta = meta;
+        this.partitionCf = partitionCf;
+        this.gcQueueCf = gcQueueCf;
+        this.hashIndexCf = hashIndexCf;
+        this.sortedIndexCfs = sortedIndexCfs;
+    }
+
+    /**
+     * Utility method that performs range-deletion in the column family.
+     */
+    public static void deleteByPrefix(WriteBatch writeBatch, ColumnFamily columnFamily, byte[] prefix) throws RocksDBException {
+        byte[] upperBound = incrementPrefix(prefix);
+
+        writeBatch.deleteRange(columnFamily.handle(), prefix, upperBound);
+    }
+
+    /**
+     * Stops the instance, freeing all allocated resources.
+     */
+    public void stop() {
+        if (!stopGuard.compareAndSet(false, true)) {
+            return;
+        }
+
+        busyLock.block();
+
+        List<AutoCloseable> resources = new ArrayList<>();
+
+        resources.add(meta.columnFamily().handle());
+        resources.add(partitionCf.handle());
+        resources.add(gcQueueCf.handle());
+        resources.add(hashIndexCf.handle());
+        resources.addAll(sortedIndexCfs.values().stream()
+                .map(ColumnFamily::handle)
+                .collect(toList())
+        );
+
+        resources.add(db);
+        resources.add(flusher::stop);
+
+        try {
+            Collections.reverse(resources);
+
+            IgniteUtils.closeAll(resources);
+        } catch (Exception e) {
+            throw new StorageException("Failed to stop RocksDB storage: " + path, e);
+        }
+    }
+
+    /**
+     * Returns Column Family instance with the desired name. Creates it if it doesn't exist.
+     * Tracks every created index by its {@code indexId}.
+     */
+    public ColumnFamily getSortedIndexCfOnIndexCreate(byte[] cfName, int indexId) {
+        if (!busyLock.enterBusy()) {
+            throw new StorageClosedException();
+        }
+
+        try {
+            ColumnFamily[] result = {null};
+
+            sortedIndexIdsByCfName.compute(new ByteArray(cfName), (name, indexIds) -> {
+                ColumnFamily columnFamily = getOrCreateColumnFamily(cfName, name);
+
+                result[0] = columnFamily;
+
+                if (indexIds == null) {
+                    indexIds = new HashSet<>();
+                }
+
+                indexIds.add(indexId);

Review Comment:
   Perhaps we need to create a new set here instead of modifying the existing one in place, to avoid data races.
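   A minimal sketch of the suggestion, assuming the concern is that a `HashSet` mutated in place inside `compute` may be read concurrently by code that obtained it from the map earlier: build a fresh set on every update and publish an unmodifiable copy, so readers only ever see fully built sets. String CF names and the `IndexIdTracker` class are simplifications for illustration; the PR uses `ByteArray` keys.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

final class IndexIdTracker {
    /** Column family name mapped to an immutable set of index IDs that use it. */
    private final ConcurrentMap<String, Set<Integer>> indexIdsByCfName = new ConcurrentHashMap<>();

    void addIndex(String cfName, int indexId) {
        indexIdsByCfName.compute(cfName, (name, indexIds) -> {
            // Copy instead of mutating the already published set in place.
            Set<Integer> copy = indexIds == null ? new HashSet<>() : new HashSet<>(indexIds);
            copy.add(indexId);
            return Set.copyOf(copy);
        });
    }

    /** Removes the index; returns true if the column family is no longer used by any index. */
    boolean removeIndex(String cfName, int indexId) {
        Set<Integer> remaining = indexIdsByCfName.computeIfPresent(cfName, (name, indexIds) -> {
            Set<Integer> copy = new HashSet<>(indexIds);
            copy.remove(indexId);
            // Returning null removes the mapping entirely.
            return copy.isEmpty() ? null : Set.copyOf(copy);
        });
        return remaining == null;
    }

    Set<Integer> indexIds(String cfName) {
        return indexIdsByCfName.getOrDefault(cfName, Set.of());
    }

    public static void main(String[] args) {
        IndexIdTracker tracker = new IndexIdTracker();
        tracker.addIndex("cf-a", 1);
        tracker.addIndex("cf-a", 2);
        System.out.println(tracker.indexIds("cf-a").size()); // prints 2
        System.out.println(tracker.removeIndex("cf-a", 1)); // prints false
        System.out.println(tracker.removeIndex("cf-a", 2)); // prints true
    }
}
```

   Copying costs O(n) per update, which is likely acceptable here because indexes are created and dropped rarely compared to how often the sets might be read.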


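   The `deleteByPrefix` helper quoted in the hunk above turns a prefix into the half-open key range `[prefix, incrementPrefix(prefix))` passed to `WriteBatch.deleteRange`. A minimal sketch of one way such a prefix increment can work, assuming keys compare as unsigned bytes (as with RocksDB's default bytewise comparator); the real `RocksUtils.incrementPrefix` may differ in details such as how it handles an all-`0xFF` prefix. `PrefixRange` is a hypothetical name.

```java
import java.util.Arrays;

final class PrefixRange {
    /**
     * Returns the smallest byte array greater than every key starting with {@code prefix}
     * under unsigned lexicographic order, or null if no such bound exists (prefix is all 0xFF).
     */
    static byte[] incrementPrefix(byte[] prefix) {
        for (int i = prefix.length - 1; i >= 0; i--) {
            if (prefix[i] != (byte) 0xFF) {
                // Keep the bytes before i, bump byte i, and drop the trailing 0xFF bytes.
                byte[] bound = Arrays.copyOf(prefix, i + 1);
                bound[i]++;
                return bound;
            }
        }
        return null;
    }

    /** True if key falls into [prefix, incrementPrefix(prefix)), i.e. key starts with prefix. */
    static boolean inRange(byte[] key, byte[] prefix) {
        byte[] upper = incrementPrefix(prefix);
        return Arrays.compareUnsigned(key, prefix) >= 0
                && (upper == null || Arrays.compareUnsigned(key, upper) < 0);
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(incrementPrefix(new byte[] {1, 2}))); // prints [1, 3]
        System.out.println(Arrays.toString(incrementPrefix(new byte[] {1, (byte) 0xFF}))); // prints [2]
    }
}
```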

##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageUtils.java:
##########
@@ -38,6 +38,9 @@ public class RocksDbStorageUtils {
     /** Index ID size in bytes. */
     public static final int INDEX_ID_SIZE = Integer.SIZE;
 
+    /** Table ID size. */

Review Comment:
   ```suggestion
       /** Table ID size in bytes. */
   ```
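   Related to the "in bytes" wording: the context line `INDEX_ID_SIZE = Integer.SIZE` in this hunk is documented as a size in bytes, but in the JDK `Integer.SIZE` is the bit width of an `int` (32), while `Integer.BYTES` is its byte width (4). If these constants are meant as byte counts, `Integer.BYTES` may be the intended value; this is worth double-checking against how the constants are used. `IdSizes` is a hypothetical class illustrating the difference.

```java
import java.nio.ByteBuffer;

final class IdSizes {
    /** Bit width of an int: 32. */
    static final int INT_BITS = Integer.SIZE;

    /** Byte width of an int: 4. This is what a "size in bytes" constant would normally hold. */
    static final int INT_BYTES = Integer.BYTES;

    /** Serializes an int ID (big-endian by default) and returns the number of bytes written. */
    static int writtenBytes(int id) {
        ByteBuffer buf = ByteBuffer.allocate(64);
        buf.putInt(id);
        return buf.position();
    }

    public static void main(String[] args) {
        System.out.println(INT_BITS + " " + INT_BYTES + " " + writtenBytes(42)); // prints "32 4 4"
    }
}
```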



##########
modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java:
##########
@@ -127,7 +126,11 @@ interface Locker {
 
     /**
      * {@link #lastAppliedIndex()} value consistent with the data, already persisted on the storage.
+     *
+     * @deprecated No one needs it, and it slows down the storage.
      */
+    //TODO IGNITE-19750 Delete this method.

Review Comment:
   ```suggestion
       // TODO: IGNITE-19750 Delete this method.
   ```



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/TxStateStorage.java:
##########
@@ -145,7 +146,6 @@ public interface TxStateStorage extends ManuallyCloseable {
      *         <li>{@link TxStateStorage#put(UUID, TxMeta)};</li>
      *         <li>{@link TxStateStorage#lastAppliedIndex()};</li>
      *         <li>{@link TxStateStorage#lastAppliedTerm()};</li>
-     *         <li>{@link TxStateStorage#persistedIndex()}};</li>

Review Comment:
   same



##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java:
##########
@@ -803,30 +802,25 @@ void readByTimestampWorksCorrectlyIfNoUncommittedValueExists() {
     }
 
     /**
-     * Tests that changed {@link MvPartitionStorage#lastAppliedIndex()} can be successfully read and that it's returned from
-     * {@link MvPartitionStorage#persistedIndex()} after the {@link MvPartitionStorage#flush()}.
+     * Tests that changed {@link MvPartitionStorage#lastAppliedIndex()} can be successfully read back.
      */
     @Test
     void testAppliedIndex() {
         storage.runConsistently(locker -> {
             assertEquals(0, storage.lastAppliedIndex());
             assertEquals(0, storage.lastAppliedTerm());
-            assertEquals(0, storage.persistedIndex());
 
             storage.lastApplied(1, 1);
 
             assertEquals(1, storage.lastAppliedIndex());
             assertEquals(1, storage.lastAppliedTerm());
-            assertThat(storage.persistedIndex(), is(lessThanOrEqualTo(1L)));

Review Comment:
   Same



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -2372,8 +2372,8 @@ private CompletableFuture<PartitionStorages> getOrCreatePartitionStorages(TableI
                 .thenComposeAsync(mvPartitionStorage -> {
                     TxStateStorage txStateStorage = internalTable.txStateStorage().getOrCreateTxStateStorage(partitionId);
 
-                    if (mvPartitionStorage.persistedIndex() == MvPartitionStorage.REBALANCE_IN_PROGRESS

Review Comment:
   same



##########
modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java:
##########
@@ -673,10 +673,10 @@ private void testStoragesGetClearedInMiddleOfFailedRebalance(boolean isTxStorage
 
         if (isTxStorageUnderRebalance) {
             // Emulate a situation when TX state storage was stopped in a middle of rebalance.
-            when(txStateStorage.persistedIndex()).thenReturn(TxStateStorage.REBALANCE_IN_PROGRESS);

Review Comment:
   same



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/TxStateStorage.java:
##########
@@ -201,7 +201,7 @@ public interface TxStateStorage extends ManuallyCloseable {
      *     <li>Cancels all current operations (including cursors) with storage and waits for their completion;</li>
      *     <li>Does not allow operations to be performed (exceptions will be thrown) with the storage until the cleaning is completed;</li>
      *     <li>Clears storage;</li>
-     *     <li>Sets the {@link #lastAppliedIndex()}, {@link #lastAppliedTerm()} and {@link #persistedIndex()} to {@code 0};</li>

Review Comment:
   same



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -2372,8 +2372,8 @@ private CompletableFuture<PartitionStorages> getOrCreatePartitionStorages(TableI
                 .thenComposeAsync(mvPartitionStorage -> {
                     TxStateStorage txStateStorage = internalTable.txStateStorage().getOrCreateTxStateStorage(partitionId);
 
-                    if (mvPartitionStorage.persistedIndex() == MvPartitionStorage.REBALANCE_IN_PROGRESS
-                            || txStateStorage.persistedIndex() == TxStateStorage.REBALANCE_IN_PROGRESS) {

Review Comment:
   same



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/ColumnFamilyUtils.java:
##########
@@ -67,24 +93,135 @@ static ColumnFamilyType fromCfName(String cfName) {
     }
 
     /**
-     * Creates a column family name by index ID.
-     *
-     * @param indexId Index ID.
+     * Generates a sorted index column family name by its columns descriptions.
+     * The resulting array has a {@link #SORTED_INDEX_CF_PREFIX} prefix as a UTF8 array, followed by a number of pairs
+     * {@code {type, flags}}, where type represents ordinal of the corresponding {@link NativeTypeSpec}, and
+     * flags store information about column's nullability and comparison order.
      *
-     * @see #sortedIndexId
+     * @see #comparatorFromCfName(byte[])
      */
-    static String sortedIndexCfName(int indexId) {
-        return SORTED_INDEX_CF_PREFIX + indexId;
+    static byte[] sortedIndexCfName(List<StorageSortedIndexColumnDescriptor> columns) {

Review Comment:
   Please add unit tests for this
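   A minimal sketch of the kind of code such a unit test could target: a simplified copy of the encoding described in the javadoc, i.e. a UTF-8 prefix followed by one `{type, flags}` byte pair per column. The prefix value, the flag bit values and the `TypeSpec`/`Column` types are hypothetical stand-ins for `SORTED_INDEX_CF_PREFIX`, `NULLABILITY_FLAG`, `ASC_ORDER_FLAG`, `NativeTypeSpec` and `StorageSortedIndexColumnDescriptor`, whose actual values are not shown in this hunk.

```java
import static java.nio.charset.StandardCharsets.UTF_8;

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;

final class SortedIndexCfNameSketch {
    /** Hypothetical prefix; the real SORTED_INDEX_CF_PREFIX value is not shown in the diff. */
    static final String PREFIX = "cf-sorted-";

    /** Hypothetical flag bits. */
    static final int NULLABILITY_FLAG = 1;
    static final int ASC_ORDER_FLAG = 2;

    /** Simplified stand-in for NativeTypeSpec. */
    enum TypeSpec { INT32, INT64, STRING }

    /** Simplified stand-in for StorageSortedIndexColumnDescriptor. */
    record Column(TypeSpec type, boolean nullable, boolean asc) { }

    static byte[] cfName(List<Column> columns) {
        byte[] prefix = PREFIX.getBytes(UTF_8);
        // Sized by the encoded byte length of the prefix plus two bytes per column.
        ByteBuffer buf = ByteBuffer.allocate(prefix.length + columns.size() * 2);
        buf.put(prefix);
        for (Column column : columns) {
            buf.put((byte) column.type().ordinal());
            int flags = 0;
            if (column.nullable()) {
                flags |= NULLABILITY_FLAG;
            }
            if (column.asc()) {
                flags |= ASC_ORDER_FLAG;
            }
            buf.put((byte) flags);
        }
        return buf.array();
    }

    public static void main(String[] args) {
        byte[] name = cfName(List.of(new Column(TypeSpec.INT64, true, false), new Column(TypeSpec.STRING, false, true)));
        byte[] tail = Arrays.copyOfRange(name, PREFIX.length(), name.length);
        System.out.println(Arrays.toString(tail)); // prints [1, 1, 2, 2]
    }
}
```

   One detail a test could pin down: the diffed implementation allocates with `SORTED_INDEX_CF_PREFIX.length()`, which equals the UTF-8 byte length only while the prefix stays ASCII.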



##########
modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/StorageSortedIndexDescriptor.java:
##########
@@ -64,6 +64,7 @@ public StorageSortedIndexColumnDescriptor(String name, NativeType type, boolean
         }
 
         @Override
+        @Deprecated

Review Comment:
   Maybe add a TODO or something similar?



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/ColumnFamilyUtils.java:
##########
@@ -67,24 +93,135 @@ static ColumnFamilyType fromCfName(String cfName) {
     }
 
     /**
-     * Creates a column family name by index ID.
-     *
-     * @param indexId Index ID.
+     * Generates a sorted index column family name by its columns descriptions.
+     * The resulting array has a {@link #SORTED_INDEX_CF_PREFIX} prefix as a UTF8 array, followed by a number of pairs
+     * {@code {type, flags}}, where type represents ordinal of the corresponding {@link NativeTypeSpec}, and
+     * flags store information about column's nullability and comparison order.
      *
-     * @see #sortedIndexId
+     * @see #comparatorFromCfName(byte[])
      */
-    static String sortedIndexCfName(int indexId) {
-        return SORTED_INDEX_CF_PREFIX + indexId;
+    static byte[] sortedIndexCfName(List<StorageSortedIndexColumnDescriptor> columns) {
+        ByteBuffer buf = ByteBuffer.allocate(SORTED_INDEX_CF_PREFIX.length() + columns.size() * 2);
+
+        buf.put(SORTED_INDEX_CF_PREFIX.getBytes(UTF_8));
+
+        for (StorageSortedIndexColumnDescriptor column : columns) {
+            NativeType nativeType = column.type();
+            NativeTypeSpec nativeTypeSpec = nativeType.spec();
+
+            buf.put((byte) nativeTypeSpec.ordinal());
+
+            int flags = 0;
+
+            if (column.nullable()) {
+                flags |= NULLABILITY_FLAG;
+            }
+
+            if (column.asc()) {
+                flags |= ASC_ORDER_FLAG;
+            }
+
+            buf.put((byte) flags);
+        }
+
+        return buf.array();
     }
 
     /**
-     * Extracts a Sorted Index ID from the given Column Family name.
-     *
-     * @param cfName Column Family name.
-     *
-     * @see #sortedIndexCfName
+     * Creates an {@link org.rocksdb.AbstractComparator} instance to compare keys in column family with name {@code cfName}.
+     * Please refer to {@link #sortedIndexCfName(List)} for the details of the CF name encoding.
      */
-    static int sortedIndexId(String cfName) {
-        return Integer.parseInt(cfName.substring(SORTED_INDEX_CF_PREFIX.length()));
+    public static RocksDbBinaryTupleComparator comparatorFromCfName(byte[] cfName) {

Review Comment:
   It would be nice to do unit tests for this method.
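   `comparatorFromCfName` has to invert the CF name encoding before it can build a comparator. A minimal sketch of that parsing step, which a unit test could exercise independently of RocksDB; the prefix value, the flag bits and the `ColumnSpec` record are hypothetical stand-ins (the real constants are not shown in this hunk), and building the actual `RocksDbBinaryTupleComparator` from the parsed columns is left out.

```java
import static java.nio.charset.StandardCharsets.UTF_8;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class SortedIndexCfNameParser {
    /** Hypothetical prefix; the real SORTED_INDEX_CF_PREFIX value is not shown in the diff. */
    static final String PREFIX = "cf-sorted-";

    /** Hypothetical flag bits, assumed to match the encoder. */
    static final int NULLABILITY_FLAG = 1;
    static final int ASC_ORDER_FLAG = 2;

    /** Decoded column: type ordinal plus flags. */
    record ColumnSpec(int typeOrdinal, boolean nullable, boolean asc) { }

    static List<ColumnSpec> parse(byte[] cfName) {
        byte[] prefix = PREFIX.getBytes(UTF_8);
        int tailLength = cfName.length - prefix.length;
        if (tailLength < 0 || tailLength % 2 != 0) {
            throw new IllegalArgumentException("Not a sorted index CF name, length=" + cfName.length);
        }
        for (int i = 0; i < prefix.length; i++) {
            if (cfName[i] != prefix[i]) {
                throw new IllegalArgumentException("Missing sorted index CF prefix");
            }
        }
        List<ColumnSpec> columns = new ArrayList<>();
        // Each column is encoded as a {type ordinal, flags} byte pair.
        for (int i = prefix.length; i < cfName.length; i += 2) {
            int typeOrdinal = cfName[i] & 0xFF;
            int flags = cfName[i + 1];
            columns.add(new ColumnSpec(typeOrdinal, (flags & NULLABILITY_FLAG) != 0, (flags & ASC_ORDER_FLAG) != 0));
        }
        return columns;
    }

    public static void main(String[] args) {
        byte[] prefix = PREFIX.getBytes(UTF_8);
        byte[] name = Arrays.copyOf(prefix, prefix.length + 2);
        name[prefix.length] = 1;
        name[prefix.length + 1] = 3;
        System.out.println(parse(name)); // prints [ColumnSpec[typeOrdinal=1, nullable=true, asc=true]]
    }
}
```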



##########
modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java:
##########
@@ -673,10 +673,10 @@ private void testStoragesGetClearedInMiddleOfFailedRebalance(boolean isTxStorage
 
         if (isTxStorageUnderRebalance) {
             // Emulate a situation when TX state storage was stopped in a middle of rebalance.
-            when(txStateStorage.persistedIndex()).thenReturn(TxStateStorage.REBALANCE_IN_PROGRESS);
+            when(txStateStorage.lastAppliedIndex()).thenReturn(TxStateStorage.REBALANCE_IN_PROGRESS);
         } else {
             // Emulate a situation when partition storage was stopped in a middle of rebalance.
-            when(mvPartitionStorage.persistedIndex()).thenReturn(MvPartitionStorage.REBALANCE_IN_PROGRESS);

Review Comment:
   same



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstance.java:
##########
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb.instance;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.rocksdb.RocksUtils.incrementPrefix;
+import static org.apache.ignite.internal.storage.rocksdb.instance.SharedRocksDbInstanceCreator.sortedIndexCfOptions;
+import static org.apache.ignite.lang.IgniteStringFormatter.format;
+
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.internal.rocksdb.ColumnFamily;
+import org.apache.ignite.internal.rocksdb.flush.RocksDbFlusher;
+import org.apache.ignite.internal.storage.StorageClosedException;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.rocksdb.RocksDbMetaStorage;
+import org.apache.ignite.internal.storage.rocksdb.RocksDbStorageEngine;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.lang.ByteArray;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+
+/**
+ * Shared RocksDB instance for multiple tables. Managed directly by the engine.
+ */
+public final class SharedRocksDbInstance {
+
+    /** Write options. */
+    public static final WriteOptions DFLT_WRITE_OPTS = new WriteOptions().setDisableWAL(true);
+
+    /** RocksDB storage engine instance. */
+    public final RocksDbStorageEngine engine;
+
+    /** Path for the directory that stores the data. */
+    public final Path path;
+
+    /** RocksDB flusher instance. */
+    public final RocksDbFlusher flusher;
+
+    /** Rocks DB instance. */
+    public final RocksDB db;
+
+    /** Meta information instance that wraps {@link ColumnFamily} instance for meta column family.. */

Review Comment:
   ```suggestion
       /** Meta information instance that wraps {@link ColumnFamily} instance for meta column family. */
   ```



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMetaStorage.java:
##########
@@ -17,110 +17,57 @@
 
 package org.apache.ignite.internal.storage.rocksdb;
 
-import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.nio.ByteOrder.BIG_ENDIAN;
 import static org.apache.ignite.internal.storage.rocksdb.RocksDbStorageUtils.KEY_BYTE_ORDER;
-import static org.apache.ignite.internal.storage.rocksdb.RocksDbStorageUtils.PARTITION_ID_SIZE;
 import static org.apache.ignite.internal.storage.rocksdb.RocksDbStorageUtils.ROW_ID_SIZE;
 import static org.apache.ignite.internal.storage.rocksdb.RocksDbStorageUtils.getRowIdUuid;
-import static org.apache.ignite.internal.storage.rocksdb.RocksDbStorageUtils.putIndexId;
 import static org.apache.ignite.internal.storage.rocksdb.RocksDbStorageUtils.putRowIdUuid;
 import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY;
 
 import java.nio.ByteBuffer;
-import java.util.stream.Stream;
 import org.apache.ignite.internal.rocksdb.ColumnFamily;
-import org.apache.ignite.internal.rocksdb.RocksUtils;
 import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.storage.StorageException;
 import org.jetbrains.annotations.Nullable;
 import org.rocksdb.AbstractWriteBatch;
-import org.rocksdb.ReadOptions;
 import org.rocksdb.RocksDBException;
-import org.rocksdb.RocksIterator;
-import org.rocksdb.Slice;
 
 /**
  * Wrapper around the "meta" Column Family inside a RocksDB-based storage, which stores some auxiliary information needed for internal
  * storage logic.
  */
 public class RocksDbMetaStorage {
-    /** Name of the key that corresponds to a list of existing partition IDs of a storage. */
-    private static final byte[] PARTITION_ID_PREFIX = "part".getBytes(UTF_8);
-
-    /** Index meta key prefix. */
-    private static final byte[] INDEX_META_KEY_PREFIX = "index-meta".getBytes(UTF_8);
-
-    /** Index meta key size in bytes. */
-    private static final int INDEX_META_KEY_SIZE = INDEX_META_KEY_PREFIX.length + PARTITION_ID_SIZE + ROW_ID_SIZE;
-
-    /** Name of the key that is out of range of the partition ID key prefix, used as an exclusive bound. */
-    private static final byte[] PARTITION_ID_PREFIX_END = RocksUtils.incrementPrefix(PARTITION_ID_PREFIX);
+    public static final byte[] PARTITION_META_PREFIX = {0};

Review Comment:
   Missing javadoc for constants



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstance.java:
##########
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb.instance;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.rocksdb.RocksUtils.incrementPrefix;
+import static org.apache.ignite.internal.storage.rocksdb.instance.SharedRocksDbInstanceCreator.sortedIndexCfOptions;
+import static org.apache.ignite.lang.IgniteStringFormatter.format;
+
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.internal.rocksdb.ColumnFamily;
+import org.apache.ignite.internal.rocksdb.flush.RocksDbFlusher;
+import org.apache.ignite.internal.storage.StorageClosedException;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.rocksdb.RocksDbMetaStorage;
+import org.apache.ignite.internal.storage.rocksdb.RocksDbStorageEngine;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.lang.ByteArray;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+
+/**
+ * Shared RocksDB instance for multiple tables. Managed directly by the engine.
+ */
+public final class SharedRocksDbInstance {
+
+    /** Write options. */
+    public static final WriteOptions DFLT_WRITE_OPTS = new WriteOptions().setDisableWAL(true);
+
+    /** RocksDB storage engine instance. */
+    public final RocksDbStorageEngine engine;
+
+    /** Path for the directory that stores the data. */
+    public final Path path;
+
+    /** RocksDB flusher instance. */
+    public final RocksDbFlusher flusher;
+
+    /** RocksDB instance. */
+    public final RocksDB db;
+
+    /** Meta information instance that wraps the {@link ColumnFamily} instance for the meta column family. */
+    public final RocksDbMetaStorage meta;
+
+    /** Column Family for partition data. */
+    public final ColumnFamily partitionCf;
+
+    /** Column Family for GC queue. */
+    public final ColumnFamily gcQueueCf;
+
+    /** Column Family for Hash Index data. */
+    public final ColumnFamily hashIndexCf;
+
+    /** Column Family instances for different types of sorted indexes, identified by the column family name. */
+    private final ConcurrentMap<ByteArray, ColumnFamily> sortedIndexCfs;
+
+    /** Column family names mapped to sets of index IDs that use that CF. */
+    private final ConcurrentMap<ByteArray, Set<Integer>> sortedIndexIdsByCfName = new ConcurrentHashMap<>();
+
+    /** Busy lock to stop synchronously. */
+    private final IgniteSpinBusyLock busyLock;
+
+    /** Prevents double stopping of the component. */
+    private final AtomicBoolean stopGuard = new AtomicBoolean();
+
+    SharedRocksDbInstance(
+            RocksDbStorageEngine engine,
+            Path path,
+            IgniteSpinBusyLock busyLock,
+            RocksDbFlusher flusher,
+            RocksDB db,
+            RocksDbMetaStorage meta,
+            ColumnFamily partitionCf,
+            ColumnFamily gcQueueCf,
+            ColumnFamily hashIndexCf,
+            ConcurrentMap<ByteArray, ColumnFamily> sortedIndexCfs
+    ) {
+        this.engine = engine;
+        this.path = path;
+        this.busyLock = busyLock;
+
+        this.flusher = flusher;
+        this.db = db;
+
+        this.meta = meta;
+        this.partitionCf = partitionCf;
+        this.gcQueueCf = gcQueueCf;
+        this.hashIndexCf = hashIndexCf;
+        this.sortedIndexCfs = sortedIndexCfs;
+    }
+
+    /**
+     * Utility method that performs range-deletion in the column family.
+     */
+    public static void deleteByPrefix(WriteBatch writeBatch, ColumnFamily columnFamily, byte[] prefix) throws RocksDBException {
+        byte[] upperBound = incrementPrefix(prefix);
+
+        writeBatch.deleteRange(columnFamily.handle(), prefix, upperBound);
+    }
+
+    /**
+     * Stops the instance, freeing all allocated resources.
+     */
+    public void stop() {
+        if (!stopGuard.compareAndSet(false, true)) {
+            return;
+        }
+
+        busyLock.block();
+
+        List<AutoCloseable> resources = new ArrayList<>();
+
+        resources.add(meta.columnFamily().handle());
+        resources.add(partitionCf.handle());
+        resources.add(gcQueueCf.handle());
+        resources.add(hashIndexCf.handle());
+        resources.addAll(sortedIndexCfs.values().stream()
+                .map(ColumnFamily::handle)
+                .collect(toList())
+        );
+
+        resources.add(db);
+        resources.add(flusher::stop);
+
+        try {
+            Collections.reverse(resources);
+
+            IgniteUtils.closeAll(resources);
+        } catch (Exception e) {
+            throw new StorageException("Failed to stop RocksDB storage: " + path, e);
+        }
+    }
+
+    /**
+     * Returns Column Family instance with the desired name. Creates it if it doesn't exist.
+     * Tracks every created index by its {@code indexId}.
+     */
+    public ColumnFamily getSortedIndexCfOnIndexCreate(byte[] cfName, int indexId) {
+        if (!busyLock.enterBusy()) {
+            throw new StorageClosedException();
+        }
+
+        try {
+            ColumnFamily[] result = {null};
+
+            sortedIndexIdsByCfName.compute(new ByteArray(cfName), (name, indexIds) -> {
+                ColumnFamily columnFamily = getOrCreateColumnFamily(cfName, name);
+
+                result[0] = columnFamily;
+
+                if (indexIds == null) {
+                    indexIds = new HashSet<>();
+                }
+
+                indexIds.add(indexId);
+
+                return indexIds;
+            });
+
+            return result[0];
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /**
+     * Possibly drops the column family after destroying the index.
+     */
+    public void dropCfOnIndexDestroy(byte[] cfName, int indexId) {
+        if (!busyLock.enterBusy()) {
+            throw new StorageClosedException();
+        }
+
+        try {
+            sortedIndexIdsByCfName.compute(new ByteArray(cfName), (name, indexIds) -> {
+                if (indexIds == null) {
+                    return null;
+                }
+
+                indexIds.remove(indexId);
+
+                if (indexIds.isEmpty()) {

Review Comment:
   Perhaps we need to create a new set to avoid data races.
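One way to address this, if the sets are ever read outside of `compute`, is to treat them as immutable values: each `compute` call builds a fresh set instead of mutating the one that other threads may already be reading. The sketch below assumes `String` keys in place of `ByteArray` and passes the column family drop in as a callback. `SortedIndexCfTracker` and its methods are hypothetical names, not code from the PR.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;

/** Hypothetical tracker of which index IDs use which sorted index column family. */
final class SortedIndexCfTracker {
    private final ConcurrentMap<String, Set<Integer>> indexIdsByCfName = new ConcurrentHashMap<>();

    /** Registers an index. Sets stored in the map are never mutated after publication. */
    void onIndexCreate(String cfName, int indexId) {
        indexIdsByCfName.compute(cfName, (name, indexIds) -> {
            Set<Integer> copy = indexIds == null ? new HashSet<>() : new HashSet<>(indexIds);
            copy.add(indexId);
            return Set.copyOf(copy);
        });
    }

    /** Unregisters an index and drops the column family once no index uses it. */
    void onIndexDestroy(String cfName, int indexId, Consumer<String> dropColumnFamily) {
        indexIdsByCfName.compute(cfName, (name, indexIds) -> {
            if (indexIds == null) {
                return null;
            }

            Set<Integer> copy = new HashSet<>(indexIds);
            copy.remove(indexId);

            if (copy.isEmpty()) {
                // Drop while still inside compute (as the original code does), so a concurrent
                // onIndexCreate for the same name cannot interleave with the drop.
                dropColumnFamily.accept(name);
                return null; // Returning null removes the mapping.
            }

            return Set.copyOf(copy);
        });
    }

    /** Safe to call concurrently: returns an immutable snapshot or an empty set. */
    Set<Integer> indexIds(String cfName) {
        return indexIdsByCfName.getOrDefault(cfName, Set.of());
    }
}
```

The trade-off is one extra allocation per index create or destroy, which is negligible for such rare operations.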
   
   



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstance.java:
##########
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb.instance;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.rocksdb.RocksUtils.incrementPrefix;
+import static org.apache.ignite.internal.storage.rocksdb.instance.SharedRocksDbInstanceCreator.sortedIndexCfOptions;
+import static org.apache.ignite.lang.IgniteStringFormatter.format;
+
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.internal.rocksdb.ColumnFamily;
+import org.apache.ignite.internal.rocksdb.flush.RocksDbFlusher;
+import org.apache.ignite.internal.storage.StorageClosedException;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.rocksdb.RocksDbMetaStorage;
+import org.apache.ignite.internal.storage.rocksdb.RocksDbStorageEngine;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.lang.ByteArray;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+
+/**
+ * Shared RocksDB instance for multiple tables. Managed directly by the engine.
+ */
+public final class SharedRocksDbInstance {
+
+    /** Write options. */
+    public static final WriteOptions DFLT_WRITE_OPTS = new WriteOptions().setDisableWAL(true);
+
+    /** RocksDB storage engine instance. */
+    public final RocksDbStorageEngine engine;
+
+    /** Path for the directory that stores the data. */
+    public final Path path;
+
+    /** RocksDB flusher instance. */
+    public final RocksDbFlusher flusher;
+
+    /** RocksDB instance. */
+    public final RocksDB db;
+
+    /** Meta information instance that wraps the {@link ColumnFamily} instance for the meta column family. */
+    public final RocksDbMetaStorage meta;
+
+    /** Column Family for partition data. */
+    public final ColumnFamily partitionCf;
+
+    /** Column Family for GC queue. */
+    public final ColumnFamily gcQueueCf;
+
+    /** Column Family for Hash Index data. */
+    public final ColumnFamily hashIndexCf;
+
+    /** Column Family instances for different types of sorted indexes, identified by the column family name. */
+    private final ConcurrentMap<ByteArray, ColumnFamily> sortedIndexCfs;
+
+    /** Column family names mapped to sets of index IDs that use that CF. */
+    private final ConcurrentMap<ByteArray, Set<Integer>> sortedIndexIdsByCfName = new ConcurrentHashMap<>();
+
+    /** Busy lock to stop synchronously. */
+    private final IgniteSpinBusyLock busyLock;
+
+    /** Prevents double stopping of the component. */
+    private final AtomicBoolean stopGuard = new AtomicBoolean();
+
+    SharedRocksDbInstance(
+            RocksDbStorageEngine engine,
+            Path path,
+            IgniteSpinBusyLock busyLock,
+            RocksDbFlusher flusher,
+            RocksDB db,
+            RocksDbMetaStorage meta,
+            ColumnFamily partitionCf,
+            ColumnFamily gcQueueCf,
+            ColumnFamily hashIndexCf,
+            ConcurrentMap<ByteArray, ColumnFamily> sortedIndexCfs
+    ) {
+        this.engine = engine;
+        this.path = path;
+        this.busyLock = busyLock;
+
+        this.flusher = flusher;
+        this.db = db;
+
+        this.meta = meta;
+        this.partitionCf = partitionCf;
+        this.gcQueueCf = gcQueueCf;
+        this.hashIndexCf = hashIndexCf;
+        this.sortedIndexCfs = sortedIndexCfs;
+    }
+
+    /**
+     * Utility method that performs range-deletion in the column family.
+     */
+    public static void deleteByPrefix(WriteBatch writeBatch, ColumnFamily columnFamily, byte[] prefix) throws RocksDBException {
+        byte[] upperBound = incrementPrefix(prefix);
+
+        writeBatch.deleteRange(columnFamily.handle(), prefix, upperBound);
+    }
+
+    /**
+     * Stops the instance, freeing all allocated resources.
+     */
+    public void stop() {
+        if (!stopGuard.compareAndSet(false, true)) {
+            return;
+        }
+
+        busyLock.block();
+
+        List<AutoCloseable> resources = new ArrayList<>();
+
+        resources.add(meta.columnFamily().handle());
+        resources.add(partitionCf.handle());
+        resources.add(gcQueueCf.handle());
+        resources.add(hashIndexCf.handle());
+        resources.addAll(sortedIndexCfs.values().stream()
+                .map(ColumnFamily::handle)
+                .collect(toList())
+        );
+
+        resources.add(db);
+        resources.add(flusher::stop);
+
+        try {
+            Collections.reverse(resources);
+
+            IgniteUtils.closeAll(resources);
+        } catch (Exception e) {
+            throw new StorageException("Failed to stop RocksDB storage: " + path, e);
+        }
+    }
+
+    /**
+     * Returns Column Family instance with the desired name. Creates it if it doesn't exist.
+     * Tracks every created index by its {@code indexId}.
+     */
+    public ColumnFamily getSortedIndexCfOnIndexCreate(byte[] cfName, int indexId) {
+        if (!busyLock.enterBusy()) {
+            throw new StorageClosedException();
+        }
+
+        try {
+            ColumnFamily[] result = {null};
+
+            sortedIndexIdsByCfName.compute(new ByteArray(cfName), (name, indexIds) -> {
+                ColumnFamily columnFamily = getOrCreateColumnFamily(cfName, name);
+
+                result[0] = columnFamily;
+
+                if (indexIds == null) {
+                    indexIds = new HashSet<>();
+                }
+
+                indexIds.add(indexId);
+
+                return indexIds;
+            });
+
+            return result[0];
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /**
+     * Possibly drops the column family after destroying the index.
+     */
+    public void dropCfOnIndexDestroy(byte[] cfName, int indexId) {
+        if (!busyLock.enterBusy()) {
+            throw new StorageClosedException();
+        }
+
+        try {
+            sortedIndexIdsByCfName.compute(new ByteArray(cfName), (name, indexIds) -> {
+                if (indexIds == null) {
+                    return null;
+                }
+
+                indexIds.remove(indexId);
+
+                if (indexIds.isEmpty()) {
+                    indexIds = null;
+
+                    destroyColumnFamily(name);
+                }
+
+                return indexIds;
+            });
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    private ColumnFamily getOrCreateColumnFamily(byte[] cfName, ByteArray name) {
+        return sortedIndexCfs.computeIfAbsent(name, unused -> {
+            ColumnFamilyDescriptor cfDescriptor = new ColumnFamilyDescriptor(cfName, sortedIndexCfOptions(cfName));
+
+            ColumnFamily columnFamily;
+            try {
+                columnFamily = ColumnFamily.create(db, cfDescriptor);
+            } catch (RocksDBException e) {
+                throw new StorageException("Failed to create new RocksDB column family: " + new String(cfDescriptor.getName(), UTF_8), e);

Review Comment:
   Maybe create a common helper method to get the name from a **ColumnFamilyDescriptor**?
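One possible shape for the suggested helper. Sorted index CF names in this PR are a UTF-8 prefix followed by raw `{type, flags}` bytes, so decoding the whole name with `new String(name, UTF_8)` can put unreadable characters into error messages. The hypothetical `cfNameForDisplay` below therefore keeps printable ASCII and hex-escapes everything else. In real code the argument would be `cfDescriptor.getName()`; the sketch takes a plain `byte[]` to stay free of the RocksDB dependency.

```java
/** Hypothetical helper for rendering column family names in log and exception messages. */
final class CfNames {
    private CfNames() {
    }

    /** Printable ASCII is kept as-is; any other byte is rendered as {@code \xNN}. */
    static String cfNameForDisplay(byte[] cfName) {
        StringBuilder sb = new StringBuilder(cfName.length);

        for (byte b : cfName) {
            int v = b & 0xFF;

            if (v >= 0x20 && v < 0x7F) {
                sb.append((char) v);
            } else {
                sb.append(String.format("\\x%02X", v));
            }
        }

        return sb.toString();
    }
}
```

The call site would then become something like `"Failed to create new RocksDB column family: " + cfNameForDisplay(cfDescriptor.getName())`.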




[GitHub] [ignite-3] ibessonov commented on a diff in pull request #2200: IGNITE-19591 Implemented usage of shared RocksDB instances for multiple tables.

Posted by "ibessonov (via GitHub)" <gi...@apache.org>.
ibessonov commented on code in PR #2200:
URL: https://github.com/apache/ignite-3/pull/2200#discussion_r1232105159


##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java:
##########
@@ -803,30 +802,25 @@ void readByTimestampWorksCorrectlyIfNoUncommittedValueExists() {
     }
 
     /**
-     * Tests that changed {@link MvPartitionStorage#lastAppliedIndex()} can be successfully read and that it's returned from
-     * {@link MvPartitionStorage#persistedIndex()} after the {@link MvPartitionStorage#flush()}.
+     * Tests that changed {@link MvPartitionStorage#lastAppliedIndex()} can be successfully read back.
      */
     @Test
     void testAppliedIndex() {
         storage.runConsistently(locker -> {
             assertEquals(0, storage.lastAppliedIndex());
             assertEquals(0, storage.lastAppliedTerm());
-            assertEquals(0, storage.persistedIndex());

Review Comment:
   I removed the method




[GitHub] [ignite-3] ibessonov merged pull request #2200: IGNITE-19591 Implemented usage of shared RocksDB instances for multiple tables.

Posted by "ibessonov (via GitHub)" <gi...@apache.org>.
ibessonov merged PR #2200:
URL: https://github.com/apache/ignite-3/pull/2200



[GitHub] [ignite-3] ibessonov commented on a diff in pull request #2200: IGNITE-19591 Implemented usage of shared RocksDB instances for multiple tables.

Posted by "ibessonov (via GitHub)" <gi...@apache.org>.
ibessonov commented on code in PR #2200:
URL: https://github.com/apache/ignite-3/pull/2200#discussion_r1232109927


##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/ColumnFamilyUtils.java:
##########
@@ -67,24 +93,135 @@ static ColumnFamilyType fromCfName(String cfName) {
     }
 
     /**
-     * Creates a column family name by index ID.
-     *
-     * @param indexId Index ID.
+     * Generates a sorted index column family name by its column descriptions.
+     * The resulting array has a {@link #SORTED_INDEX_CF_PREFIX} prefix as a UTF-8 byte array, followed by a number of pairs
+     * {@code {type, flags}}, where type represents the ordinal of the corresponding {@link NativeTypeSpec}, and
+     * flags store information about the column's nullability and comparison order.
      *
-     * @see #sortedIndexId
+     * @see #comparatorFromCfName(byte[])
      */
-    static String sortedIndexCfName(int indexId) {
-        return SORTED_INDEX_CF_PREFIX + indexId;
+    static byte[] sortedIndexCfName(List<StorageSortedIndexColumnDescriptor> columns) {

Review Comment:
   Ok




[GitHub] [ignite-3] ibessonov commented on a diff in pull request #2200: IGNITE-19591 Implemented usage of shared RocksDB instances for multiple tables.

Posted by "ibessonov (via GitHub)" <gi...@apache.org>.
ibessonov commented on code in PR #2200:
URL: https://github.com/apache/ignite-3/pull/2200#discussion_r1232175439


##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java:
##########
@@ -239,7 +240,9 @@ private void destroyTableData() {
 
             rocksDb.db.write(DFLT_WRITE_OPTS, writeBatch);
         } catch (RocksDBException e) {
-            //TODO Handle.
+            String message = format("Failed to destroy table data. [tableId={}]", getTableId());

Review Comment:
   Ok




[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #2200: IGNITE-19591 Implemented usage of shared RocksDB instances for multiple tables.

Posted by "tkalkirill (via GitHub)" <gi...@apache.org>.
tkalkirill commented on code in PR #2200:
URL: https://github.com/apache/ignite-3/pull/2200#discussion_r1232103992


##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java:
##########
@@ -239,7 +240,9 @@ private void destroyTableData() {
 
             rocksDb.db.write(DFLT_WRITE_OPTS, writeBatch);
         } catch (RocksDBException e) {
-            //TODO Handle.
+            String message = format("Failed to destroy table data. [tableId={}]", getTableId());

Review Comment:
   You can use `org.apache.ignite.internal.storage.StorageException#StorageException(int, java.lang.String, java.lang.Throwable, java.lang.Object...)`
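The suggestion is to let the exception format its own message instead of building it with `format(...)` first. The exact behavior of Ignite's `StorageException(int, String, Throwable, Object...)` (including the meaning of its `int` argument) is not shown here. The stand-in below only illustrates the varargs pattern, assuming `{}` placeholders substituted left to right as in the `format` call above. `FormattedException` and `substitute` are hypothetical.

```java
/** Hypothetical stand-in for an exception that formats "{}" placeholders itself. */
class FormattedException extends RuntimeException {
    private static final long serialVersionUID = 1L;

    FormattedException(String messagePattern, Throwable cause, Object... args) {
        super(substitute(messagePattern, args), cause);
    }

    /** Replaces each "{}" with the next argument; placeholders without an argument are kept. */
    static String substitute(String pattern, Object... args) {
        StringBuilder sb = new StringBuilder(pattern.length());
        int argIdx = 0;
        int from = 0;

        while (true) {
            int pos = pattern.indexOf("{}", from);

            if (pos < 0 || argIdx >= args.length) {
                // No more placeholders or arguments: copy the rest verbatim.
                sb.append(pattern, from, pattern.length());
                return sb.toString();
            }

            sb.append(pattern, from, pos).append(args[argIdx++]);
            from = pos + 2;
        }
    }
}
```

With such a constructor the catch block shrinks to a single `throw new FormattedException("Failed to destroy table data. [tableId={}]", e, getTableId());`.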




[GitHub] [ignite-3] ibessonov commented on a diff in pull request #2200: IGNITE-19591 Implemented usage of shared RocksDB instances for multiple tables.

Posted by "ibessonov (via GitHub)" <gi...@apache.org>.
ibessonov commented on code in PR #2200:
URL: https://github.com/apache/ignite-3/pull/2200#discussion_r1232166136


##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageEngine.java:
##########
@@ -170,14 +173,18 @@ public RocksDbTableStorage createMvTable(
 
         assert dataRegion != null : "tableId=" + tableId + ", dataRegion=" + tableDescriptor.getDataRegion();
 
-        Path tablePath = storagePath.resolve(TABLE_DIR_PREFIX + tableId);
-
-        try {
-            Files.createDirectories(tablePath);
-        } catch (IOException e) {
-            throw new StorageException("Failed to create table store directory for table: " + tableId, e);
-        }
+        SharedRocksDbInstance sharedInstance = sharedInstances.computeIfAbsent(tableDescriptor.getDataRegion(), name -> {

Review Comment:
   We should revisit this whole data region thing in RocksDB anyway; I think I'll create a JIRA for it.
   EDIT: https://issues.apache.org/jira/browse/IGNITE-19762
   We'll figure it out later
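The hunk above replaces per-table directory creation (`Files.createDirectories(tablePath)`) with a `sharedInstances.computeIfAbsent(tableDescriptor.getDataRegion(), ...)` lookup, so all tables of one data region share a single RocksDB instance. The sketch below shows that sharing pattern in isolation. `SharedInstanceRegistry` and its nested `SharedInstance` (a stand-in for `SharedRocksDbInstance`) are hypothetical, and the `rocksdb-<region>` directory naming is an assumption, not taken from the PR.

```java
import java.nio.file.Path;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical registry that lets all tables of one data region share a single storage instance. */
final class SharedInstanceRegistry {
    /** Stand-in for SharedRocksDbInstance: only remembers where its data would live. */
    static final class SharedInstance {
        final Path path;

        SharedInstance(Path path) {
            this.path = path;
        }
    }

    private final Path storagePath;

    private final ConcurrentMap<String, SharedInstance> sharedInstances = new ConcurrentHashMap<>();

    SharedInstanceRegistry(Path storagePath) {
        this.storagePath = storagePath;
    }

    /** Returns the instance for the region; computeIfAbsent creates it at most once per key. */
    SharedInstance forDataRegion(String dataRegion) {
        return sharedInstances.computeIfAbsent(
                dataRegion,
                // Directory-per-region layout is an assumption made for this sketch.
                name -> new SharedInstance(storagePath.resolve("rocksdb-" + name))
        );
    }
}
```

A real implementation also has to decide what happens when the last table of a region is destroyed, which is part of what the linked JIRA is meant to revisit.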




[GitHub] [ignite-3] ibessonov commented on a diff in pull request #2200: IGNITE-19591 Implemented usage of shared RocksDB instances for multiple tables.

Posted by "ibessonov (via GitHub)" <gi...@apache.org>.
ibessonov commented on code in PR #2200:
URL: https://github.com/apache/ignite-3/pull/2200#discussion_r1232132258


##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/ColumnFamilyUtils.java:
##########
@@ -67,24 +93,135 @@ static ColumnFamilyType fromCfName(String cfName) {
     }
 
     /**
-     * Creates a column family name by index ID.
-     *
-     * @param indexId Index ID.
+     * Generates a sorted index column family name by its column descriptions.
+     * The resulting array has a {@link #SORTED_INDEX_CF_PREFIX} prefix as a UTF-8 byte array, followed by a number of pairs
+     * {@code {type, flags}}, where type represents the ordinal of the corresponding {@link NativeTypeSpec}, and
+     * flags store information about the column's nullability and comparison order.
      *
-     * @see #sortedIndexId
+     * @see #comparatorFromCfName(byte[])
      */
-    static String sortedIndexCfName(int indexId) {
-        return SORTED_INDEX_CF_PREFIX + indexId;
+    static byte[] sortedIndexCfName(List<StorageSortedIndexColumnDescriptor> columns) {
+        ByteBuffer buf = ByteBuffer.allocate(SORTED_INDEX_CF_PREFIX.length() + columns.size() * 2);
+
+        buf.put(SORTED_INDEX_CF_PREFIX.getBytes(UTF_8));
+
+        for (StorageSortedIndexColumnDescriptor column : columns) {
+            NativeType nativeType = column.type();
+            NativeTypeSpec nativeTypeSpec = nativeType.spec();
+
+            buf.put((byte) nativeTypeSpec.ordinal());
+
+            int flags = 0;
+
+            if (column.nullable()) {
+                flags |= NULLABILITY_FLAG;
+            }
+
+            if (column.asc()) {
+                flags |= ASC_ORDER_FLAG;
+            }
+
+            buf.put((byte) flags);
+        }
+
+        return buf.array();
     }
 
     /**
-     * Extracts a Sorted Index ID from the given Column Family name.
-     *
-     * @param cfName Column Family name.
-     *
-     * @see #sortedIndexCfName
+     * Creates an {@link org.rocksdb.AbstractComparator} instance to compare keys in column family with name {@code cfName}.
+     * Please refer to {@link #sortedIndexCfName(List)} for the details of the CF name encoding.
      */
-    static int sortedIndexId(String cfName) {
-        return Integer.parseInt(cfName.substring(SORTED_INDEX_CF_PREFIX.length()));
+    public static RocksDbBinaryTupleComparator comparatorFromCfName(byte[] cfName) {

Review Comment:
   Sure
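To illustrate the encoding described in the `sortedIndexCfName` javadoc (prefix, then one `{type, flags}` byte pair per column), here is a round-trip sketch of the kind of parsing `comparatorFromCfName` needs. The prefix string and the values of `NULLABILITY_FLAG` and `ASC_ORDER_FLAG` are not shown in this diff, so the ones below are placeholders, and columns are reduced to a hypothetical `Column` holding a type ordinal instead of a `NativeTypeSpec`. The sketch sizes the buffer from the prefix's UTF-8 byte length; `String.length()`, as used in the diff, gives the same number only while the prefix is ASCII.

```java
import static java.nio.charset.StandardCharsets.UTF_8;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical round-trip of the sorted index CF name layout: prefix + {type, flags} pairs. */
final class SortedIndexCfNameCodec {
    /** Placeholder values; the real constants are defined elsewhere in ColumnFamilyUtils. */
    static final byte[] PREFIX = "cf-sorted-".getBytes(UTF_8);
    static final int NULLABILITY_FLAG = 1;
    static final int ASC_ORDER_FLAG = 2;

    /** Simplified column descriptor: a type ordinal plus the two flags. */
    static final class Column {
        final int typeOrdinal;
        final boolean nullable;
        final boolean asc;

        Column(int typeOrdinal, boolean nullable, boolean asc) {
            this.typeOrdinal = typeOrdinal;
            this.nullable = nullable;
            this.asc = asc;
        }
    }

    static byte[] encode(List<Column> columns) {
        ByteBuffer buf = ByteBuffer.allocate(PREFIX.length + columns.size() * 2);
        buf.put(PREFIX);

        for (Column c : columns) {
            buf.put((byte) c.typeOrdinal);
            int flags = (c.nullable ? NULLABILITY_FLAG : 0) | (c.asc ? ASC_ORDER_FLAG : 0);
            buf.put((byte) flags);
        }

        return buf.array();
    }

    static List<Column> decode(byte[] cfName) {
        if (cfName.length < PREFIX.length
                || !Arrays.equals(cfName, 0, PREFIX.length, PREFIX, 0, PREFIX.length)
                || (cfName.length - PREFIX.length) % 2 != 0) {
            throw new IllegalArgumentException("Not a sorted index CF name");
        }

        List<Column> columns = new ArrayList<>();

        for (int i = PREFIX.length; i < cfName.length; i += 2) {
            int flags = cfName[i + 1] & 0xFF;
            columns.add(new Column(
                    cfName[i] & 0xFF,
                    (flags & NULLABILITY_FLAG) != 0,
                    (flags & ASC_ORDER_FLAG) != 0
            ));
        }

        return columns;
    }
}
```

Because the name encodes only column types and flags, two indexes with identical column layouts map to the same CF name, which is what allows the shared instance to reuse one sorted index column family for both.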




[GitHub] [ignite-3] ibessonov commented on a diff in pull request #2200: IGNITE-19591 Implemented usage of shared RocksDB instances for multiple tables.

Posted by "ibessonov (via GitHub)" <gi...@apache.org>.
ibessonov commented on code in PR #2200:
URL: https://github.com/apache/ignite-3/pull/2200#discussion_r1232175078


##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstance.java:
##########
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb.instance;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.rocksdb.RocksUtils.incrementPrefix;
+import static org.apache.ignite.internal.storage.rocksdb.instance.SharedRocksDbInstanceCreator.sortedIndexCfOptions;
+import static org.apache.ignite.lang.IgniteStringFormatter.format;
+
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.internal.rocksdb.ColumnFamily;
+import org.apache.ignite.internal.rocksdb.flush.RocksDbFlusher;
+import org.apache.ignite.internal.storage.StorageClosedException;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.rocksdb.RocksDbMetaStorage;
+import org.apache.ignite.internal.storage.rocksdb.RocksDbStorageEngine;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.lang.ByteArray;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+
+/**
+ * Shared RocksDB instance for multiple tables. Managed directly by the engine.
+ */
+public final class SharedRocksDbInstance {
+
+    /** Write options. */
+    public static final WriteOptions DFLT_WRITE_OPTS = new WriteOptions().setDisableWAL(true);
+
+    /** RocksDB storage engine instance. */
+    public final RocksDbStorageEngine engine;
+
+    /** Path for the directory that stores the data. */
+    public final Path path;
+
+    /** RocksDB flusher instance. */
+    public final RocksDbFlusher flusher;
+
+    /** RocksDB instance. */
+    public final RocksDB db;
+
+    /** Meta information instance that wraps the {@link ColumnFamily} instance for the meta column family. */
+    public final RocksDbMetaStorage meta;
+
+    /** Column Family for partition data. */
+    public final ColumnFamily partitionCf;
+
+    /** Column Family for GC queue. */
+    public final ColumnFamily gcQueueCf;
+
+    /** Column Family for Hash Index data. */
+    public final ColumnFamily hashIndexCf;
+
+    /** Column Family instances for different types of sorted indexes, identified by the column family name. */
+    private final ConcurrentMap<ByteArray, ColumnFamily> sortedIndexCfs;
+
+    /** Column family names mapped to sets of index IDs that use that CF. */
+    private final ConcurrentMap<ByteArray, Set<Integer>> sortedIndexIdsByCfName = new ConcurrentHashMap<>();
+
+    /** Busy lock to stop synchronously. */
+    private final IgniteSpinBusyLock busyLock;
+
+    /** Prevents double stopping of the component. */
+    private final AtomicBoolean stopGuard = new AtomicBoolean();
+
+    SharedRocksDbInstance(
+            RocksDbStorageEngine engine,
+            Path path,
+            IgniteSpinBusyLock busyLock,
+            RocksDbFlusher flusher,
+            RocksDB db,
+            RocksDbMetaStorage meta,
+            ColumnFamily partitionCf,
+            ColumnFamily gcQueueCf,
+            ColumnFamily hashIndexCf,
+            ConcurrentMap<ByteArray, ColumnFamily> sortedIndexCfs
+    ) {
+        this.engine = engine;
+        this.path = path;
+        this.busyLock = busyLock;
+
+        this.flusher = flusher;
+        this.db = db;
+
+        this.meta = meta;
+        this.partitionCf = partitionCf;
+        this.gcQueueCf = gcQueueCf;
+        this.hashIndexCf = hashIndexCf;
+        this.sortedIndexCfs = sortedIndexCfs;
+    }
+
+    /**
+     * Utility method that performs range-deletion in the column family.
+     */
+    public static void deleteByPrefix(WriteBatch writeBatch, ColumnFamily columnFamily, byte[] prefix) throws RocksDBException {
+        byte[] upperBound = incrementPrefix(prefix);
+
+        writeBatch.deleteRange(columnFamily.handle(), prefix, upperBound);
+    }
+
+    /**
+     * Stops the instance, freeing all allocated resources.
+     */
+    public void stop() {
+        if (!stopGuard.compareAndSet(false, true)) {
+            return;
+        }
+
+        busyLock.block();
+
+        List<AutoCloseable> resources = new ArrayList<>();
+
+        resources.add(meta.columnFamily().handle());
+        resources.add(partitionCf.handle());
+        resources.add(gcQueueCf.handle());
+        resources.add(hashIndexCf.handle());
+        resources.addAll(sortedIndexCfs.values().stream()
+                .map(ColumnFamily::handle)
+                .collect(toList())
+        );
+
+        resources.add(db);
+        resources.add(flusher::stop);
+
+        try {
+            Collections.reverse(resources);
+
+            IgniteUtils.closeAll(resources);
+        } catch (Exception e) {
+            throw new StorageException("Failed to stop RocksDB storage: " + path, e);
+        }
+    }
+
+    /**
+     * Returns Column Family instance with the desired name. Creates it if it doesn't exist.
+     * Tracks every created index by its {@code indexId}.
+     */
+    public ColumnFamily getSortedIndexCfOnIndexCreate(byte[] cfName, int indexId) {
+        if (!busyLock.enterBusy()) {
+            throw new StorageClosedException();
+        }
+
+        try {
+            ColumnFamily[] result = {null};
+
+            sortedIndexIdsByCfName.compute(new ByteArray(cfName), (name, indexIds) -> {
+                ColumnFamily columnFamily = getOrCreateColumnFamily(cfName, name);
+
+                result[0] = columnFamily;
+
+                if (indexIds == null) {
+                    indexIds = new HashSet<>();
+                }
+
+                indexIds.add(indexId);
+
+                return indexIds;
+            });
+
+            return result[0];
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /**
+     * Drops the column family if the destroyed index was the last one that used it.
+     */
+    public void dropCfOnIndexDestroy(byte[] cfName, int indexId) {
+        if (!busyLock.enterBusy()) {
+            throw new StorageClosedException();
+        }
+
+        try {
+            sortedIndexIdsByCfName.compute(new ByteArray(cfName), (name, indexIds) -> {
+                if (indexIds == null) {
+                    return null;
+                }
+
+                indexIds.remove(indexId);
+
+                if (indexIds.isEmpty()) {
+                    indexIds = null;
+
+                    destroyColumnFamily(name);
+                }
+
+                return indexIds;
+            });
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    private ColumnFamily getOrCreateColumnFamily(byte[] cfName, ByteArray name) {
+        return sortedIndexCfs.computeIfAbsent(name, unused -> {
+            ColumnFamilyDescriptor cfDescriptor = new ColumnFamilyDescriptor(cfName, sortedIndexCfOptions(cfName));
+
+            ColumnFamily columnFamily;
+            try {
+                columnFamily = ColumnFamily.create(db, cfDescriptor);
+            } catch (RocksDBException e) {
+                throw new StorageException("Failed to create new RocksDB column family: " + new String(cfDescriptor.getName(), UTF_8), e);

Review Comment:
   I'll do it
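The `deleteByPrefix` helper in the hunk above turns a key prefix into a half-open `[prefix, upperBound)` range for `WriteBatch.deleteRange`, relying on `RocksUtils.incrementPrefix` for the bound. A minimal sketch of how such an upper bound can be computed, assuming RocksDB's default unsigned byte-wise key ordering; `upperBoundForPrefix` and `inRange` are hypothetical names, not Ignite's actual implementation:

```java
import java.util.Arrays;

public final class PrefixUpperBound {
    /**
     * Returns the smallest key strictly greater than every key that starts with {@code prefix},
     * assuming unsigned byte-wise ordering. Returns {@code null} when no such key exists
     * (empty prefix or a prefix consisting only of 0xFF bytes).
     */
    static byte[] upperBoundForPrefix(byte[] prefix) {
        int i = prefix.length - 1;

        // Trailing 0xFF bytes cannot be incremented without overflow, so they are dropped.
        while (i >= 0 && prefix[i] == (byte) 0xFF) {
            i--;
        }

        if (i < 0) {
            return null;
        }

        byte[] bound = Arrays.copyOf(prefix, i + 1);
        bound[i]++; // Safe: bound[i] != 0xFF here, so the unsigned value just grows by one.

        return bound;
    }

    /** Checks whether {@code key} falls into the half-open range {@code [prefix, upperBound)}. */
    static boolean inRange(byte[] key, byte[] prefix, byte[] upperBound) {
        return Arrays.compareUnsigned(key, prefix) >= 0
                && (upperBound == null || Arrays.compareUnsigned(key, upperBound) < 0);
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(upperBoundForPrefix(new byte[] {1, 2}))); // prints [1, 3]
    }
}
```

Trailing `0xFF` bytes are stripped because incrementing them would wrap around; a prefix made only of `0xFF` bytes has no finite upper bound, which this sketch signals with `null`. How Ignite's `incrementPrefix` treats that edge case is not visible in this diff.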



[GitHub] [ignite-3] ibessonov commented on a diff in pull request #2200: IGNITE-19591 Implemented usage of shared RocksDB instances for multiple tables.

Posted by "ibessonov (via GitHub)" <gi...@apache.org>.
ibessonov commented on code in PR #2200:
URL: https://github.com/apache/ignite-3/pull/2200#discussion_r1232166136


##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageEngine.java:
##########
@@ -170,14 +173,18 @@ public RocksDbTableStorage createMvTable(
 
         assert dataRegion != null : "tableId=" + tableId + ", dataRegion=" + tableDescriptor.getDataRegion();
 
-        Path tablePath = storagePath.resolve(TABLE_DIR_PREFIX + tableId);
-
-        try {
-            Files.createDirectories(tablePath);
-        } catch (IOException e) {
-            throw new StorageException("Failed to create table store directory for table: " + tableId, e);
-        }
+        SharedRocksDbInstance sharedInstance = sharedInstances.computeIfAbsent(tableDescriptor.getDataRegion(), name -> {

Review Comment:
   We should revisit this whole data region thing in RocksDB anyway; I think I'll create a JIRA ticket for it.
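The `sharedInstances.computeIfAbsent(tableDescriptor.getDataRegion(), ...)` call above is what lets all tables in one data region share a single RocksDB instance. A minimal sketch of that pattern, assuming a plain `ConcurrentHashMap` keyed by region name; `SharedInstanceRegistry` and its nested `SharedInstance` are hypothetical stand-ins, not the engine's real classes:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

public final class SharedInstanceRegistry {
    /** Hypothetical stand-in for an expensive per-region resource such as a RocksDB instance. */
    static final class SharedInstance {
        final String dataRegion;

        SharedInstance(String dataRegion) {
            this.dataRegion = dataRegion;
        }
    }

    private final ConcurrentMap<String, SharedInstance> sharedInstances = new ConcurrentHashMap<>();

    /** Counts how often the factory actually runs, to show creation happens once per region. */
    final AtomicInteger creations = new AtomicInteger();

    SharedInstance instanceFor(String dataRegion) {
        // ConcurrentHashMap applies the factory at most once per key, even under concurrent calls.
        return sharedInstances.computeIfAbsent(dataRegion, name -> {
            creations.incrementAndGet();

            return new SharedInstance(name);
        });
    }

    public static void main(String[] args) {
        SharedInstanceRegistry registry = new SharedInstanceRegistry();

        SharedInstance a = registry.instanceFor("default");
        SharedInstance b = registry.instanceFor("default");
        SharedInstance c = registry.instanceFor("other");

        System.out.println((a == b) + " " + (a == c) + " " + registry.creations.get()); // prints true false 2
    }
}
```

Because `ConcurrentHashMap.computeIfAbsent` is atomic per key, two tables created concurrently in the same region still receive the same instance, and the expensive creation runs only once.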



[GitHub] [ignite-3] ibessonov commented on a diff in pull request #2200: IGNITE-19591 Implemented usage of shared RocksDB instances for multiple tables.

Posted by "ibessonov (via GitHub)" <gi...@apache.org>.
ibessonov commented on code in PR #2200:
URL: https://github.com/apache/ignite-3/pull/2200#discussion_r1232171093


##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstance.java:
##########
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb.instance;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.rocksdb.RocksUtils.incrementPrefix;
+import static org.apache.ignite.internal.storage.rocksdb.instance.SharedRocksDbInstanceCreator.sortedIndexCfOptions;
+import static org.apache.ignite.lang.IgniteStringFormatter.format;
+
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.internal.rocksdb.ColumnFamily;
+import org.apache.ignite.internal.rocksdb.flush.RocksDbFlusher;
+import org.apache.ignite.internal.storage.StorageClosedException;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.rocksdb.RocksDbMetaStorage;
+import org.apache.ignite.internal.storage.rocksdb.RocksDbStorageEngine;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.lang.ByteArray;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+
+/**
+ * Shared RocksDB instance for multiple tables. Managed directly by the engine.
+ */
+public final class SharedRocksDbInstance {
+
+    /** Write options. */
+    public static final WriteOptions DFLT_WRITE_OPTS = new WriteOptions().setDisableWAL(true);
+
+    /** RocksDB storage engine instance. */
+    public final RocksDbStorageEngine engine;
+
+    /** Path for the directory that stores the data. */
+    public final Path path;
+
+    /** RocksDB flusher instance. */
+    public final RocksDbFlusher flusher;
+
+    /** RocksDB instance. */
+    public final RocksDB db;
+
+    /** Meta information instance that wraps {@link ColumnFamily} instance for meta column family. */
+    public final RocksDbMetaStorage meta;
+
+    /** Column Family for partition data. */
+    public final ColumnFamily partitionCf;
+
+    /** Column Family for GC queue. */
+    public final ColumnFamily gcQueueCf;
+
+    /** Column Family for Hash Index data. */
+    public final ColumnFamily hashIndexCf;
+
+    /** Column Family instances for different types of sorted indexes, identified by the column family name. */
+    private final ConcurrentMap<ByteArray, ColumnFamily> sortedIndexCfs;
+
+    /** Column family names mapped to sets of index IDs that use that CF. */
+    private final ConcurrentMap<ByteArray, Set<Integer>> sortedIndexIdsByCfName = new ConcurrentHashMap<>();
+
+    /** Busy lock to stop synchronously. */
+    private final IgniteSpinBusyLock busyLock;
+
+    /** Prevents double stopping of the component. */
+    private final AtomicBoolean stopGuard = new AtomicBoolean();
+
+    SharedRocksDbInstance(
+            RocksDbStorageEngine engine,
+            Path path,
+            IgniteSpinBusyLock busyLock,
+            RocksDbFlusher flusher,
+            RocksDB db,
+            RocksDbMetaStorage meta,
+            ColumnFamily partitionCf,
+            ColumnFamily gcQueueCf,
+            ColumnFamily hashIndexCf,
+            ConcurrentMap<ByteArray, ColumnFamily> sortedIndexCfs
+    ) {
+        this.engine = engine;
+        this.path = path;
+        this.busyLock = busyLock;
+
+        this.flusher = flusher;
+        this.db = db;
+
+        this.meta = meta;
+        this.partitionCf = partitionCf;
+        this.gcQueueCf = gcQueueCf;
+        this.hashIndexCf = hashIndexCf;
+        this.sortedIndexCfs = sortedIndexCfs;
+    }
+
+    /**
+     * Utility method that performs range-deletion in the column family.
+     */
+    public static void deleteByPrefix(WriteBatch writeBatch, ColumnFamily columnFamily, byte[] prefix) throws RocksDBException {
+        byte[] upperBound = incrementPrefix(prefix);
+
+        writeBatch.deleteRange(columnFamily.handle(), prefix, upperBound);
+    }
+
+    /**
+     * Stops the instance, freeing all allocated resources.
+     */
+    public void stop() {
+        if (!stopGuard.compareAndSet(false, true)) {
+            return;
+        }
+
+        busyLock.block();
+
+        List<AutoCloseable> resources = new ArrayList<>();
+
+        resources.add(meta.columnFamily().handle());
+        resources.add(partitionCf.handle());
+        resources.add(gcQueueCf.handle());
+        resources.add(hashIndexCf.handle());
+        resources.addAll(sortedIndexCfs.values().stream()
+                .map(ColumnFamily::handle)
+                .collect(toList())
+        );
+
+        resources.add(db);
+        resources.add(flusher::stop);
+
+        try {
+            Collections.reverse(resources);
+
+            IgniteUtils.closeAll(resources);
+        } catch (Exception e) {
+            throw new StorageException("Failed to stop RocksDB storage: " + path, e);
+        }
+    }
+
+    /**
+     * Returns Column Family instance with the desired name. Creates it if it doesn't exist.
+     * Tracks every created index by its {@code indexId}.
+     */
+    public ColumnFamily getSortedIndexCfOnIndexCreate(byte[] cfName, int indexId) {
+        if (!busyLock.enterBusy()) {
+            throw new StorageClosedException();
+        }
+
+        try {
+            ColumnFamily[] result = {null};
+
+            sortedIndexIdsByCfName.compute(new ByteArray(cfName), (name, indexIds) -> {
+                ColumnFamily columnFamily = getOrCreateColumnFamily(cfName, name);
+
+                result[0] = columnFamily;
+
+                if (indexIds == null) {
+                    indexIds = new HashSet<>();
+                }
+
+                indexIds.add(indexId);
+
+                return indexIds;
+            });
+
+            return result[0];
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /**
+     * Drops the column family if the destroyed index was the last one that used it.
+     */
+    public void dropCfOnIndexDestroy(byte[] cfName, int indexId) {
+        if (!busyLock.enterBusy()) {
+            throw new StorageClosedException();
+        }
+
+        try {
+            sortedIndexIdsByCfName.compute(new ByteArray(cfName), (name, indexIds) -> {
+                if (indexIds == null) {
+                    return null;
+                }
+
+                indexIds.remove(indexId);
+
+                if (indexIds.isEmpty()) {

Review Comment:
   But we are already protected by the lock inside `compute` on `sortedIndexIdsByCfName`, so there are no races.
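The argument above is that `ConcurrentHashMap.compute` serializes updates for the same key, so creating and destroying indexes that share a column family cannot interleave. A minimal, self-contained sketch of that reference-counting scheme, with a hypothetical `CfRefCounter` class and a string set standing in for real RocksDB column families:

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public final class CfRefCounter {
    /** Column family name mapped to the IDs of the indexes that use it. */
    private final ConcurrentMap<String, Set<Integer>> indexIdsByCf = new ConcurrentHashMap<>();

    /** Hypothetical stand-in for the set of column families that physically exist. */
    private final Set<String> liveCfs = ConcurrentHashMap.newKeySet();

    void onIndexCreate(String cfName, int indexId) {
        indexIdsByCf.compute(cfName, (name, ids) -> {
            // Runs atomically for this key, so a concurrent destroy cannot drop the CF in between.
            liveCfs.add(name); // Stand-in for "get or create the column family".

            if (ids == null) {
                ids = new HashSet<>();
            }

            ids.add(indexId);

            return ids;
        });
    }

    void onIndexDestroy(String cfName, int indexId) {
        indexIdsByCf.compute(cfName, (name, ids) -> {
            if (ids == null) {
                return null; // Unknown CF: nothing to do, no mapping is created.
            }

            ids.remove(indexId);

            if (ids.isEmpty()) {
                liveCfs.remove(name); // Stand-in for "drop the column family".

                return null; // Returning null removes the mapping.
            }

            return ids;
        });
    }

    boolean isLive(String cfName) {
        return liveCfs.contains(cfName);
    }

    public static void main(String[] args) {
        CfRefCounter counter = new CfRefCounter();

        counter.onIndexCreate("sorted-cf", 1);
        counter.onIndexCreate("sorted-cf", 2);
        counter.onIndexDestroy("sorted-cf", 1);
        System.out.println(counter.isLive("sorted-cf")); // prints true

        counter.onIndexDestroy("sorted-cf", 2);
        System.out.println(counter.isLive("sorted-cf")); // prints false
    }
}
```

One trade-off of doing the work inside `compute` is that the create/drop call then runs while the map holds its internal lock for that key, so other updates that contend on it wait. The `ConcurrentHashMap` javadoc also requires that the remapping function not update other mappings of the same map; that holds here because column families are tracked in a separate map.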



[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #2200: IGNITE-19591 Implemented usage of shared RocksDB instances for multiple tables.

Posted by "tkalkirill (via GitHub)" <gi...@apache.org>.
tkalkirill commented on code in PR #2200:
URL: https://github.com/apache/ignite-3/pull/2200#discussion_r1232189383


##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/ColumnFamilyUtils.java:
##########
@@ -92,6 +90,10 @@ public static ColumnFamilyType fromCfName(String cfName) {
         }
     }
 
+    public static String stringName(byte[] cfName) {

Review Comment:
   Missing javadoc. Also, I think `toStringName` would be a more appropriate name.
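For the naming and javadoc suggestion above, a minimal sketch of the helper, assuming its body is the `new String(cfName, UTF_8)` conversion it replaces in `createCfOptions`; the enclosing class `ColumnFamilyNames` is hypothetical (the PR places the method in `ColumnFamilyUtils`):

```java
import static java.nio.charset.StandardCharsets.UTF_8;

public final class ColumnFamilyNames {
    private ColumnFamilyNames() {
        // Utility class, no instances.
    }

    /**
     * Converts a column family name, as stored by RocksDB, into a {@link String}.
     *
     * @param cfName Column family name bytes, assumed to be UTF-8 encoded.
     * @return Decoded column family name.
     */
    public static String toStringName(byte[] cfName) {
        return new String(cfName, UTF_8);
    }
}
```

With a static import of the helper, the call site would read `String utf8cfName = toStringName(cfName);`.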



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstanceCreator.java:
##########
@@ -189,7 +188,7 @@ private List<ColumnFamilyDescriptor> getExistingCfDescriptors(Path path) throws
 
     @SuppressWarnings("resource")
     private ColumnFamilyOptions createCfOptions(byte[] cfName, Path path) {
-        var utf8cfName = new String(cfName, UTF_8);
+        String utf8cfName = ColumnFamilyUtils.stringName(cfName);

Review Comment:
   Use static import


