Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2021/09/29 10:47:02 UTC

[GitHub] [ignite-3] sashapolo commented on a change in pull request #365: IGNITE-15351 Implemented concepts of storage engines and data regions with basic integration into existing code.

sashapolo commented on a change in pull request #365:
URL: https://github.com/apache/ignite-3/pull/365#discussion_r717711965



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
##########
@@ -467,27 +470,36 @@ public static void shutdownAndAwaitTermination(ExecutorService service, long tim
      * thrown exception will be propagated to the caller, after all other objects are closed, similar to
      * the try-with-resources block.
      *
-     * @param closeables Collection of objects to close.
+     * @param closeables Stream of objects to close.
      * @throws Exception If failed to close.
      */
-    public static void closeAll(Collection<? extends AutoCloseable> closeables) throws Exception {
-        Exception ex = null;
+    public static void closeAll(Stream<? extends AutoCloseable> closeables) throws Exception {

Review comment:
       wow, are these changes really necessary?

##########
File path: modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITTablePersistenceTest.java
##########
@@ -125,14 +143,53 @@
 
     /** {@inheritDoc} */
     @Override public Path getListenerPersistencePath(PartitionListener listener) {
-        return ((RocksDbStorage) listener.getStorage()).getDbPath();
+        return paths.get(listener);
     }
 
     /** {@inheritDoc} */
     @Override public RaftGroupListener createListener(Path workDir) {
-        return new PartitionListener(
-            new RocksDbStorage(workDir.resolve(UUID.randomUUID().toString()), ByteBuffer::compareTo)
-        );
+        Path tableDir = workDir.resolve(UUID.randomUUID().toString());
+
+        PartitionListener listener = new PartitionListener(new ConcurrentHashMapStorage() {
+            /** {@inheritDoc} */
+            @Override public @NotNull CompletableFuture<Void> snapshot(Path snapshotPath) {
+                return CompletableFuture.runAsync(() -> {
+                    try (
+                        OutputStream out = new FileOutputStream(snapshotPath.resolve("snapshot_file").toFile());

Review comment:
       you should use `Files.newOutputStream(snapshotPath.resolve("snapshot_file"))` instead
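
A minimal sketch of what this suggestion could look like, assuming the snapshot is a single serialized list as in the test above. The class and method names (`SnapshotFileExample`, `write`, `read`) are hypothetical and not part of the PR; only `Files.newOutputStream` and `Files.newInputStream` come from the JDK.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper, for illustration only. */
class SnapshotFileExample {
    /** Serializes the values into "snapshot_file" inside the given directory. */
    static void write(Path dir, List<byte[]> values) throws IOException {
        try (
            // Works on the Path directly, so no Path -> File conversion is needed.
            OutputStream out = Files.newOutputStream(dir.resolve("snapshot_file"));
            ObjectOutputStream objOut = new ObjectOutputStream(out)
        ) {
            objOut.writeObject(new ArrayList<>(values));
        }
    }

    /** Reads the list back from "snapshot_file" inside the given directory. */
    @SuppressWarnings("unchecked")
    static List<byte[]> read(Path dir) throws IOException, ClassNotFoundException {
        try (
            InputStream in = Files.newInputStream(dir.resolve("snapshot_file"));
            ObjectInputStream objIn = new ObjectInputStream(in)
        ) {
            return (List<byte[]>) objIn.readObject();
        }
    }
}
```

With no open options, `Files.newOutputStream` creates the file if it is missing and truncates it otherwise, which matches what a fresh snapshot needs.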

##########
File path: modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITTablePersistenceTest.java
##########
@@ -125,14 +143,53 @@
 
     /** {@inheritDoc} */
     @Override public Path getListenerPersistencePath(PartitionListener listener) {
-        return ((RocksDbStorage) listener.getStorage()).getDbPath();
+        return paths.get(listener);
     }
 
     /** {@inheritDoc} */
     @Override public RaftGroupListener createListener(Path workDir) {
-        return new PartitionListener(
-            new RocksDbStorage(workDir.resolve(UUID.randomUUID().toString()), ByteBuffer::compareTo)
-        );
+        Path tableDir = workDir.resolve(UUID.randomUUID().toString());
+
+        PartitionListener listener = new PartitionListener(new ConcurrentHashMapStorage() {
+            /** {@inheritDoc} */
+            @Override public @NotNull CompletableFuture<Void> snapshot(Path snapshotPath) {
+                return CompletableFuture.runAsync(() -> {
+                    try (
+                        OutputStream out = new FileOutputStream(snapshotPath.resolve("snapshot_file").toFile());
+                        ObjectOutputStream objOut = new ObjectOutputStream(out)
+                    ) {
+                        objOut.writeObject(map.keySet().stream().map(ByteArray::bytes).collect(toList()));
+                        objOut.writeObject(map.values().stream().collect(toList()));

Review comment:
       can be replaced with `new ArrayList<>(map.values())`
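
For illustration, a small sketch comparing the two forms. The class and method names are hypothetical; both methods return an independent list copy of the map values.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical comparison of the two ways to copy map values into a list. */
class ValuesCopyExample {
    /** The form used in the diff: goes through a stream pipeline. */
    static <K, V> List<V> viaStream(Map<K, V> map) {
        return map.values().stream().collect(Collectors.toList());
    }

    /** The suggested form: the copy constructor does the same job directly. */
    static <K, V> List<V> viaConstructor(Map<K, V> map) {
        return new ArrayList<>(map.values());
    }
}
```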

##########
File path: modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITTablePersistenceTest.java
##########
@@ -63,6 +78,9 @@
     /** */
     private static final Row SECOND_VALUE = createKeyValueRow(1, 1);
 
+    /** Paths for created partition listeners. */
+    private Map<PartitionListener, Path> paths = new ConcurrentHashMap<>();

Review comment:
       ```suggestion
       private final Map<PartitionListener, Path> paths = new ConcurrentHashMap<>();
   ```

##########
File path: modules/api/src/main/java/org/apache/ignite/configuration/schemas/store/DataRegionConfigurationSchema.java
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.configuration.schemas.store;
+
+import org.apache.ignite.configuration.annotation.Config;
+import org.apache.ignite.configuration.annotation.Value;
+import org.apache.ignite.configuration.validation.Min;
+import org.apache.ignite.configuration.validation.OneOf;
+
+/**
+ * Configuration schema for data region. Currently it represents configuration for rocksdb storage engine only.
+ */
+@Config
+public class DataRegionConfigurationSchema {
+    /** Type for the future polymorphic configuration schemas. */
+    @OneOf("rocksdb")
+    @Value(hasDefault = true)
+    public String type = "rocksdb";
+
+    /** Size of the rocksdb offheap cache. */
+    @Value(hasDefault = true)
+    public int size = 64 * 1024 * 1024;
+
+    /** Size of rocksdb write buffer. */
+    @Value(hasDefault = true)
+    @Min(1)
+    public long writeBufferSize = 64 * 1024 * 1024;
+
+    /** Cache type - either {@code Clock} or {@code LRU}. */
+    @OneOf({"Clock", "LRU"})

Review comment:
       RocksDB documentation states that the Clock implementation has bugs and is not production-ready; maybe we shouldn't include it.

##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
##########
@@ -122,27 +133,39 @@
     /** Tables. */
     private final Map<IgniteUuid, TableImpl> tablesById = new ConcurrentHashMap<>();
 
+    /** Default data region instance. */
+    private DataRegion defaultDataRegion;
+
+    //TODO: IGNITE-15161 These should go into TableImpl instances.
+    /** Instances of table storages that need to be stop on component stop. */
+    private final Set<TableStorage> tableStorages = ConcurrentHashMap.newKeySet();

Review comment:
       wow, I didn't know about the `newKeySet` method, that's awesome
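
For readers who have not used it either: `ConcurrentHashMap.newKeySet()` (available since Java 8) returns a thread-safe `Set` backed by a `ConcurrentHashMap`. A minimal sketch, with a hypothetical class name and an arbitrary workload:

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical demo of a concurrent set shared between threads. */
class NewKeySetExample {
    /** Every thread adds the same range of integers; returns the resulting set size. */
    static int countDistinct(int threads, int perThread) throws InterruptedException {
        Set<Integer> set = ConcurrentHashMap.newKeySet();

        Thread[] workers = new Thread[threads];

        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++)
                    set.add(i); // Safe without external synchronization.
            });

            workers[t].start();
        }

        for (Thread worker : workers)
            worker.join();

        return set.size();
    }
}
```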

##########
File path: modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbDataRegion.java
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb;
+
+import java.util.Locale;
+import org.apache.ignite.configuration.schemas.store.DataRegionConfiguration;
+import org.apache.ignite.configuration.schemas.store.DataRegionView;
+import org.apache.ignite.internal.storage.engine.DataRegion;
+import org.rocksdb.Cache;
+import org.rocksdb.ClockCache;
+import org.rocksdb.LRUCache;
+import org.rocksdb.RocksDB;
+import org.rocksdb.WriteBufferManager;
+
+/**
+ * Data region implementation for {@link RocksDbStorageEngine}. Based on a {@link Cache}.
+ */
+public class RocksDbDataRegion implements DataRegion {
+    static {
+        RocksDB.loadLibrary();
+    }
+
+    /** Region configuration. */
+    private final DataRegionConfiguration cfg;
+
+    /** RocksDB cache instance. */
+    private Cache cache;
+
+    /** Write buffer manager instance. */
+    private WriteBufferManager writeBufferManager;
+
+    /**
+     * Constructor.
+     *
+     * @param cfg Data region configuration.
+     */
+    public RocksDbDataRegion(DataRegionConfiguration cfg) {
+        this.cfg = cfg;
+
+        assert "rocksdb".equalsIgnoreCase(cfg.type().value());
+    }
+
+    /** {@inheritDoc} */
+    @Override public void start() {
+        DataRegionView dataRegionView = cfg.value();
+
+        long writeBufferSize = dataRegionView.writeBufferSize();
+
+        long totalCacheSize = dataRegionView.size() + writeBufferSize;
+
+        switch (dataRegionView.cache().toLowerCase(Locale.ROOT)) {
+            case "clock":
+                cache = new ClockCache(totalCacheSize, dataRegionView.numShardBits(), false);
+
+                break;
+
+            case "lru":
+                cache = new LRUCache(totalCacheSize, dataRegionView.numShardBits(), false);
+
+                break;
+
+            default:
+                assert false : dataRegionView.cache();
+        }
+
+        writeBufferManager = new WriteBufferManager(writeBufferSize, cache);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void stop() {
+        if (writeBufferManager != null)

Review comment:
       please use `closeAll` instead

##########
File path: modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITTablePersistenceTest.java
##########
@@ -125,14 +143,53 @@
 
     /** {@inheritDoc} */
     @Override public Path getListenerPersistencePath(PartitionListener listener) {
-        return ((RocksDbStorage) listener.getStorage()).getDbPath();
+        return paths.get(listener);
     }
 
     /** {@inheritDoc} */
     @Override public RaftGroupListener createListener(Path workDir) {
-        return new PartitionListener(
-            new RocksDbStorage(workDir.resolve(UUID.randomUUID().toString()), ByteBuffer::compareTo)
-        );
+        Path tableDir = workDir.resolve(UUID.randomUUID().toString());
+
+        PartitionListener listener = new PartitionListener(new ConcurrentHashMapStorage() {
+            /** {@inheritDoc} */
+            @Override public @NotNull CompletableFuture<Void> snapshot(Path snapshotPath) {
+                return CompletableFuture.runAsync(() -> {
+                    try (
+                        OutputStream out = new FileOutputStream(snapshotPath.resolve("snapshot_file").toFile());
+                        ObjectOutputStream objOut = new ObjectOutputStream(out)
+                    ) {
+                        objOut.writeObject(map.keySet().stream().map(ByteArray::bytes).collect(toList()));
+                        objOut.writeObject(map.values().stream().collect(toList()));
+                    }
+                    catch (Exception e) {
+                        throw new IgniteInternalException(e);
+                    }
+                });
+            }
+
+            /** {@inheritDoc} */
+            @Override public void restoreSnapshot(Path snapshotPath) {
+                try (
+                    InputStream in = new FileInputStream(snapshotPath.resolve("snapshot_file").toFile());
+                    ObjectInputStream objIn = new ObjectInputStream(in)
+                ) {
+                    var keys = (List<byte[]>)objIn.readObject();
+                    var values = (List<byte[]>)objIn.readObject();
+
+                    map.clear();
+
+                    for (int i = 0; i < keys.size(); i++)
+                        map.put(new ByteArray(keys.get(i)), values.get(i));
+                }
+                catch (Exception e) {
+                    throw new IgniteInternalException(e);
+                }
+            }
+        });
+
+        paths.put(listener, tableDir);

Review comment:
       I don't think this logic is needed: `tableDir` is not used anywhere, why create it at all?

##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
##########
@@ -122,27 +133,39 @@
     /** Tables. */
     private final Map<IgniteUuid, TableImpl> tablesById = new ConcurrentHashMap<>();
 
+    /** Default data region instance. */
+    private DataRegion defaultDataRegion;
+
+    //TODO: IGNITE-15161 These should go into TableImpl instances.
+    /** Instances of table storages that need to be stop on component stop. */

Review comment:
       ```suggestion
       /** Instances of table storages that need to be stopped on component stop. */
   ```

##########
File path: modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageTest.java
##########
@@ -17,30 +17,74 @@
 
 package org.apache.ignite.internal.storage.rocksdb;
 
-import java.nio.ByteBuffer;
 import java.nio.file.Path;
+import org.apache.ignite.configuration.schemas.store.DataRegionConfiguration;
+import org.apache.ignite.configuration.schemas.table.TableConfiguration;
+import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
 import org.apache.ignite.internal.storage.AbstractStorageTest;
+import org.apache.ignite.internal.storage.engine.DataRegion;
+import org.apache.ignite.internal.storage.engine.TableStorage;
 import org.apache.ignite.internal.testframework.WorkDirectory;
 import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.extension.ExtendWith;
 
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
 /**
  * Storage test implementation for {@link RocksDbStorage}.
  */
 @ExtendWith(WorkDirectoryExtension.class)
+@ExtendWith(ConfigurationExtension.class)
 public class RocksDbStorageTest extends AbstractStorageTest {
+    /** */
+    private TableStorage table;
+
+    /** */
+    private DataRegion dataRegion;
+
     /** */
     @BeforeEach
-    public void setUp(@WorkDirectory Path workDir) {
-        storage = new RocksDbStorage(workDir, ByteBuffer::compareTo);
+    public void setUp(
+        @WorkDirectory Path workDir,
+        @InjectConfiguration DataRegionConfiguration dataRegionCfg,
+        @InjectConfiguration TableConfiguration tableCfg
+    ) throws Exception {
+        dataRegionCfg.change(cfg -> cfg.changeSize(16 * 1024).changeWriteBufferSize(16 * 1024)).get();
+
+        RocksDbStorageEngine engine = new RocksDbStorageEngine();
+
+        dataRegion = engine.createDataRegion(dataRegionCfg);
+
+        assertThat(dataRegion, is(instanceOf(RocksDbDataRegion.class)));
+
+        dataRegion.start();
+
+        table = engine.createTable(workDir, tableCfg, dataRegion, (tableView, indexName) -> null);
+
+        assertThat(table, is(instanceOf(RocksDbTableStorage.class)));
+
+        table.start();
+
+        storage = table.getOrCreatePartition(0);
+
+        assertThat(storage, is(instanceOf(RocksDbStorage.class)));
     }
 
     /** */
     @AfterEach
     public void tearDown() throws Exception {
-        if (storage != null)
-            ((AutoCloseable)storage).close();
+        try {

Review comment:
       you can also write:
   ```
   IgniteUtils.closeAll(
       table == null ? null : table::stop,
       dataRegion == null ? null : dataRegion::stop
   );
   ```
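
A self-contained sketch of how such a call could behave. The `closeAll` below is a local stand-in written for this example, not the actual `IgniteUtils.closeAll`, and it assumes a varargs overload that skips `null` entries; `Component` is a hypothetical placeholder for `TableStorage` and `DataRegion`.

```java
/** Hypothetical illustration of the null-tolerant tear-down pattern. */
class TearDownExample {
    /** Stand-in for a component that exposes stop(). */
    interface Component {
        void stop() throws Exception;
    }

    /**
     * Local stand-in helper: closes every non-null element and rethrows the first failure,
     * attaching later failures as suppressed, similar to try-with-resources.
     */
    static void closeAll(AutoCloseable... closeables) throws Exception {
        Exception ex = null;

        for (AutoCloseable closeable : closeables) {
            if (closeable == null)
                continue;

            try {
                closeable.close();
            }
            catch (Exception e) {
                if (ex == null)
                    ex = e;
                else
                    ex.addSuppressed(e);
            }
        }

        if (ex != null)
            throw ex;
    }

    /** Method references adapt stop() to AutoCloseable; null components are passed as null. */
    static void tearDown(Component table, Component dataRegion) throws Exception {
        closeAll(
            table == null ? null : table::stop,
            dataRegion == null ? null : dataRegion::stop
        );
    }
}
```

The point of the pattern is that a failure while stopping the table does not prevent the data region from being stopped.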

##########
File path: modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/TableStorage.java
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.engine;
+
+import org.apache.ignite.internal.storage.Storage;
+import org.apache.ignite.internal.storage.StorageException;
+
+/**
+ * Table storage that contains meta, partitions and SQL indexes.
+ */
+public interface TableStorage {
+    /**
+     * Gets or creates a partition for current table.
+     *
+     * @param partId Partition id.
+     * @return Partition storage.
+     */
+    Storage getOrCreatePartition(int partId);

Review comment:
       Shall we rename `Storage` to `PartitionStorage`?

##########
File path: modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
##########
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.BiFunction;
+import org.apache.ignite.configuration.NamedListView;
+import org.apache.ignite.configuration.schemas.table.TableConfiguration;
+import org.apache.ignite.configuration.schemas.table.TableIndexView;
+import org.apache.ignite.configuration.schemas.table.TableView;
+import org.apache.ignite.internal.rocksdb.ColumnFamily;
+import org.apache.ignite.internal.storage.Storage;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.engine.TableStorage;
+import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.jetbrains.annotations.NotNull;
+import org.rocksdb.AbstractComparator;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.ComparatorOptions;
+import org.rocksdb.DBOptions;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+
+import static java.lang.Integer.parseInt;
+import static java.util.stream.Collectors.groupingBy;
+import static java.util.stream.Stream.concat;
+
+/**
+ * Table storage implementation based on {@link RocksDB} instance.
+ */
+public class RocksDbTableStorage implements TableStorage {
+    static {
+        RocksDB.loadLibrary();
+    }
+
+    /**
+     * Name of the meta column family matches default columns family, meaning that it always exist when new table is
+     * created.
+     */
+    private static final String CF_META = "default";
+
+    /** Prefix for partitions column families names. */
+    private static final String CF_PARTITION_PREFIX = "cf-part:";
+
+    /** Prefix for SQL indexes column family names. */
+    private static final String CF_INDEX_PREFIX = "cf-idx:";
+
+    /** Path for the directory that stores table data. */
+    private final Path tablePath;
+
+    /** Table configuration. */
+    private final TableConfiguration tableCfg;
+
+    /** Data region for the table. */
+    private final RocksDbDataRegion dataRegion;
+
+    /** Comparators factory for indexes. */
+    private final BiFunction<TableView, String, Comparator<ByteBuffer>> indexComparatorFactory;
+
+    /** List of closeable resources to close on {@link #stop()}. Better than having a field for each one of them. */
+    private final List<AutoCloseable> autoCloseables = new ArrayList<>();
+
+    /** Rocks DB instance itself. */
+    private RocksDB db;
+
+    /** CF handle for meta information. */
+    @SuppressWarnings("unused")
+    private ColumnFamilyHandle metaCfHandle;
+
+    /** Column families for partitions. Stored as a list for the quick access by an index. */
+    private List<ColumnFamily> partitionCfs;
+
+    /** Column families for indexes by their names. */
+    private Map<String, ColumnFamilyHandle> indicesCfHandles = new ConcurrentHashMap<>();
+
+    /** Utility enum to describe a type of the column family - meta, partition or index. */
+    private enum CFType {
+        META, PARTITION, INDEX
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param tablePath Path for the directory that stores table data.
+     * @param tableCfg Table configuration.
+     * @param dataRegion Data region for the table.
+     * @param indexComparatorFactory Comparators factory for indexes.
+     */
+    public RocksDbTableStorage(
+        Path tablePath,
+        TableConfiguration tableCfg,
+        RocksDbDataRegion dataRegion,
+        BiFunction<TableView, String, Comparator<ByteBuffer>> indexComparatorFactory
+    ) {
+        this.tablePath = tablePath;
+        this.tableCfg = tableCfg;
+        this.dataRegion = dataRegion;
+        this.indexComparatorFactory = indexComparatorFactory;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void start() throws StorageException {
+        DBOptions dbOptions = addToCloseableResources(new DBOptions()
+            .setCreateIfMissing(true)
+            .setWriteBufferManager(dataRegion.writeBufferManager())
+        );
+
+        String absolutePathStr = tablePath.toAbsolutePath().toString();
+
+        List<String> cfNames = listCfNames(absolutePathStr);
+
+        Map<CFType, List<String>> cfGrouped = cfNames.stream().collect(groupingBy(this::cfType));
+
+        List<ColumnFamilyDescriptor> cfDescriptors = new ArrayList<>();
+
+        cfDescriptors.add(new ColumnFamilyDescriptor(
+            CF_META.getBytes(StandardCharsets.UTF_8),
+            addToCloseableResources(new ColumnFamilyOptions(addToCloseableResources(new Options().setCreateIfMissing(true))))
+        ));
+
+        for (String partitionCfName : cfGrouped.getOrDefault(CFType.PARTITION, List.of()))
+            cfDescriptors.add(new ColumnFamilyDescriptor(
+                partitionCfName.getBytes(StandardCharsets.UTF_8),
+                new ColumnFamilyOptions()
+            ));
+
+        TableView tableCfgView = tableCfg.value();
+
+        NamedListView<? extends TableIndexView> indicesCfgView = tableCfgView.indices();
+
+        for (String indexCfName : cfGrouped.getOrDefault(CFType.INDEX, List.of())) {
+            String indexName = indexCfName.substring(CF_INDEX_PREFIX.length());
+
+            TableIndexView indexCfgView = indicesCfgView.get(indexName);
+
+            assert indexCfgView != null : indexCfName; //TODO Bullshit!
+
+            Comparator<ByteBuffer> indexComparator = indexComparatorFactory.apply(tableCfgView, indexName);
+
+            cfDescriptors.add(new ColumnFamilyDescriptor(
+                indexCfName.getBytes(StandardCharsets.UTF_8),
+                new ColumnFamilyOptions()
+                    .setComparator(addToCloseableResources(
+                        new AbstractComparator(addToCloseableResources(new ComparatorOptions())) {
+                            /** {@inheritDoc} */
+                            @Override public String name() {
+                                return "index-comparator";
+                            }
+
+                            /** {@inheritDoc} */
+                            @Override public int compare(ByteBuffer a, ByteBuffer b) {
+                                return indexComparator.compare(a, b);
+                            }
+                        }))
+            ));
+        }
+
+        List<ColumnFamilyHandle> cfHandles = new ArrayList<>();
+
+        try {
+            db = addToCloseableResources(RocksDB.open(dbOptions, absolutePathStr, cfDescriptors, cfHandles));
+        }
+        catch (RocksDBException e) {
+            throw new StorageException("Failed to initialize RocksDB instance.", e);
+        }
+
+        partitionCfs = new ArrayList<>(tableCfgView.partitions());
+
+        partitionCfs.addAll(Collections.nCopies(tableCfgView.partitions(), null));
+
+        for (int cfListIndex = 0; cfListIndex < cfHandles.size(); cfListIndex++) {
+            ColumnFamilyHandle cfHandle = cfHandles.get(cfListIndex);
+
+            String handleName;
+            try {
+                handleName = new String(cfHandle.getName(), StandardCharsets.UTF_8);
+            }
+            catch (RocksDBException e) {
+                throw new StorageException("Failed to read RocksDB column family name.", e);
+            }
+
+            if (handleName.equals(CF_META))
+                this.metaCfHandle = addToCloseableResources(cfHandle);
+            else if (handleName.startsWith(CF_PARTITION_PREFIX)) {
+                int partId = partitionId(handleName);
+
+                ColumnFamilyDescriptor cfDescriptor = cfDescriptors.get(cfListIndex);
+
+                partitionCfs.set(partId, new ColumnFamily(db, cfHandle, handleName, cfDescriptor.getOptions(), null));
+            }
+            else {
+                String indexName = handleName.substring(CF_INDEX_PREFIX.length());
+
+                indicesCfHandles.put(indexName, cfHandle);
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void stop() throws StorageException {
+        try {
+            List<AutoCloseable> copy = new ArrayList<>(autoCloseables);
+
+            Collections.reverse(copy);
+
+            IgniteUtils.closeAll(concat(
+                concat(partitionCfs.stream(), indicesCfHandles.values().stream()),
+                copy.stream()
+            ));
+        }
+        catch (Exception e) {
+            throw new StorageException("Failed to stop RocksDB table storage.", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public Storage getOrCreatePartition(int partId) {
+        assert partId < partitionCfs.size() : S.toString(
+            "Attempt to create partition with id outside of configured range",
+            "partitionId", partId, false,
+            "partitions", partitionCfs.size(), false
+        );
+
+        ColumnFamily partitionCf = partitionCfs.get(partId);
+
+        if (partitionCf == null) {
+            String handleName = partitionCfName(partId);
+
+            ColumnFamilyDescriptor cfDescriptor = new ColumnFamilyDescriptor(
+                handleName.getBytes(StandardCharsets.UTF_8),
+                new ColumnFamilyOptions()
+            );
+
+            try {
+                ColumnFamilyHandle cfHandle = db.createColumnFamily(cfDescriptor);
+
+                partitionCf = new ColumnFamily(db, cfHandle, handleName, cfDescriptor.getOptions(), null);
+            }
+            catch (RocksDBException e) {
+                cfDescriptor.getOptions().close();
+
+                throw new StorageException("Feiled to create new ROcksDB column family " + handleName, e);
+            }
+
+            partitionCfs.set(partId, partitionCf);
+        }
+
+        return new RocksDbStorage(db, partitionCf);
+    }
+
+    /**
+     * Returns list of column families names that belong to RocksDB instance in the given path.
+     *
+     * @param path DB path.
+     * @return List of column families names.
+     * @throws StorageException If something went wrong.
+     */
+    @NotNull private List<String> listCfNames(String path) throws StorageException {
+        List<String> cfNames = new ArrayList<>();
+
+        try (Options opts = new Options()) {
+            List<byte[]> cfNamesBytes = RocksDB.listColumnFamilies(opts, path);
+
+            for (byte[] cfNameBytes : cfNamesBytes)
+                cfNames.add(new String(cfNameBytes, StandardCharsets.UTF_8));
+        }
+        catch (RocksDBException e) {
+            throw new StorageException(
+                "Failed to read list of columnfamilies names for the RocksDB instance located at path " + path,
+                e
+            );
+        }
+
+        return cfNames;
+    }
+
+    /**
+     * Creates column family name by partition id.
+     *
+     * @param partId Partition id.
+     * @return Column family name.
+     */
+    private static String partitionCfName(int partId) {
+        return CF_PARTITION_PREFIX + partId;
+    }
+
+    /**
+     * Gets partition id from column family name.
+     *
+     * @param cfName Column family name.
+     * @return Partition id.
+     */
+    private int partitionId(String cfName) {

Review comment:
       ```suggestion
       private static int partitionId(String cfName) {
   ```
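
A sketch of why `static` fits here, assuming the method only strips the prefix and parses the rest. The method body is not shown in this hunk, so the implementation below is an assumption; the class name is hypothetical.

```java
/** Hypothetical holder for the column family naming helpers. */
class PartitionCfNames {
    /** Prefix for partition column family names, as in the diff above. */
    private static final String CF_PARTITION_PREFIX = "cf-part:";

    /** Creates a column family name by partition id. */
    static String partitionCfName(int partId) {
        return CF_PARTITION_PREFIX + partId;
    }

    /** Assumed implementation: uses no instance state, so it can be static. */
    static int partitionId(String cfName) {
        return Integer.parseInt(cfName.substring(CF_PARTITION_PREFIX.length()));
    }
}
```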

##########
File path: modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
##########
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.BiFunction;
+import org.apache.ignite.configuration.NamedListView;
+import org.apache.ignite.configuration.schemas.table.TableConfiguration;
+import org.apache.ignite.configuration.schemas.table.TableIndexView;
+import org.apache.ignite.configuration.schemas.table.TableView;
+import org.apache.ignite.internal.rocksdb.ColumnFamily;
+import org.apache.ignite.internal.storage.Storage;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.engine.TableStorage;
+import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.jetbrains.annotations.NotNull;
+import org.rocksdb.AbstractComparator;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.ComparatorOptions;
+import org.rocksdb.DBOptions;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+
+import static java.lang.Integer.parseInt;
+import static java.util.stream.Collectors.groupingBy;
+import static java.util.stream.Stream.concat;
+
+/**
+ * Table storage implementation based on {@link RocksDB} instance.
+ */
+public class RocksDbTableStorage implements TableStorage {
+    static {
+        RocksDB.loadLibrary();
+    }
+
+    /**
+     * Name of the meta column family matches default columns family, meaning that it always exist when new table is
+     * created.
+     */
+    private static final String CF_META = "default";
+
+    /** Prefix for partitions column families names. */
+    private static final String CF_PARTITION_PREFIX = "cf-part:";
+
+    /** Prefix for SQL indexes column family names. */
+    private static final String CF_INDEX_PREFIX = "cf-idx:";
+
+    /** Path for the directory that stores table data. */
+    private final Path tablePath;
+
+    /** Table configuration. */
+    private final TableConfiguration tableCfg;
+
+    /** Data region for the table. */
+    private final RocksDbDataRegion dataRegion;
+
+    /** Comparators factory for indexes. */
+    private final BiFunction<TableView, String, Comparator<ByteBuffer>> indexComparatorFactory;
+
+    /** List of closeable resources to close on {@link #stop()}. Better than having a field for each one of them. */
+    private final List<AutoCloseable> autoCloseables = new ArrayList<>();
+
+    /** Rocks DB instance itself. */
+    private RocksDB db;
+
+    /** CF handle for meta information. */
+    @SuppressWarnings("unused")
+    private ColumnFamilyHandle metaCfHandle;
+
+    /** Column families for partitions. Stored as a list for the quick access by an index. */
+    private List<ColumnFamily> partitionCfs;
+
+    /** Column families for indexes by their names. */
+    private Map<String, ColumnFamilyHandle> indicesCfHandles = new ConcurrentHashMap<>();
+
+    /** Utility enum to describe a type of the column family - meta, partition or index. */
+    private enum CFType {
+        META, PARTITION, INDEX
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param tablePath Path for the directory that stores table data.
+     * @param tableCfg Table configuration.
+     * @param dataRegion Data region for the table.
+     * @param indexComparatorFactory Comparators factory for indexes.
+     */
+    public RocksDbTableStorage(
+        Path tablePath,
+        TableConfiguration tableCfg,
+        RocksDbDataRegion dataRegion,
+        BiFunction<TableView, String, Comparator<ByteBuffer>> indexComparatorFactory
+    ) {
+        this.tablePath = tablePath;
+        this.tableCfg = tableCfg;
+        this.dataRegion = dataRegion;
+        this.indexComparatorFactory = indexComparatorFactory;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void start() throws StorageException {
+        DBOptions dbOptions = addToCloseableResources(new DBOptions()
+            .setCreateIfMissing(true)
+            .setWriteBufferManager(dataRegion.writeBufferManager())
+        );
+
+        String absolutePathStr = tablePath.toAbsolutePath().toString();
+
+        List<String> cfNames = listCfNames(absolutePathStr);
+
+        Map<CFType, List<String>> cfGrouped = cfNames.stream().collect(groupingBy(this::cfType));
+
+        List<ColumnFamilyDescriptor> cfDescriptors = new ArrayList<>();
+
+        cfDescriptors.add(new ColumnFamilyDescriptor(
+            CF_META.getBytes(StandardCharsets.UTF_8),
+            addToCloseableResources(new ColumnFamilyOptions(addToCloseableResources(new Options().setCreateIfMissing(true))))

Review comment:
       Please extract intermediate variables here; the nested `addToCloseableResources` calls are currently hard to read.
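
For illustration, the nested expression could be unpacked roughly as follows. This is only a sketch: `FakeOptions` and `FakeCfOptions` are hypothetical stand-ins for the RocksDB `Options` and `ColumnFamilyOptions` classes, so that the snippet compiles without the native library, and `addToCloseableResources` is assumed to register the resource and return it, as its usage in the diff suggests.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch only: the Fake* classes are hypothetical stand-ins for RocksDB option objects. */
class IntermediateVariablesSketch {
    /** Stand-in for org.rocksdb.Options. */
    static class FakeOptions implements AutoCloseable {
        boolean createIfMissing;

        FakeOptions setCreateIfMissing(boolean flag) {
            createIfMissing = flag;

            return this;
        }

        @Override public void close() {
            // Nothing to release in the stand-in.
        }
    }

    /** Stand-in for org.rocksdb.ColumnFamilyOptions. */
    static class FakeCfOptions implements AutoCloseable {
        final FakeOptions opts;

        FakeCfOptions(FakeOptions opts) {
            this.opts = opts;
        }

        @Override public void close() {
            // Nothing to release in the stand-in.
        }
    }

    /** Resources to close later, kept in registration order. */
    final List<AutoCloseable> autoCloseables = new ArrayList<>();

    /** Assumed behaviour: remember the resource and hand it back to the caller. */
    <R extends AutoCloseable> R addToCloseableResources(R resource) {
        autoCloseables.add(resource);

        return resource;
    }

    /** One named variable per step instead of a single nested expression. */
    FakeCfOptions metaCfOptions() {
        FakeOptions opts = addToCloseableResources(new FakeOptions().setCreateIfMissing(true));

        return addToCloseableResources(new FakeCfOptions(opts));
    }
}
```

Each resource is still registered exactly once and in the same order as in the one-liner, so the cleanup behaviour should not change.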

##########
File path: modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
##########
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.BiFunction;
+import org.apache.ignite.configuration.NamedListView;
+import org.apache.ignite.configuration.schemas.table.TableConfiguration;
+import org.apache.ignite.configuration.schemas.table.TableIndexView;
+import org.apache.ignite.configuration.schemas.table.TableView;
+import org.apache.ignite.internal.rocksdb.ColumnFamily;
+import org.apache.ignite.internal.storage.Storage;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.engine.TableStorage;
+import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.jetbrains.annotations.NotNull;
+import org.rocksdb.AbstractComparator;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.ComparatorOptions;
+import org.rocksdb.DBOptions;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+
+import static java.lang.Integer.parseInt;
+import static java.util.stream.Collectors.groupingBy;
+import static java.util.stream.Stream.concat;
+
+/**
+ * Table storage implementation based on {@link RocksDB} instance.
+ */
+public class RocksDbTableStorage implements TableStorage {
+    static {
+        RocksDB.loadLibrary();
+    }
+
+    /**
+     * Name of the meta column family matches default columns family, meaning that it always exist when new table is
+     * created.
+     */
+    private static final String CF_META = "default";
+
+    /** Prefix for partitions column families names. */
+    private static final String CF_PARTITION_PREFIX = "cf-part:";
+
+    /** Prefix for SQL indexes column family names. */
+    private static final String CF_INDEX_PREFIX = "cf-idx:";
+
+    /** Path for the directory that stores table data. */
+    private final Path tablePath;
+
+    /** Table configuration. */
+    private final TableConfiguration tableCfg;
+
+    /** Data region for the table. */
+    private final RocksDbDataRegion dataRegion;
+
+    /** Comparators factory for indexes. */
+    private final BiFunction<TableView, String, Comparator<ByteBuffer>> indexComparatorFactory;
+
+    /** List of closeable resources to close on {@link #stop()}. Better than having a field for each one of them. */
+    private final List<AutoCloseable> autoCloseables = new ArrayList<>();
+
+    /** Rocks DB instance itself. */
+    private RocksDB db;
+
+    /** CF handle for meta information. */
+    @SuppressWarnings("unused")
+    private ColumnFamilyHandle metaCfHandle;
+
+    /** Column families for partitions. Stored as a list for the quick access by an index. */
+    private List<ColumnFamily> partitionCfs;
+
+    /** Column families for indexes by their names. */
+    private Map<String, ColumnFamilyHandle> indicesCfHandles = new ConcurrentHashMap<>();
+
+    /** Utility enum to describe a type of the column family - meta, partition or index. */
+    private enum CFType {
+        META, PARTITION, INDEX
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param tablePath Path for the directory that stores table data.
+     * @param tableCfg Table configuration.
+     * @param dataRegion Data region for the table.
+     * @param indexComparatorFactory Comparators factory for indexes.
+     */
+    public RocksDbTableStorage(
+        Path tablePath,
+        TableConfiguration tableCfg,
+        RocksDbDataRegion dataRegion,
+        BiFunction<TableView, String, Comparator<ByteBuffer>> indexComparatorFactory
+    ) {
+        this.tablePath = tablePath;
+        this.tableCfg = tableCfg;
+        this.dataRegion = dataRegion;
+        this.indexComparatorFactory = indexComparatorFactory;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void start() throws StorageException {

Review comment:
       Can you split this method into smaller ones? For example, you can introduce a method called `getExistingCfDescriptors`
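
As a rough sketch of one such extraction, the name-grouping step at the top of `start()` can live in its own small helper. The constants and the `CFType` enum are copied from the diff; the body of `cfType` is an assumption, because the real method is not shown in the quoted hunk, and `groupByType` is a hypothetical helper name.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** Sketch only: mirrors the name-grouping step of start() without touching RocksDB. */
class CfGroupingSketch {
    static final String CF_META = "default";
    static final String CF_PARTITION_PREFIX = "cf-part:";
    static final String CF_INDEX_PREFIX = "cf-idx:";

    enum CFType { META, PARTITION, INDEX }

    /** Assumed classification rule; the real cfType method is not part of the quoted diff. */
    static CFType cfType(String cfName) {
        if (CF_META.equals(cfName))
            return CFType.META;

        if (cfName.startsWith(CF_PARTITION_PREFIX))
            return CFType.PARTITION;

        if (cfName.startsWith(CF_INDEX_PREFIX))
            return CFType.INDEX;

        throw new IllegalArgumentException("Unknown column family: " + cfName);
    }

    /** Hypothetical extracted helper: groups existing column family names by their type. */
    static Map<CFType, List<String>> groupByType(List<String> cfNames) {
        return cfNames.stream().collect(Collectors.groupingBy(CfGroupingSketch::cfType));
    }
}
```

A helper such as `getExistingCfDescriptors` could then consume the grouped names and build the descriptors, leaving `start()` to open the database and wire up the handles.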

##########
File path: modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
##########
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.BiFunction;
+import org.apache.ignite.configuration.NamedListView;
+import org.apache.ignite.configuration.schemas.table.TableConfiguration;
+import org.apache.ignite.configuration.schemas.table.TableIndexView;
+import org.apache.ignite.configuration.schemas.table.TableView;
+import org.apache.ignite.internal.rocksdb.ColumnFamily;
+import org.apache.ignite.internal.storage.Storage;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.engine.TableStorage;
+import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.jetbrains.annotations.NotNull;
+import org.rocksdb.AbstractComparator;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.ComparatorOptions;
+import org.rocksdb.DBOptions;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+
+import static java.lang.Integer.parseInt;
+import static java.util.stream.Collectors.groupingBy;
+import static java.util.stream.Stream.concat;
+
+/**
+ * Table storage implementation based on {@link RocksDB} instance.
+ */
+public class RocksDbTableStorage implements TableStorage {
+    static {
+        RocksDB.loadLibrary();
+    }
+
+    /**
+     * Name of the meta column family matches default columns family, meaning that it always exist when new table is
+     * created.
+     */
+    private static final String CF_META = "default";
+
+    /** Prefix for partitions column families names. */
+    private static final String CF_PARTITION_PREFIX = "cf-part:";
+
+    /** Prefix for SQL indexes column family names. */
+    private static final String CF_INDEX_PREFIX = "cf-idx:";
+
+    /** Path for the directory that stores table data. */
+    private final Path tablePath;
+
+    /** Table configuration. */
+    private final TableConfiguration tableCfg;
+
+    /** Data region for the table. */
+    private final RocksDbDataRegion dataRegion;
+
+    /** Comparators factory for indexes. */
+    private final BiFunction<TableView, String, Comparator<ByteBuffer>> indexComparatorFactory;
+
+    /** List of closeable resources to close on {@link #stop()}. Better than having a field for each one of them. */
+    private final List<AutoCloseable> autoCloseables = new ArrayList<>();
+
+    /** Rocks DB instance itself. */
+    private RocksDB db;
+
+    /** CF handle for meta information. */
+    @SuppressWarnings("unused")
+    private ColumnFamilyHandle metaCfHandle;
+
+    /** Column families for partitions. Stored as a list for the quick access by an index. */
+    private List<ColumnFamily> partitionCfs;
+
+    /** Column families for indexes by their names. */
+    private Map<String, ColumnFamilyHandle> indicesCfHandles = new ConcurrentHashMap<>();
+
+    /** Utility enum to describe a type of the column family - meta, partition or index. */
+    private enum CFType {
+        META, PARTITION, INDEX
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param tablePath Path for the directory that stores table data.
+     * @param tableCfg Table configuration.
+     * @param dataRegion Data region for the table.
+     * @param indexComparatorFactory Comparators factory for indexes.
+     */
+    public RocksDbTableStorage(
+        Path tablePath,
+        TableConfiguration tableCfg,
+        RocksDbDataRegion dataRegion,
+        BiFunction<TableView, String, Comparator<ByteBuffer>> indexComparatorFactory
+    ) {
+        this.tablePath = tablePath;
+        this.tableCfg = tableCfg;
+        this.dataRegion = dataRegion;
+        this.indexComparatorFactory = indexComparatorFactory;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void start() throws StorageException {
+        DBOptions dbOptions = addToCloseableResources(new DBOptions()
+            .setCreateIfMissing(true)
+            .setWriteBufferManager(dataRegion.writeBufferManager())
+        );
+
+        String absolutePathStr = tablePath.toAbsolutePath().toString();
+
+        List<String> cfNames = listCfNames(absolutePathStr);
+
+        Map<CFType, List<String>> cfGrouped = cfNames.stream().collect(groupingBy(this::cfType));
+
+        List<ColumnFamilyDescriptor> cfDescriptors = new ArrayList<>();
+
+        cfDescriptors.add(new ColumnFamilyDescriptor(
+            CF_META.getBytes(StandardCharsets.UTF_8),
+            addToCloseableResources(new ColumnFamilyOptions(addToCloseableResources(new Options().setCreateIfMissing(true))))
+        ));
+
+        for (String partitionCfName : cfGrouped.getOrDefault(CFType.PARTITION, List.of()))
+            cfDescriptors.add(new ColumnFamilyDescriptor(
+                partitionCfName.getBytes(StandardCharsets.UTF_8),
+                new ColumnFamilyOptions()
+            ));
+
+        TableView tableCfgView = tableCfg.value();
+
+        NamedListView<? extends TableIndexView> indicesCfgView = tableCfgView.indices();
+
+        for (String indexCfName : cfGrouped.getOrDefault(CFType.INDEX, List.of())) {
+            String indexName = indexCfName.substring(CF_INDEX_PREFIX.length());
+
+            TableIndexView indexCfgView = indicesCfgView.get(indexName);
+
+            assert indexCfgView != null : indexCfName; //TODO Bullshit!
+
+            Comparator<ByteBuffer> indexComparator = indexComparatorFactory.apply(tableCfgView, indexName);
+
+            cfDescriptors.add(new ColumnFamilyDescriptor(
+                indexCfName.getBytes(StandardCharsets.UTF_8),
+                new ColumnFamilyOptions()
+                    .setComparator(addToCloseableResources(
+                        new AbstractComparator(addToCloseableResources(new ComparatorOptions())) {
+                            /** {@inheritDoc} */
+                            @Override public String name() {
+                                return "index-comparator";
+                            }
+
+                            /** {@inheritDoc} */
+                            @Override public int compare(ByteBuffer a, ByteBuffer b) {
+                                return indexComparator.compare(a, b);
+                            }
+                        }))
+            ));
+        }
+
+        List<ColumnFamilyHandle> cfHandles = new ArrayList<>();
+
+        try {
+            db = addToCloseableResources(RocksDB.open(dbOptions, absolutePathStr, cfDescriptors, cfHandles));
+        }
+        catch (RocksDBException e) {
+            throw new StorageException("Failed to initialize RocksDB instance.", e);
+        }
+
+        partitionCfs = new ArrayList<>(tableCfgView.partitions());
+
+        partitionCfs.addAll(Collections.nCopies(tableCfgView.partitions(), null));
+
+        for (int cfListIndex = 0; cfListIndex < cfHandles.size(); cfListIndex++) {
+            ColumnFamilyHandle cfHandle = cfHandles.get(cfListIndex);
+
+            String handleName;
+            try {
+                handleName = new String(cfHandle.getName(), StandardCharsets.UTF_8);
+            }
+            catch (RocksDBException e) {
+                throw new StorageException("Failed to read RocksDB column family name.", e);
+            }
+
+            if (handleName.equals(CF_META))
+                this.metaCfHandle = addToCloseableResources(cfHandle);
+            else if (handleName.startsWith(CF_PARTITION_PREFIX)) {
+                int partId = partitionId(handleName);
+
+                ColumnFamilyDescriptor cfDescriptor = cfDescriptors.get(cfListIndex);
+
+                partitionCfs.set(partId, new ColumnFamily(db, cfHandle, handleName, cfDescriptor.getOptions(), null));
+            }
+            else {
+                String indexName = handleName.substring(CF_INDEX_PREFIX.length());
+
+                indicesCfHandles.put(indexName, cfHandle);
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void stop() throws StorageException {
+        try {
+            List<AutoCloseable> copy = new ArrayList<>(autoCloseables);
+
+            Collections.reverse(copy);
+
+            IgniteUtils.closeAll(concat(
+                concat(partitionCfs.stream(), indicesCfHandles.values().stream()),
+                copy.stream()
+            ));
+        }
+        catch (Exception e) {
+            throw new StorageException("Failed to stop RocksDB table storage.", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public Storage getOrCreatePartition(int partId) {
+        assert partId < partitionCfs.size() : S.toString(
+            "Attempt to create partition with id outside of configured range",
+            "partitionId", partId, false,
+            "partitions", partitionCfs.size(), false
+        );
+
+        ColumnFamily partitionCf = partitionCfs.get(partId);
+
+        if (partitionCf == null) {
+            String handleName = partitionCfName(partId);
+
+            ColumnFamilyDescriptor cfDescriptor = new ColumnFamilyDescriptor(
+                handleName.getBytes(StandardCharsets.UTF_8),
+                new ColumnFamilyOptions()
+            );
+
+            try {
+                ColumnFamilyHandle cfHandle = db.createColumnFamily(cfDescriptor);
+
+                partitionCf = new ColumnFamily(db, cfHandle, handleName, cfDescriptor.getOptions(), null);
+            }
+            catch (RocksDBException e) {
+                cfDescriptor.getOptions().close();
+
+                throw new StorageException("Feiled to create new ROcksDB column family " + handleName, e);
+            }
+
+            partitionCfs.set(partId, partitionCf);
+        }
+
+        return new RocksDbStorage(db, partitionCf);
+    }
+
+    /**
+     * Returns list of column families names that belong to RocksDB instance in the given path.
+     *
+     * @param path DB path.
+     * @return List of column families names.
+     * @throws StorageException If something went wrong.
+     */
+    @NotNull private List<String> listCfNames(String path) throws StorageException {
+        List<String> cfNames = new ArrayList<>();
+
+        try (Options opts = new Options()) {
+            List<byte[]> cfNamesBytes = RocksDB.listColumnFamilies(opts, path);
+
+            for (byte[] cfNameBytes : cfNamesBytes)
+                cfNames.add(new String(cfNameBytes, StandardCharsets.UTF_8));
+        }
+        catch (RocksDBException e) {
+            throw new StorageException(
+                "Failed to read list of columnfamilies names for the RocksDB instance located at path " + path,
+                e
+            );
+        }
+
+        return cfNames;
+    }
+
+    /**
+     * Creates column family name by partition id.
+     *
+     * @param partId Partition id.
+     * @return Column family name.
+     */
+    private static String partitionCfName(int partId) {
+        return CF_PARTITION_PREFIX + partId;
+    }
+
+    /**
+     * Gets partition id from column family name.
+     *
+     * @param cfName Column family name.
+     * @return Partition id.
+     */
+    private int partitionId(String cfName) {
+        return parseInt(cfName.substring(CF_PARTITION_PREFIX.length(), cfName.length()));

Review comment:
       ```suggestion
           return parseInt(cfName.substring(CF_PARTITION_PREFIX.length()));
   ```
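
A minimal, self-contained sketch of the two helpers with this suggestion applied. The prefix constant is copied from the diff, the wrapper class name is hypothetical, and both helpers are static because neither touches instance state.

```java
/** Sketch only: constant copied from the diff, wrapper class name is hypothetical. */
class PartitionCfNames {
    static final String CF_PARTITION_PREFIX = "cf-part:";

    /** Creates a column family name by partition id. */
    static String partitionCfName(int partId) {
        return CF_PARTITION_PREFIX + partId;
    }

    /** Gets the partition id back from a column family name. */
    static int partitionId(String cfName) {
        // substring(beginIndex) already runs to the end of the string,
        // so passing cfName.length() as the end index is redundant.
        return Integer.parseInt(cfName.substring(CF_PARTITION_PREFIX.length()));
    }
}
```

The two methods are inverses of each other for non-negative ids, which is easy to cover with a round-trip unit test.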

##########
File path: modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITTablePersistenceTest.java
##########
@@ -63,6 +78,9 @@
     /** */
     private static final Row SECOND_VALUE = createKeyValueRow(1, 1);
 
+    /** Paths for created partition listeners. */
+    private Map<PartitionListener, Path> paths = new ConcurrentHashMap<>();

Review comment:
       Also, why is this a concurrent map? I can't find any concurrent access to it.

##########
File path: modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageEngine.java
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb;
+
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.util.Comparator;
+import java.util.function.BiFunction;
+import org.apache.ignite.configuration.schemas.store.DataRegionConfiguration;
+import org.apache.ignite.configuration.schemas.store.DataStorageConfiguration;
+import org.apache.ignite.configuration.schemas.table.TableConfiguration;
+import org.apache.ignite.configuration.schemas.table.TableView;
+import org.apache.ignite.internal.storage.engine.DataRegion;
+import org.apache.ignite.internal.storage.engine.StorageEngine;
+import org.apache.ignite.internal.storage.engine.TableStorage;
+import org.rocksdb.RocksDB;
+
+/**
+ * Storage engine implementation based on RocksDB.
+ */
+public class RocksDbStorageEngine implements StorageEngine<DataStorageConfiguration, DataRegionConfiguration> {
+    static {
+        RocksDB.loadLibrary();

Review comment:
       Why is the `RocksDB.loadLibrary()` call needed here?

##########
File path: modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITTablePersistenceTest.java
##########
@@ -125,14 +143,53 @@
 
     /** {@inheritDoc} */
     @Override public Path getListenerPersistencePath(PartitionListener listener) {
-        return ((RocksDbStorage) listener.getStorage()).getDbPath();
+        return paths.get(listener);
     }
 
     /** {@inheritDoc} */
     @Override public RaftGroupListener createListener(Path workDir) {
-        return new PartitionListener(
-            new RocksDbStorage(workDir.resolve(UUID.randomUUID().toString()), ByteBuffer::compareTo)

Review comment:
       Do I understand correctly that we no longer have tests for creating snapshots using the `RocksDbStorage`?

##########
File path: modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITTablePersistenceTest.java
##########
@@ -125,14 +143,53 @@
 
     /** {@inheritDoc} */
     @Override public Path getListenerPersistencePath(PartitionListener listener) {
-        return ((RocksDbStorage) listener.getStorage()).getDbPath();
+        return paths.get(listener);
     }
 
     /** {@inheritDoc} */
     @Override public RaftGroupListener createListener(Path workDir) {
-        return new PartitionListener(
-            new RocksDbStorage(workDir.resolve(UUID.randomUUID().toString()), ByteBuffer::compareTo)
-        );
+        Path tableDir = workDir.resolve(UUID.randomUUID().toString());
+
+        PartitionListener listener = new PartitionListener(new ConcurrentHashMapStorage() {
+            /** {@inheritDoc} */
+            @Override public @NotNull CompletableFuture<Void> snapshot(Path snapshotPath) {

Review comment:
       Why aren't these methods in the base class? Also, maybe we should move `ConcurrentHashMapStorage` to a test package?

##########
File path: modules/api/src/main/java/org/apache/ignite/configuration/schemas/store/DataRegionConfigurationSchema.java
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.configuration.schemas.store;
+
+import org.apache.ignite.configuration.annotation.Config;
+import org.apache.ignite.configuration.annotation.Value;
+import org.apache.ignite.configuration.validation.Min;
+import org.apache.ignite.configuration.validation.OneOf;
+
+/**
+ * Configuration schema for data region. Currently it represents configuration for rocksdb storage engine only.
+ */
+@Config
+public class DataRegionConfigurationSchema {
+    /** Type for the future polymorphic configuration schemas. */
+    @OneOf("rocksdb")
+    @Value(hasDefault = true)
+    public String type = "rocksdb";
+
+    /** Size of the rocksdb offheap cache. */
+    @Value(hasDefault = true)
+    public int size = 64 * 1024 * 1024;
+
+    /** Size of rocksdb write buffer. */
+    @Value(hasDefault = true)
+    @Min(1)
+    public long writeBufferSize = 64 * 1024 * 1024;
+
+    /** Cache type - either {@code Clock} or {@code LRU}. */
+    @OneOf({"Clock", "LRU"})
+    @Value(hasDefault = true)
+    public String cache = "LRU";
+
+    /** Logarithm of the number of cache shards. {@code -1} if no shards required. */

Review comment:
       I would suggest copying the documentation from RocksDB; the current javadoc is hard to understand.
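
To make the "logarithm" wording concrete: the RocksDB cache documentation describes this parameter as a number of shard bits, meaning the cache is split into 2^numShardBits shards, and it describes `-1` as "determine the value automatically" rather than "no shards". That reading is worth double-checking against the RocksDB version in use. The sketch below only shows the arithmetic; the class, method and parameter names are hypothetical and not part of the PR.

```java
import java.util.OptionalInt;

/** Sketch only: illustrates how a shard-bits setting maps to a shard count. */
class ShardBitsSketch {
    /**
     * Returns the number of shards for the given number of shard bits,
     * or an empty value when the count is left for the cache to decide.
     */
    static OptionalInt shardCount(int numShardBits) {
        if (numShardBits < 0)
            return OptionalInt.empty(); // Assumed meaning of -1: chosen automatically.

        if (numShardBits > 30)
            throw new IllegalArgumentException("Too many shard bits: " + numShardBits);

        return OptionalInt.of(1 << numShardBits); // 2^numShardBits shards.
    }
}
```

With that reading, a value of `4` would mean 16 shards and `0` would mean a single, unsharded cache.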

##########
File path: modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/StorageEngine.java
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.engine;
+
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.util.Comparator;
+import java.util.function.BiFunction;
+import org.apache.ignite.configuration.schemas.store.DataRegionConfiguration;
+import org.apache.ignite.configuration.schemas.store.DataStorageConfiguration;
+import org.apache.ignite.configuration.schemas.table.TableConfiguration;
+import org.apache.ignite.configuration.schemas.table.TableView;
+
+/**
+ * General storageengine interface.
+ *
+ * @param <StorageCfg> Type of storage configuration.
+ * @param <RegionCfg> Type of data region configuration.
+ */
+public interface StorageEngine<
+    StorageCfg extends DataStorageConfiguration,

Review comment:
       Why does `StorageEngine` need these generic type parameters?

##########
File path: modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITTablePersistenceTest.java
##########
@@ -125,14 +143,53 @@
 
     /** {@inheritDoc} */
     @Override public Path getListenerPersistencePath(PartitionListener listener) {
-        return ((RocksDbStorage) listener.getStorage()).getDbPath();
+        return paths.get(listener);
     }
 
     /** {@inheritDoc} */
     @Override public RaftGroupListener createListener(Path workDir) {
-        return new PartitionListener(
-            new RocksDbStorage(workDir.resolve(UUID.randomUUID().toString()), ByteBuffer::compareTo)
-        );
+        Path tableDir = workDir.resolve(UUID.randomUUID().toString());
+
+        PartitionListener listener = new PartitionListener(new ConcurrentHashMapStorage() {
+            /** {@inheritDoc} */
+            @Override public @NotNull CompletableFuture<Void> snapshot(Path snapshotPath) {
+                return CompletableFuture.runAsync(() -> {
+                    try (
+                        OutputStream out = new FileOutputStream(snapshotPath.resolve("snapshot_file").toFile());
+                        ObjectOutputStream objOut = new ObjectOutputStream(out)
+                    ) {
+                        objOut.writeObject(map.keySet().stream().map(ByteArray::bytes).collect(toList()));
+                        objOut.writeObject(map.values().stream().collect(toList()));
+                    }
+                    catch (Exception e) {
+                        throw new IgniteInternalException(e);
+                    }
+                });
+            }
+
+            /** {@inheritDoc} */
+            @Override public void restoreSnapshot(Path snapshotPath) {
+                try (
+                    InputStream in = new FileInputStream(snapshotPath.resolve("snapshot_file").toFile());

Review comment:
       Same here: use `Files.newInputStream(snapshotPath.resolve("snapshot_file"))` instead of `new FileInputStream(...)`.
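
A minimal sketch of what this could look like for both the write and the read side, assuming the snapshot is a single file of Java-serialized objects as in the diff above. `SnapshotFileSketch` and its methods are hypothetical names used only for illustration; they are not part of the PR.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper (not part of the PR): snapshot I/O through java.nio.file.Files. */
class SnapshotFileSketch {
    /** File name taken from the diff above. */
    private static final String SNAPSHOT_FILE = "snapshot_file";

    /** Serializes the values into the snapshot file inside the given directory. */
    static void write(Path snapshotDir, List<byte[]> values) throws IOException {
        try (
            OutputStream out = Files.newOutputStream(snapshotDir.resolve(SNAPSHOT_FILE));
            ObjectOutputStream objOut = new ObjectOutputStream(out)
        ) {
            // Copy into an ArrayList so that the written object is known to be serializable.
            objOut.writeObject(new ArrayList<>(values));
        }
    }

    /** Reads back the list written by {@link #write(Path, List)}. */
    @SuppressWarnings("unchecked")
    static List<byte[]> read(Path snapshotDir) throws IOException, ClassNotFoundException {
        try (
            InputStream in = Files.newInputStream(snapshotDir.resolve(SNAPSHOT_FILE));
            ObjectInputStream objIn = new ObjectInputStream(in)
        ) {
            return (List<byte[]>) objIn.readObject();
        }
    }
}
```

Both `Files` methods take a `Path` directly, so the `toFile()` conversion goes away.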

##########
File path: modules/api/src/main/java/org/apache/ignite/configuration/schemas/store/DataRegionConfigurationSchema.java
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.configuration.schemas.store;
+
+import org.apache.ignite.configuration.annotation.Config;
+import org.apache.ignite.configuration.annotation.Value;
+import org.apache.ignite.configuration.validation.Min;
+import org.apache.ignite.configuration.validation.OneOf;
+
+/**
+ * Configuration schema for data region. Currently it represents configuration for rocksdb storage engine only.
+ */
+@Config
+public class DataRegionConfigurationSchema {
+    /** Type for the future polymorphic configuration schemas. */
+    @OneOf("rocksdb")
+    @Value(hasDefault = true)
+    public String type = "rocksdb";
+
+    /** Size of the rocksdb offheap cache. */
+    @Value(hasDefault = true)
+    public int size = 64 * 1024 * 1024;
+
+    /** Size of rocksdb write buffer. */
+    @Value(hasDefault = true)
+    @Min(1)
+    public long writeBufferSize = 64 * 1024 * 1024;

Review comment:
       Why do we need the Write Buffer Manager here? Also, the property name is a bit misleading to me...

##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
##########
@@ -113,6 +121,9 @@
     /** Meta storage service. */
     private final MetaStorageManager metaStorageMgr;
 
+    /** Storage engine instance. Only one type is available right not, it's a {@link RocksDbStorageEngine}. */

Review comment:
       ```suggestion
       /** Storage engine instance. Only one type is available right now, which is the {@link RocksDbStorageEngine}. */
   ```

##########
File path: modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
##########
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.BiFunction;
+import org.apache.ignite.configuration.NamedListView;
+import org.apache.ignite.configuration.schemas.table.TableConfiguration;
+import org.apache.ignite.configuration.schemas.table.TableIndexView;
+import org.apache.ignite.configuration.schemas.table.TableView;
+import org.apache.ignite.internal.rocksdb.ColumnFamily;
+import org.apache.ignite.internal.storage.Storage;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.engine.TableStorage;
+import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.jetbrains.annotations.NotNull;
+import org.rocksdb.AbstractComparator;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.ComparatorOptions;
+import org.rocksdb.DBOptions;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+
+import static java.lang.Integer.parseInt;
+import static java.util.stream.Collectors.groupingBy;
+import static java.util.stream.Stream.concat;
+
+/**
+ * Table storage implementation based on {@link RocksDB} instance.
+ */
+public class RocksDbTableStorage implements TableStorage {
+    static {
+        RocksDB.loadLibrary();
+    }
+
+    /**
+     * Name of the meta column family matches default columns family, meaning that it always exist when new table is
+     * created.
+     */
+    private static final String CF_META = "default";
+
+    /** Prefix for partitions column families names. */
+    private static final String CF_PARTITION_PREFIX = "cf-part:";
+
+    /** Prefix for SQL indexes column family names. */
+    private static final String CF_INDEX_PREFIX = "cf-idx:";
+
+    /** Path for the directory that stores table data. */
+    private final Path tablePath;
+
+    /** Table configuration. */
+    private final TableConfiguration tableCfg;
+
+    /** Data region for the table. */
+    private final RocksDbDataRegion dataRegion;
+
+    /** Comparators factory for indexes. */
+    private final BiFunction<TableView, String, Comparator<ByteBuffer>> indexComparatorFactory;
+
+    /** List of closeable resources to close on {@link #stop()}. Better than having a field for each one of them. */
+    private final List<AutoCloseable> autoCloseables = new ArrayList<>();
+
+    /** Rocks DB instance itself. */
+    private RocksDB db;
+
+    /** CF handle for meta information. */
+    @SuppressWarnings("unused")
+    private ColumnFamilyHandle metaCfHandle;
+
+    /** Column families for partitions. Stored as a list for the quick access by an index. */
+    private List<ColumnFamily> partitionCfs;
+
+    /** Column families for indexes by their names. */
+    private Map<String, ColumnFamilyHandle> indicesCfHandles = new ConcurrentHashMap<>();
+
+    /** Utility enum to describe a type of the column family - meta, partition or index. */
+    private enum CFType {
+        META, PARTITION, INDEX
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param tablePath Path for the directory that stores table data.
+     * @param tableCfg Table configuration.
+     * @param dataRegion Data region for the table.
+     * @param indexComparatorFactory Comparators factory for indexes.
+     */
+    public RocksDbTableStorage(
+        Path tablePath,
+        TableConfiguration tableCfg,
+        RocksDbDataRegion dataRegion,
+        BiFunction<TableView, String, Comparator<ByteBuffer>> indexComparatorFactory

Review comment:
       I would suggest creating a separate functional interface for this. Also, the idea of index comparators is not clear to me...
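
As a rough illustration of the first point, a dedicated interface could look like the sketch below. The name `IndexComparatorFactory`, its method name, and the type parameter `T` (a stand-in for `TableView`, so that the snippet compiles on its own) are assumptions, not something that exists in the PR.

```java
import java.nio.ByteBuffer;
import java.util.Comparator;

/**
 * Hypothetical replacement for BiFunction<TableView, String, Comparator<ByteBuffer>>.
 * T stands in for TableView here.
 */
@FunctionalInterface
interface IndexComparatorFactory<T> {
    /**
     * Creates a comparator for the keys of the given index.
     *
     * @param tableView Table configuration view.
     * @param indexName Index name.
     * @return Comparator for serialized index keys.
     */
    Comparator<ByteBuffer> createComparator(T tableView, String indexName);
}

/** Example factory, for illustration only. */
class IndexComparatorFactorySketch {
    /** Returns a factory that ignores its arguments and uses ByteBuffer's natural ordering. */
    static <T> IndexComparatorFactory<T> naturalOrder() {
        return (tableView, indexName) -> Comparator.naturalOrder();
    }
}
```

A named interface gives the parameters a documented meaning at every call site, which the bare `BiFunction` signature does not.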

##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
##########
@@ -253,30 +295,40 @@ private void createTableLocally(
 
         var partitionsGroupsFutures = new ArrayList<CompletableFuture<RaftGroupService>>();
 
+        Path storageDir = partitionsStoreDir.resolve(name);
+
+        try {
+            Files.createDirectories(storageDir);
+        }
+        catch (IOException e) {
+            throw new IgniteInternalException(
+                "Failed to create partitions store directory for " + name + ": " + e.getMessage(),
+                e
+            );
+        }
+
+        TableStorage tableStorage = engine.createTable(
+            storageDir,
+            tablesCfg.tables().get(name),
+            defaultDataRegion,
+            (tableCfgView, indexName) -> {
+                throw new UnsupportedOperationException("Not implemented yet.");
+            }
+        );
+
+        tableStorage.start();
+
+        tableStorages.add(tableStorage);
+
         IntStream.range(0, partitions).forEach(p ->
             partitionsGroupsFutures.add(
                 raftMgr.prepareRaftGroup(
                     raftGroupName(tblId, p),
                     assignment.get(p),
                     () -> {
-                        Path storageDir = partitionsStoreDir.resolve(name);
-
-                        try {
-                            Files.createDirectories(storageDir);
-                        }
-                        catch (IOException e) {
-                            throw new IgniteInternalException(
-                                "Failed to create partitions store directory for " + name + ": " + e.getMessage(),
-                                e
-                            );
-                        }
+                        Storage storage = tableStorage.getOrCreatePartition(p);
 
-                        return new PartitionListener(
-                            new RocksDbStorage(
-                                storageDir.resolve(String.valueOf(p)),
-                                ByteBuffer::compareTo
-                            )
-                        );
+                        return new PartitionListener(storage);

Review comment:
       Can we inline this: `() -> new PartitionListener(tableStorage.getOrCreatePartition(p))`?

##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
##########
@@ -253,30 +295,40 @@ private void createTableLocally(
 
         var partitionsGroupsFutures = new ArrayList<CompletableFuture<RaftGroupService>>();
 
+        Path storageDir = partitionsStoreDir.resolve(name);
+
+        try {
+            Files.createDirectories(storageDir);
+        }
+        catch (IOException e) {
+            throw new IgniteInternalException(
+                "Failed to create partitions store directory for " + name + ": " + e.getMessage(),
+                e
+            );
+        }
+
+        TableStorage tableStorage = engine.createTable(
+            storageDir,
+            tablesCfg.tables().get(name),
+            defaultDataRegion,
+            (tableCfgView, indexName) -> {
+                throw new UnsupportedOperationException("Not implemented yet.");
+            }
+        );
+
+        tableStorage.start();
+
+        tableStorages.add(tableStorage);
+
         IntStream.range(0, partitions).forEach(p ->

Review comment:
       I know that it is not related to this PR, but can you rewrite this code using a for loop? I love streams and all, but in this case they are a bit redundant.
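
A self-contained sketch of the loop shape, under the assumption that each iteration registers one lambda per partition. `PartitionLoopSketch` is a hypothetical name, and `Supplier<String>` merely stands in for the listener supplier passed to `raftMgr.prepareRaftGroup(...)`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

/** Hypothetical illustration (not part of the PR) of the for-loop variant. */
class PartitionLoopSketch {
    /** Builds one supplier per partition using a plain loop instead of IntStream.range(...).forEach(...). */
    static List<Supplier<String>> listeners(int partitions) {
        List<Supplier<String>> res = new ArrayList<>(partitions);

        for (int p = 0; p < partitions; p++) {
            // The loop counter is not effectively final, so copy it before capturing it in the lambda.
            int partId = p;

            res.add(() -> "listener-" + partId);
        }

        return res;
    }
}
```

The one cost of the loop is the extra `partId` copy; the stream version captures `p` directly because each lambda invocation gets its own parameter.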

##########
File path: modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
##########
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.BiFunction;
+import org.apache.ignite.configuration.NamedListView;
+import org.apache.ignite.configuration.schemas.table.TableConfiguration;
+import org.apache.ignite.configuration.schemas.table.TableIndexView;
+import org.apache.ignite.configuration.schemas.table.TableView;
+import org.apache.ignite.internal.rocksdb.ColumnFamily;
+import org.apache.ignite.internal.storage.Storage;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.engine.TableStorage;
+import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.jetbrains.annotations.NotNull;
+import org.rocksdb.AbstractComparator;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.ComparatorOptions;
+import org.rocksdb.DBOptions;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+
+import static java.lang.Integer.parseInt;
+import static java.util.stream.Collectors.groupingBy;
+import static java.util.stream.Stream.concat;
+
+/**
+ * Table storage implementation based on {@link RocksDB} instance.
+ */
+public class RocksDbTableStorage implements TableStorage {
+    static {
+        RocksDB.loadLibrary();
+    }
+
+    /**
+     * Name of the meta column family matches default columns family, meaning that it always exist when new table is
+     * created.
+     */
+    private static final String CF_META = "default";
+
+    /** Prefix for partitions column families names. */
+    private static final String CF_PARTITION_PREFIX = "cf-part:";
+
+    /** Prefix for SQL indexes column family names. */
+    private static final String CF_INDEX_PREFIX = "cf-idx:";
+
+    /** Path for the directory that stores table data. */
+    private final Path tablePath;
+
+    /** Table configuration. */
+    private final TableConfiguration tableCfg;
+
+    /** Data region for the table. */
+    private final RocksDbDataRegion dataRegion;
+
+    /** Comparators factory for indexes. */
+    private final BiFunction<TableView, String, Comparator<ByteBuffer>> indexComparatorFactory;
+
+    /** List of closeable resources to close on {@link #stop()}. Better than having a field for each one of them. */
+    private final List<AutoCloseable> autoCloseables = new ArrayList<>();
+
+    /** Rocks DB instance itself. */
+    private RocksDB db;
+
+    /** CF handle for meta information. */
+    @SuppressWarnings("unused")
+    private ColumnFamilyHandle metaCfHandle;
+
+    /** Column families for partitions. Stored as a list for the quick access by an index. */
+    private List<ColumnFamily> partitionCfs;
+
+    /** Column families for indexes by their names. */
+    private Map<String, ColumnFamilyHandle> indicesCfHandles = new ConcurrentHashMap<>();
+
+    /** Utility enum to describe a type of the column family - meta, partition or index. */
+    private enum CFType {
+        META, PARTITION, INDEX
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param tablePath Path for the directory that stores table data.
+     * @param tableCfg Table configuration.
+     * @param dataRegion Data region for the table.
+     * @param indexComparatorFactory Comparators factory for indexes.
+     */
+    public RocksDbTableStorage(
+        Path tablePath,
+        TableConfiguration tableCfg,
+        RocksDbDataRegion dataRegion,
+        BiFunction<TableView, String, Comparator<ByteBuffer>> indexComparatorFactory
+    ) {
+        this.tablePath = tablePath;
+        this.tableCfg = tableCfg;
+        this.dataRegion = dataRegion;
+        this.indexComparatorFactory = indexComparatorFactory;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void start() throws StorageException {
+        DBOptions dbOptions = addToCloseableResources(new DBOptions()
+            .setCreateIfMissing(true)
+            .setWriteBufferManager(dataRegion.writeBufferManager())
+        );
+
+        String absolutePathStr = tablePath.toAbsolutePath().toString();
+
+        List<String> cfNames = listCfNames(absolutePathStr);
+
+        Map<CFType, List<String>> cfGrouped = cfNames.stream().collect(groupingBy(this::cfType));
+
+        List<ColumnFamilyDescriptor> cfDescriptors = new ArrayList<>();
+
+        cfDescriptors.add(new ColumnFamilyDescriptor(
+            CF_META.getBytes(StandardCharsets.UTF_8),
+            addToCloseableResources(new ColumnFamilyOptions(addToCloseableResources(new Options().setCreateIfMissing(true))))
+        ));
+
+        for (String partitionCfName : cfGrouped.getOrDefault(CFType.PARTITION, List.of()))
+            cfDescriptors.add(new ColumnFamilyDescriptor(
+                partitionCfName.getBytes(StandardCharsets.UTF_8),
+                new ColumnFamilyOptions()
+            ));
+
+        TableView tableCfgView = tableCfg.value();
+
+        NamedListView<? extends TableIndexView> indicesCfgView = tableCfgView.indices();
+
+        for (String indexCfName : cfGrouped.getOrDefault(CFType.INDEX, List.of())) {
+            String indexName = indexCfName.substring(CF_INDEX_PREFIX.length());
+
+            TableIndexView indexCfgView = indicesCfgView.get(indexName);
+
+            assert indexCfgView != null : indexCfName; //TODO Bullshit!
+
+            Comparator<ByteBuffer> indexComparator = indexComparatorFactory.apply(tableCfgView, indexName);
+
+            cfDescriptors.add(new ColumnFamilyDescriptor(
+                indexCfName.getBytes(StandardCharsets.UTF_8),
+                new ColumnFamilyOptions()
+                    .setComparator(addToCloseableResources(
+                        new AbstractComparator(addToCloseableResources(new ComparatorOptions())) {
+                            /** {@inheritDoc} */
+                            @Override public String name() {
+                                return "index-comparator";
+                            }
+
+                            /** {@inheritDoc} */
+                            @Override public int compare(ByteBuffer a, ByteBuffer b) {
+                                return indexComparator.compare(a, b);
+                            }
+                        }))
+            ));
+        }
+
+        List<ColumnFamilyHandle> cfHandles = new ArrayList<>();
+
+        try {
+            db = addToCloseableResources(RocksDB.open(dbOptions, absolutePathStr, cfDescriptors, cfHandles));
+        }
+        catch (RocksDBException e) {
+            throw new StorageException("Failed to initialize RocksDB instance.", e);
+        }
+
+        partitionCfs = new ArrayList<>(tableCfgView.partitions());
+
+        partitionCfs.addAll(Collections.nCopies(tableCfgView.partitions(), null));
+
+        for (int cfListIndex = 0; cfListIndex < cfHandles.size(); cfListIndex++) {
+            ColumnFamilyHandle cfHandle = cfHandles.get(cfListIndex);
+
+            String handleName;
+            try {
+                handleName = new String(cfHandle.getName(), StandardCharsets.UTF_8);
+            }
+            catch (RocksDBException e) {
+                throw new StorageException("Failed to read RocksDB column family name.", e);
+            }
+
+            if (handleName.equals(CF_META))
+                this.metaCfHandle = addToCloseableResources(cfHandle);
+            else if (handleName.startsWith(CF_PARTITION_PREFIX)) {
+                int partId = partitionId(handleName);
+
+                ColumnFamilyDescriptor cfDescriptor = cfDescriptors.get(cfListIndex);
+
+                partitionCfs.set(partId, new ColumnFamily(db, cfHandle, handleName, cfDescriptor.getOptions(), null));
+            }
+            else {
+                String indexName = handleName.substring(CF_INDEX_PREFIX.length());
+
+                indicesCfHandles.put(indexName, cfHandle);
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void stop() throws StorageException {
+        try {
+            List<AutoCloseable> copy = new ArrayList<>(autoCloseables);
+
+            Collections.reverse(copy);
+
+            IgniteUtils.closeAll(concat(
+                concat(partitionCfs.stream(), indicesCfHandles.values().stream()),
+                copy.stream()
+            ));
+        }
+        catch (Exception e) {
+            throw new StorageException("Failed to stop RocksDB table storage.", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public Storage getOrCreatePartition(int partId) {
+        assert partId < partitionCfs.size() : S.toString(
+            "Attempt to create partition with id outside of configured range",
+            "partitionId", partId, false,
+            "partitions", partitionCfs.size(), false
+        );
+
+        ColumnFamily partitionCf = partitionCfs.get(partId);
+
+        if (partitionCf == null) {
+            String handleName = partitionCfName(partId);
+
+            ColumnFamilyDescriptor cfDescriptor = new ColumnFamilyDescriptor(
+                handleName.getBytes(StandardCharsets.UTF_8),
+                new ColumnFamilyOptions()
+            );
+
+            try {
+                ColumnFamilyHandle cfHandle = db.createColumnFamily(cfDescriptor);
+
+                partitionCf = new ColumnFamily(db, cfHandle, handleName, cfDescriptor.getOptions(), null);
+            }
+            catch (RocksDBException e) {
+                cfDescriptor.getOptions().close();
+
+                throw new StorageException("Failed to create new RocksDB column family " + handleName, e);
+            }
+
+            partitionCfs.set(partId, partitionCf);
+        }
+
+        return new RocksDbStorage(db, partitionCf);
+    }
+
+    /**
+     * Returns list of column families names that belong to RocksDB instance in the given path.
+     *
+     * @param path DB path.
+     * @return List of column families names.
+     * @throws StorageException If something went wrong.
+     */
+    @NotNull private List<String> listCfNames(String path) throws StorageException {

Review comment:
       This function always accepts `tablePath` as an argument; can we inline that?

##########
File path: modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
##########
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.BiFunction;
+import org.apache.ignite.configuration.NamedListView;
+import org.apache.ignite.configuration.schemas.table.TableConfiguration;
+import org.apache.ignite.configuration.schemas.table.TableIndexView;
+import org.apache.ignite.configuration.schemas.table.TableView;
+import org.apache.ignite.internal.rocksdb.ColumnFamily;
+import org.apache.ignite.internal.storage.Storage;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.engine.TableStorage;
+import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.jetbrains.annotations.NotNull;
+import org.rocksdb.AbstractComparator;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.ComparatorOptions;
+import org.rocksdb.DBOptions;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+
+import static java.lang.Integer.parseInt;
+import static java.util.stream.Collectors.groupingBy;
+import static java.util.stream.Stream.concat;
+
+/**
+ * Table storage implementation based on {@link RocksDB} instance.
+ */
+public class RocksDbTableStorage implements TableStorage {
+    static {
+        RocksDB.loadLibrary();
+    }
+
+    /**
+     * Name of the meta column family matches default columns family, meaning that it always exist when new table is
+     * created.
+     */
+    private static final String CF_META = "default";
+
+    /** Prefix for partitions column families names. */
+    private static final String CF_PARTITION_PREFIX = "cf-part:";
+
+    /** Prefix for SQL indexes column family names. */
+    private static final String CF_INDEX_PREFIX = "cf-idx:";
+
+    /** Path for the directory that stores table data. */
+    private final Path tablePath;
+
+    /** Table configuration. */
+    private final TableConfiguration tableCfg;
+
+    /** Data region for the table. */
+    private final RocksDbDataRegion dataRegion;
+
+    /** Comparators factory for indexes. */
+    private final BiFunction<TableView, String, Comparator<ByteBuffer>> indexComparatorFactory;
+
+    /** List of closeable resources to close on {@link #stop()}. Better than having a field for each one of them. */
+    private final List<AutoCloseable> autoCloseables = new ArrayList<>();
+
+    /** Rocks DB instance itself. */
+    private RocksDB db;
+
+    /** CF handle for meta information. */
+    @SuppressWarnings("unused")
+    private ColumnFamilyHandle metaCfHandle;
+
+    /** Column families for partitions. Stored as a list for the quick access by an index. */
+    private List<ColumnFamily> partitionCfs;
+
+    /** Column families for indexes by their names. */
+    private Map<String, ColumnFamilyHandle> indicesCfHandles = new ConcurrentHashMap<>();
+
+    /** Utility enum to describe a type of the column family - meta, partition or index. */
+    private enum CFType {
+        META, PARTITION, INDEX
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param tablePath Path for the directory that stores table data.
+     * @param tableCfg Table configuration.
+     * @param dataRegion Data region for the table.
+     * @param indexComparatorFactory Comparators factory for indexes.
+     */
+    public RocksDbTableStorage(
+        Path tablePath,
+        TableConfiguration tableCfg,
+        RocksDbDataRegion dataRegion,
+        BiFunction<TableView, String, Comparator<ByteBuffer>> indexComparatorFactory
+    ) {
+        this.tablePath = tablePath;
+        this.tableCfg = tableCfg;
+        this.dataRegion = dataRegion;
+        this.indexComparatorFactory = indexComparatorFactory;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void start() throws StorageException {
+        DBOptions dbOptions = addToCloseableResources(new DBOptions()
+            .setCreateIfMissing(true)
+            .setWriteBufferManager(dataRegion.writeBufferManager())
+        );
+
+        String absolutePathStr = tablePath.toAbsolutePath().toString();
+
+        List<String> cfNames = listCfNames(absolutePathStr);
+
+        Map<CFType, List<String>> cfGrouped = cfNames.stream().collect(groupingBy(this::cfType));
+
+        List<ColumnFamilyDescriptor> cfDescriptors = new ArrayList<>();
+
+        cfDescriptors.add(new ColumnFamilyDescriptor(
+            CF_META.getBytes(StandardCharsets.UTF_8),
+            addToCloseableResources(new ColumnFamilyOptions(addToCloseableResources(new Options().setCreateIfMissing(true))))
+        ));
+
+        for (String partitionCfName : cfGrouped.getOrDefault(CFType.PARTITION, List.of()))
+            cfDescriptors.add(new ColumnFamilyDescriptor(
+                partitionCfName.getBytes(StandardCharsets.UTF_8),
+                new ColumnFamilyOptions()
+            ));
+
+        TableView tableCfgView = tableCfg.value();
+
+        NamedListView<? extends TableIndexView> indicesCfgView = tableCfgView.indices();
+
+        for (String indexCfName : cfGrouped.getOrDefault(CFType.INDEX, List.of())) {
+            String indexName = indexCfName.substring(CF_INDEX_PREFIX.length());
+
+            TableIndexView indexCfgView = indicesCfgView.get(indexName);
+
+            assert indexCfgView != null : indexCfName; //TODO Bullshit!
+
+            Comparator<ByteBuffer> indexComparator = indexComparatorFactory.apply(tableCfgView, indexName);
+
+            cfDescriptors.add(new ColumnFamilyDescriptor(
+                indexCfName.getBytes(StandardCharsets.UTF_8),
+                new ColumnFamilyOptions()
+                    .setComparator(addToCloseableResources(
+                        new AbstractComparator(addToCloseableResources(new ComparatorOptions())) {
+                            /** {@inheritDoc} */
+                            @Override public String name() {
+                                return "index-comparator";
+                            }
+
+                            /** {@inheritDoc} */
+                            @Override public int compare(ByteBuffer a, ByteBuffer b) {
+                                return indexComparator.compare(a, b);
+                            }
+                        }))
+            ));
+        }
+
+        List<ColumnFamilyHandle> cfHandles = new ArrayList<>();
+
+        try {
+            db = addToCloseableResources(RocksDB.open(dbOptions, absolutePathStr, cfDescriptors, cfHandles));
+        }
+        catch (RocksDBException e) {
+            throw new StorageException("Failed to initialize RocksDB instance.", e);
+        }
+
+        partitionCfs = new ArrayList<>(tableCfgView.partitions());
+
+        partitionCfs.addAll(Collections.nCopies(tableCfgView.partitions(), null));
+
+        for (int cfListIndex = 0; cfListIndex < cfHandles.size(); cfListIndex++) {
+            ColumnFamilyHandle cfHandle = cfHandles.get(cfListIndex);
+
+            String handleName;
+            try {
+                handleName = new String(cfHandle.getName(), StandardCharsets.UTF_8);
+            }
+            catch (RocksDBException e) {
+                throw new StorageException("Failed to read RocksDB column family name.", e);
+            }
+
+            if (handleName.equals(CF_META))
+                this.metaCfHandle = addToCloseableResources(cfHandle);
+            else if (handleName.startsWith(CF_PARTITION_PREFIX)) {
+                int partId = partitionId(handleName);
+
+                ColumnFamilyDescriptor cfDescriptor = cfDescriptors.get(cfListIndex);
+
+                partitionCfs.set(partId, new ColumnFamily(db, cfHandle, handleName, cfDescriptor.getOptions(), null));
+            }
+            else {
+                String indexName = handleName.substring(CF_INDEX_PREFIX.length());
+
+                indicesCfHandles.put(indexName, cfHandle);
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void stop() throws StorageException {
+        try {
+            List<AutoCloseable> copy = new ArrayList<>(autoCloseables);
+
+            Collections.reverse(copy);
+
+            IgniteUtils.closeAll(concat(
+                concat(partitionCfs.stream(), indicesCfHandles.values().stream()),
+                copy.stream()
+            ));
+        }
+        catch (Exception e) {
+            throw new StorageException("Failed to stop RocksDB table storage.", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public Storage getOrCreatePartition(int partId) {
+        assert partId < partitionCfs.size() : S.toString(
+            "Attempt to create partition with id outside of configured range",
+            "partitionId", partId, false,
+            "partitions", partitionCfs.size(), false
+        );
+
+        ColumnFamily partitionCf = partitionCfs.get(partId);
+
+        if (partitionCf == null) {
+            String handleName = partitionCfName(partId);
+
+            ColumnFamilyDescriptor cfDescriptor = new ColumnFamilyDescriptor(
+                handleName.getBytes(StandardCharsets.UTF_8),
+                new ColumnFamilyOptions()
+            );
+
+            try {
+                ColumnFamilyHandle cfHandle = db.createColumnFamily(cfDescriptor);
+
+                partitionCf = new ColumnFamily(db, cfHandle, handleName, cfDescriptor.getOptions(), null);
+            }
+            catch (RocksDBException e) {
+                cfDescriptor.getOptions().close();
+
+                throw new StorageException("Feiled to create new ROcksDB column family " + handleName, e);

Review comment:
       ```suggestion
                   throw new StorageException("Failed to create new RocksDB column family " + handleName, e);
   ```

##########
File path: modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
##########
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.BiFunction;
+import org.apache.ignite.configuration.NamedListView;
+import org.apache.ignite.configuration.schemas.table.TableConfiguration;
+import org.apache.ignite.configuration.schemas.table.TableIndexView;
+import org.apache.ignite.configuration.schemas.table.TableView;
+import org.apache.ignite.internal.rocksdb.ColumnFamily;
+import org.apache.ignite.internal.storage.Storage;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.engine.TableStorage;
+import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.jetbrains.annotations.NotNull;
+import org.rocksdb.AbstractComparator;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.ComparatorOptions;
+import org.rocksdb.DBOptions;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+
+import static java.lang.Integer.parseInt;
+import static java.util.stream.Collectors.groupingBy;
+import static java.util.stream.Stream.concat;
+
+/**
+ * Table storage implementation based on {@link RocksDB} instance.
+ */
+public class RocksDbTableStorage implements TableStorage {
+    static {
+        RocksDB.loadLibrary();
+    }
+
+    /**
+     * Name of the meta column family matches default columns family, meaning that it always exist when new table is
+     * created.
+     */
+    private static final String CF_META = "default";
+
+    /** Prefix for partitions column families names. */
+    private static final String CF_PARTITION_PREFIX = "cf-part:";
+
+    /** Prefix for SQL indexes column family names. */
+    private static final String CF_INDEX_PREFIX = "cf-idx:";
+
+    /** Path for the directory that stores table data. */
+    private final Path tablePath;
+
+    /** Table configuration. */
+    private final TableConfiguration tableCfg;
+
+    /** Data region for the table. */
+    private final RocksDbDataRegion dataRegion;
+
+    /** Comparators factory for indexes. */
+    private final BiFunction<TableView, String, Comparator<ByteBuffer>> indexComparatorFactory;
+
+    /** List of closeable resources to close on {@link #stop()}. Better than having a field for each one of them. */
+    private final List<AutoCloseable> autoCloseables = new ArrayList<>();
+
+    /** Rocks DB instance itself. */
+    private RocksDB db;
+
+    /** CF handle for meta information. */
+    @SuppressWarnings("unused")
+    private ColumnFamilyHandle metaCfHandle;
+
+    /** Column families for partitions. Stored as a list for the quick access by an index. */
+    private List<ColumnFamily> partitionCfs;
+
+    /** Column families for indexes by their names. */
+    private Map<String, ColumnFamilyHandle> indicesCfHandles = new ConcurrentHashMap<>();
+
+    /** Utility enum to describe a type of the column family - meta, partition or index. */
+    private enum CFType {
+        META, PARTITION, INDEX
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param tablePath Path for the directory that stores table data.
+     * @param tableCfg Table configuration.
+     * @param dataRegion Data region for the table.
+     * @param indexComparatorFactory Comparators factory for indexes.
+     */
+    public RocksDbTableStorage(
+        Path tablePath,
+        TableConfiguration tableCfg,
+        RocksDbDataRegion dataRegion,
+        BiFunction<TableView, String, Comparator<ByteBuffer>> indexComparatorFactory
+    ) {
+        this.tablePath = tablePath;
+        this.tableCfg = tableCfg;
+        this.dataRegion = dataRegion;
+        this.indexComparatorFactory = indexComparatorFactory;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void start() throws StorageException {
+        DBOptions dbOptions = addToCloseableResources(new DBOptions()
+            .setCreateIfMissing(true)
+            .setWriteBufferManager(dataRegion.writeBufferManager())
+        );
+
+        String absolutePathStr = tablePath.toAbsolutePath().toString();
+
+        List<String> cfNames = listCfNames(absolutePathStr);
+
+        Map<CFType, List<String>> cfGrouped = cfNames.stream().collect(groupingBy(this::cfType));
+
+        List<ColumnFamilyDescriptor> cfDescriptors = new ArrayList<>();
+
+        cfDescriptors.add(new ColumnFamilyDescriptor(
+            CF_META.getBytes(StandardCharsets.UTF_8),
+            addToCloseableResources(new ColumnFamilyOptions(addToCloseableResources(new Options().setCreateIfMissing(true))))
+        ));
+
+        for (String partitionCfName : cfGrouped.getOrDefault(CFType.PARTITION, List.of()))
+            cfDescriptors.add(new ColumnFamilyDescriptor(
+                partitionCfName.getBytes(StandardCharsets.UTF_8),
+                new ColumnFamilyOptions()
+            ));
+
+        TableView tableCfgView = tableCfg.value();
+
+        NamedListView<? extends TableIndexView> indicesCfgView = tableCfgView.indices();
+
+        for (String indexCfName : cfGrouped.getOrDefault(CFType.INDEX, List.of())) {
+            String indexName = indexCfName.substring(CF_INDEX_PREFIX.length());
+
+            TableIndexView indexCfgView = indicesCfgView.get(indexName);
+
+            assert indexCfgView != null : indexCfName; //TODO Bullshit!
+
+            Comparator<ByteBuffer> indexComparator = indexComparatorFactory.apply(tableCfgView, indexName);
+
+            cfDescriptors.add(new ColumnFamilyDescriptor(
+                indexCfName.getBytes(StandardCharsets.UTF_8),
+                new ColumnFamilyOptions()
+                    .setComparator(addToCloseableResources(
+                        new AbstractComparator(addToCloseableResources(new ComparatorOptions())) {
+                            /** {@inheritDoc} */
+                            @Override public String name() {
+                                return "index-comparator";
+                            }
+
+                            /** {@inheritDoc} */
+                            @Override public int compare(ByteBuffer a, ByteBuffer b) {
+                                return indexComparator.compare(a, b);
+                            }
+                        }))
+            ));
+        }
+
+        List<ColumnFamilyHandle> cfHandles = new ArrayList<>();
+
+        try {
+            db = addToCloseableResources(RocksDB.open(dbOptions, absolutePathStr, cfDescriptors, cfHandles));
+        }
+        catch (RocksDBException e) {
+            throw new StorageException("Failed to initialize RocksDB instance.", e);
+        }
+
+        partitionCfs = new ArrayList<>(tableCfgView.partitions());

Review comment:
       This can be written more concisely: `partitionCfs = new ArrayList<>(Collections.nCopies(tableCfgView.partitions(), null));`
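
A minimal, self-contained sketch of the two forms, to show that they produce the same mutable, null-filled list. The class and method names (`NullFilledListSketch`, `twoStep`, `oneStep`) are hypothetical and not part of the PR; only `ArrayList` and `Collections.nCopies` come from the code under review.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper class, used only to compare the two initialization styles.
class NullFilledListSketch {
    /** Two-step form, as in the diff: allocate capacity, then append n nulls. */
    static <T> List<T> twoStep(int n) {
        List<T> list = new ArrayList<>(n);

        list.addAll(Collections.nCopies(n, null));

        return list;
    }

    /** One-step form from the review comment: copy the immutable nCopies view into a mutable list. */
    static <T> List<T> oneStep(int n) {
        return new ArrayList<>(Collections.nCopies(n, null));
    }

    public static void main(String[] args) {
        List<String> a = twoStep(4);
        List<String> b = oneStep(4);

        // Both lists are mutable, so a slot can be filled in later by index.
        b.set(2, "cf-part:2");

        System.out.println(a.size() + " " + b.size() + " " + b.get(2));
    }
}
```

Either way the result has `n` slots that can be assigned with `set(partId, ...)`; the one-step form just avoids the separate `addAll` call.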

##########
File path: modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
##########
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.BiFunction;
+import org.apache.ignite.configuration.NamedListView;
+import org.apache.ignite.configuration.schemas.table.TableConfiguration;
+import org.apache.ignite.configuration.schemas.table.TableIndexView;
+import org.apache.ignite.configuration.schemas.table.TableView;
+import org.apache.ignite.internal.rocksdb.ColumnFamily;
+import org.apache.ignite.internal.storage.Storage;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.engine.TableStorage;
+import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.jetbrains.annotations.NotNull;
+import org.rocksdb.AbstractComparator;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.ComparatorOptions;
+import org.rocksdb.DBOptions;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+
+import static java.lang.Integer.parseInt;
+import static java.util.stream.Collectors.groupingBy;
+import static java.util.stream.Stream.concat;
+
+/**
+ * Table storage implementation based on {@link RocksDB} instance.
+ */
+public class RocksDbTableStorage implements TableStorage {
+    static {
+        RocksDB.loadLibrary();
+    }
+
+    /**
+     * Name of the meta column family matches default columns family, meaning that it always exist when new table is
+     * created.
+     */
+    private static final String CF_META = "default";
+
+    /** Prefix for partitions column families names. */
+    private static final String CF_PARTITION_PREFIX = "cf-part:";
+
+    /** Prefix for SQL indexes column family names. */
+    private static final String CF_INDEX_PREFIX = "cf-idx:";
+
+    /** Path for the directory that stores table data. */
+    private final Path tablePath;
+
+    /** Table configuration. */
+    private final TableConfiguration tableCfg;
+
+    /** Data region for the table. */
+    private final RocksDbDataRegion dataRegion;
+
+    /** Comparators factory for indexes. */
+    private final BiFunction<TableView, String, Comparator<ByteBuffer>> indexComparatorFactory;
+
+    /** List of closeable resources to close on {@link #stop()}. Better than having a field for each one of them. */
+    private final List<AutoCloseable> autoCloseables = new ArrayList<>();
+
+    /** Rocks DB instance itself. */
+    private RocksDB db;
+
+    /** CF handle for meta information. */
+    @SuppressWarnings("unused")
+    private ColumnFamilyHandle metaCfHandle;
+
+    /** Column families for partitions. Stored as a list for the quick access by an index. */
+    private List<ColumnFamily> partitionCfs;
+
+    /** Column families for indexes by their names. */
+    private Map<String, ColumnFamilyHandle> indicesCfHandles = new ConcurrentHashMap<>();

Review comment:
       Why is this a `ConcurrentHashMap` while `partitionCfs` is a regular list?

##########
File path: modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageTest.java
##########
@@ -17,30 +17,74 @@
 
 package org.apache.ignite.internal.storage.rocksdb;
 
-import java.nio.ByteBuffer;
 import java.nio.file.Path;
+import org.apache.ignite.configuration.schemas.store.DataRegionConfiguration;
+import org.apache.ignite.configuration.schemas.table.TableConfiguration;
+import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
 import org.apache.ignite.internal.storage.AbstractStorageTest;
+import org.apache.ignite.internal.storage.engine.DataRegion;
+import org.apache.ignite.internal.storage.engine.TableStorage;
 import org.apache.ignite.internal.testframework.WorkDirectory;
 import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.extension.ExtendWith;
 
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
 /**
  * Storage test implementation for {@link RocksDbStorage}.
  */
 @ExtendWith(WorkDirectoryExtension.class)
+@ExtendWith(ConfigurationExtension.class)
 public class RocksDbStorageTest extends AbstractStorageTest {
+    /** */
+    private TableStorage table;
+
+    /** */
+    private DataRegion dataRegion;
+
     /** */
     @BeforeEach
-    public void setUp(@WorkDirectory Path workDir) {
-        storage = new RocksDbStorage(workDir, ByteBuffer::compareTo);
+    public void setUp(

Review comment:
       I think it would be easier to have a single test method instead of `setUp` and `tearDown`; it's quite confusing otherwise.

##########
File path: modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
##########
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.BiFunction;
+import org.apache.ignite.configuration.NamedListView;
+import org.apache.ignite.configuration.schemas.table.TableConfiguration;
+import org.apache.ignite.configuration.schemas.table.TableIndexView;
+import org.apache.ignite.configuration.schemas.table.TableView;
+import org.apache.ignite.internal.rocksdb.ColumnFamily;
+import org.apache.ignite.internal.storage.Storage;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.engine.TableStorage;
+import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.jetbrains.annotations.NotNull;
+import org.rocksdb.AbstractComparator;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.ComparatorOptions;
+import org.rocksdb.DBOptions;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+
+import static java.lang.Integer.parseInt;
+import static java.util.stream.Collectors.groupingBy;
+import static java.util.stream.Stream.concat;
+
+/**
+ * Table storage implementation based on {@link RocksDB} instance.
+ */
+public class RocksDbTableStorage implements TableStorage {
+    static {
+        RocksDB.loadLibrary();
+    }
+
+    /**
+     * Name of the meta column family matches default columns family, meaning that it always exist when new table is
+     * created.
+     */
+    private static final String CF_META = "default";
+
+    /** Prefix for partitions column families names. */
+    private static final String CF_PARTITION_PREFIX = "cf-part:";
+
+    /** Prefix for SQL indexes column family names. */
+    private static final String CF_INDEX_PREFIX = "cf-idx:";
+
+    /** Path for the directory that stores table data. */
+    private final Path tablePath;
+
+    /** Table configuration. */
+    private final TableConfiguration tableCfg;
+
+    /** Data region for the table. */
+    private final RocksDbDataRegion dataRegion;
+
+    /** Comparators factory for indexes. */
+    private final BiFunction<TableView, String, Comparator<ByteBuffer>> indexComparatorFactory;
+
+    /** List of closeable resources to close on {@link #stop()}. Better than having a field for each one of them. */
+    private final List<AutoCloseable> autoCloseables = new ArrayList<>();
+
+    /** Rocks DB instance itself. */
+    private RocksDB db;
+
+    /** CF handle for meta information. */
+    @SuppressWarnings("unused")
+    private ColumnFamilyHandle metaCfHandle;
+
+    /** Column families for partitions. Stored as a list for the quick access by an index. */
+    private List<ColumnFamily> partitionCfs;
+
+    /** Column families for indexes by their names. */
+    private Map<String, ColumnFamilyHandle> indicesCfHandles = new ConcurrentHashMap<>();
+
+    /** Utility enum to describe a type of the column family - meta, partition or index. */
+    private enum CFType {
+        META, PARTITION, INDEX
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param tablePath Path for the directory that stores table data.
+     * @param tableCfg Table configuration.
+     * @param dataRegion Data region for the table.
+     * @param indexComparatorFactory Comparators factory for indexes.
+     */
+    public RocksDbTableStorage(
+        Path tablePath,
+        TableConfiguration tableCfg,
+        RocksDbDataRegion dataRegion,
+        BiFunction<TableView, String, Comparator<ByteBuffer>> indexComparatorFactory
+    ) {
+        this.tablePath = tablePath;
+        this.tableCfg = tableCfg;
+        this.dataRegion = dataRegion;
+        this.indexComparatorFactory = indexComparatorFactory;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void start() throws StorageException {
+        DBOptions dbOptions = addToCloseableResources(new DBOptions()
+            .setCreateIfMissing(true)
+            .setWriteBufferManager(dataRegion.writeBufferManager())
+        );
+
+        String absolutePathStr = tablePath.toAbsolutePath().toString();
+
+        List<String> cfNames = listCfNames(absolutePathStr);
+
+        Map<CFType, List<String>> cfGrouped = cfNames.stream().collect(groupingBy(this::cfType));
+
+        List<ColumnFamilyDescriptor> cfDescriptors = new ArrayList<>();
+
+        cfDescriptors.add(new ColumnFamilyDescriptor(
+            CF_META.getBytes(StandardCharsets.UTF_8),
+            addToCloseableResources(new ColumnFamilyOptions(addToCloseableResources(new Options().setCreateIfMissing(true))))
+        ));
+
+        for (String partitionCfName : cfGrouped.getOrDefault(CFType.PARTITION, List.of()))
+            cfDescriptors.add(new ColumnFamilyDescriptor(
+                partitionCfName.getBytes(StandardCharsets.UTF_8),
+                new ColumnFamilyOptions()
+            ));
+
+        TableView tableCfgView = tableCfg.value();
+
+        NamedListView<? extends TableIndexView> indicesCfgView = tableCfgView.indices();
+
+        for (String indexCfName : cfGrouped.getOrDefault(CFType.INDEX, List.of())) {
+            String indexName = indexCfName.substring(CF_INDEX_PREFIX.length());
+
+            TableIndexView indexCfgView = indicesCfgView.get(indexName);
+
+            assert indexCfgView != null : indexCfName; //TODO Bullshit!

Review comment:
       ??? What is this TODO supposed to mean?

##########
File path: modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageTest.java
##########
@@ -17,30 +17,74 @@
 
 package org.apache.ignite.internal.storage.rocksdb;
 
-import java.nio.ByteBuffer;
 import java.nio.file.Path;
+import org.apache.ignite.configuration.schemas.store.DataRegionConfiguration;
+import org.apache.ignite.configuration.schemas.table.TableConfiguration;
+import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
 import org.apache.ignite.internal.storage.AbstractStorageTest;
+import org.apache.ignite.internal.storage.engine.DataRegion;
+import org.apache.ignite.internal.storage.engine.TableStorage;
 import org.apache.ignite.internal.testframework.WorkDirectory;
 import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.extension.ExtendWith;
 
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
 /**
  * Storage test implementation for {@link RocksDbStorage}.
  */
 @ExtendWith(WorkDirectoryExtension.class)
+@ExtendWith(ConfigurationExtension.class)
 public class RocksDbStorageTest extends AbstractStorageTest {
+    /** */
+    private TableStorage table;
+
+    /** */
+    private DataRegion dataRegion;
+
     /** */
     @BeforeEach
-    public void setUp(@WorkDirectory Path workDir) {
-        storage = new RocksDbStorage(workDir, ByteBuffer::compareTo);
+    public void setUp(

Review comment:
       Also, this test is strange: what exactly are you trying to test here?

##########
File path: modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
##########
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.BiFunction;
+import org.apache.ignite.configuration.NamedListView;
+import org.apache.ignite.configuration.schemas.table.TableConfiguration;
+import org.apache.ignite.configuration.schemas.table.TableIndexView;
+import org.apache.ignite.configuration.schemas.table.TableView;
+import org.apache.ignite.internal.rocksdb.ColumnFamily;
+import org.apache.ignite.internal.storage.Storage;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.engine.TableStorage;
+import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.jetbrains.annotations.NotNull;
+import org.rocksdb.AbstractComparator;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.ComparatorOptions;
+import org.rocksdb.DBOptions;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+
+import static java.lang.Integer.parseInt;
+import static java.util.stream.Collectors.groupingBy;
+import static java.util.stream.Stream.concat;
+
+/**
+ * Table storage implementation based on {@link RocksDB} instance.
+ */
+public class RocksDbTableStorage implements TableStorage {
+    static {
+        RocksDB.loadLibrary();
+    }
+
+    /**
+     * Name of the meta column family matches default columns family, meaning that it always exist when new table is
+     * created.
+     */
+    private static final String CF_META = "default";
+
+    /** Prefix for partitions column families names. */
+    private static final String CF_PARTITION_PREFIX = "cf-part:";
+
+    /** Prefix for SQL indexes column family names. */
+    private static final String CF_INDEX_PREFIX = "cf-idx:";
+
+    /** Path for the directory that stores table data. */
+    private final Path tablePath;
+
+    /** Table configuration. */
+    private final TableConfiguration tableCfg;
+
+    /** Data region for the table. */
+    private final RocksDbDataRegion dataRegion;
+
+    /** Comparators factory for indexes. */
+    private final BiFunction<TableView, String, Comparator<ByteBuffer>> indexComparatorFactory;
+
+    /** List of closeable resources to close on {@link #stop()}. Better than having a field for each one of them. */
+    private final List<AutoCloseable> autoCloseables = new ArrayList<>();
+
+    /** Rocks DB instance itself. */
+    private RocksDB db;
+
+    /** CF handle for meta information. */
+    @SuppressWarnings("unused")
+    private ColumnFamilyHandle metaCfHandle;
+
+    /** Column families for partitions. Stored as a list for the quick access by an index. */
+    private List<ColumnFamily> partitionCfs;
+
+    /** Column families for indexes by their names. */
+    private Map<String, ColumnFamilyHandle> indicesCfHandles = new ConcurrentHashMap<>();
+
+    /** Utility enum to describe a type of the column family - meta, partition or index. */
+    private enum CFType {
+        META, PARTITION, INDEX
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param tablePath Path for the directory that stores table data.
+     * @param tableCfg Table configuration.
+     * @param dataRegion Data region for the table.
+     * @param indexComparatorFactory Comparators factory for indexes.
+     */
+    public RocksDbTableStorage(
+        Path tablePath,
+        TableConfiguration tableCfg,
+        RocksDbDataRegion dataRegion,
+        BiFunction<TableView, String, Comparator<ByteBuffer>> indexComparatorFactory
+    ) {
+        this.tablePath = tablePath;
+        this.tableCfg = tableCfg;
+        this.dataRegion = dataRegion;
+        this.indexComparatorFactory = indexComparatorFactory;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void start() throws StorageException {
+        DBOptions dbOptions = addToCloseableResources(new DBOptions()
+            .setCreateIfMissing(true)
+            .setWriteBufferManager(dataRegion.writeBufferManager())
+        );
+
+        String absolutePathStr = tablePath.toAbsolutePath().toString();
+
+        List<String> cfNames = listCfNames(absolutePathStr);
+
+        Map<CFType, List<String>> cfGrouped = cfNames.stream().collect(groupingBy(this::cfType));
+
+        List<ColumnFamilyDescriptor> cfDescriptors = new ArrayList<>();
+
+        cfDescriptors.add(new ColumnFamilyDescriptor(
+            CF_META.getBytes(StandardCharsets.UTF_8),
+            addToCloseableResources(new ColumnFamilyOptions(addToCloseableResources(new Options().setCreateIfMissing(true))))
+        ));
+
+        for (String partitionCfName : cfGrouped.getOrDefault(CFType.PARTITION, List.of()))
+            cfDescriptors.add(new ColumnFamilyDescriptor(
+                partitionCfName.getBytes(StandardCharsets.UTF_8),
+                new ColumnFamilyOptions()
+            ));
+
+        TableView tableCfgView = tableCfg.value();
+
+        NamedListView<? extends TableIndexView> indicesCfgView = tableCfgView.indices();
+
+        for (String indexCfName : cfGrouped.getOrDefault(CFType.INDEX, List.of())) {
+            String indexName = indexCfName.substring(CF_INDEX_PREFIX.length());
+
+            TableIndexView indexCfgView = indicesCfgView.get(indexName);
+
+            assert indexCfgView != null : indexCfName; //TODO Bullshit!
+
+            Comparator<ByteBuffer> indexComparator = indexComparatorFactory.apply(tableCfgView, indexName);
+
+            cfDescriptors.add(new ColumnFamilyDescriptor(
+                indexCfName.getBytes(StandardCharsets.UTF_8),
+                new ColumnFamilyOptions()
+                    .setComparator(addToCloseableResources(
+                        new AbstractComparator(addToCloseableResources(new ComparatorOptions())) {
+                            /** {@inheritDoc} */
+                            @Override public String name() {
+                                return "index-comparator";
+                            }
+
+                            /** {@inheritDoc} */
+                            @Override public int compare(ByteBuffer a, ByteBuffer b) {
+                                return indexComparator.compare(a, b);
+                            }
+                        }))
+            ));
+        }
+
+        List<ColumnFamilyHandle> cfHandles = new ArrayList<>();
+
+        try {
+            db = addToCloseableResources(RocksDB.open(dbOptions, absolutePathStr, cfDescriptors, cfHandles));
+        }
+        catch (RocksDBException e) {
+            throw new StorageException("Failed to initialize RocksDB instance.", e);
+        }
+
+        partitionCfs = new ArrayList<>(tableCfgView.partitions());

Review comment:
       Or maybe it would be even easier to use a regular array here instead of a list.
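
       A minimal sketch of the array-based alternative suggested here, not the PR's actual code: `PartitionSlots` is a hypothetical class, and `String` stands in for `ColumnFamily`. It assumes the partition count is fixed once the storage starts. A freshly allocated array is already filled with nulls, so no `Collections.nCopies` padding is needed before setting a slot by partition id.

```java
/** Hypothetical stand-in for the partition column family holder; String replaces ColumnFamily. */
final class PartitionSlots {
    /** One slot per partition; a slot stays null until that partition's column family is opened. */
    private final String[] slots;

    PartitionSlots(int partitions) {
        // A new object array is null-filled by the JVM, so every partition id is addressable at once.
        slots = new String[partitions];
    }

    /** Stores the column family for the given partition id. */
    void set(int partId, String columnFamily) {
        slots[partId] = columnFamily;
    }

    /** Returns the column family for the given partition id, or null if it has not been opened. */
    String get(int partId) {
        return slots[partId];
    }

    /** Returns the number of partition slots. */
    int size() {
        return slots.length;
    }

    public static void main(String[] args) {
        PartitionSlots cfs = new PartitionSlots(4);

        cfs.set(2, "cf-part:2");

        System.out.println(cfs.get(2));
        System.out.println(cfs.get(0));
    }
}
```

       The trade-off is that an array cannot grow, so this only fits if the number of partitions never changes for a running table.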

##########
File path: modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
##########
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.BiFunction;
+import org.apache.ignite.configuration.NamedListView;
+import org.apache.ignite.configuration.schemas.table.TableConfiguration;
+import org.apache.ignite.configuration.schemas.table.TableIndexView;
+import org.apache.ignite.configuration.schemas.table.TableView;
+import org.apache.ignite.internal.rocksdb.ColumnFamily;
+import org.apache.ignite.internal.storage.Storage;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.engine.TableStorage;
+import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.jetbrains.annotations.NotNull;
+import org.rocksdb.AbstractComparator;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.ComparatorOptions;
+import org.rocksdb.DBOptions;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+
+import static java.lang.Integer.parseInt;
+import static java.util.stream.Collectors.groupingBy;
+import static java.util.stream.Stream.concat;
+
+/**
+ * Table storage implementation based on {@link RocksDB} instance.
+ */
+public class RocksDbTableStorage implements TableStorage {
+    static {
+        RocksDB.loadLibrary();
+    }
+
+    /**
+     * Name of the meta column family matches default columns family, meaning that it always exist when new table is
+     * created.
+     */
+    private static final String CF_META = "default";
+
+    /** Prefix for partitions column families names. */
+    private static final String CF_PARTITION_PREFIX = "cf-part:";
+
+    /** Prefix for SQL indexes column family names. */
+    private static final String CF_INDEX_PREFIX = "cf-idx:";
+
+    /** Path for the directory that stores table data. */
+    private final Path tablePath;
+
+    /** Table configuration. */
+    private final TableConfiguration tableCfg;
+
+    /** Data region for the table. */
+    private final RocksDbDataRegion dataRegion;
+
+    /** Comparators factory for indexes. */
+    private final BiFunction<TableView, String, Comparator<ByteBuffer>> indexComparatorFactory;
+
+    /** List of closeable resources to close on {@link #stop()}. Better than having a field for each one of them. */
+    private final List<AutoCloseable> autoCloseables = new ArrayList<>();
+
+    /** Rocks DB instance itself. */
+    private RocksDB db;
+
+    /** CF handle for meta information. */
+    @SuppressWarnings("unused")
+    private ColumnFamilyHandle metaCfHandle;
+
+    /** Column families for partitions. Stored as a list for the quick access by an index. */
+    private List<ColumnFamily> partitionCfs;
+
+    /** Column families for indexes by their names. */
+    private Map<String, ColumnFamilyHandle> indicesCfHandles = new ConcurrentHashMap<>();
+
+    /** Utility enum to describe a type of the column family - meta, partition or index. */
+    private enum CFType {
+        META, PARTITION, INDEX
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param tablePath Path for the directory that stores table data.
+     * @param tableCfg Table configuration.
+     * @param dataRegion Data region for the table.
+     * @param indexComparatorFactory Comparators factory for indexes.
+     */
+    public RocksDbTableStorage(
+        Path tablePath,
+        TableConfiguration tableCfg,
+        RocksDbDataRegion dataRegion,
+        BiFunction<TableView, String, Comparator<ByteBuffer>> indexComparatorFactory
+    ) {
+        this.tablePath = tablePath;
+        this.tableCfg = tableCfg;
+        this.dataRegion = dataRegion;
+        this.indexComparatorFactory = indexComparatorFactory;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void start() throws StorageException {
+        DBOptions dbOptions = addToCloseableResources(new DBOptions()
+            .setCreateIfMissing(true)
+            .setWriteBufferManager(dataRegion.writeBufferManager())
+        );
+
+        String absolutePathStr = tablePath.toAbsolutePath().toString();
+
+        List<String> cfNames = listCfNames(absolutePathStr);
+
+        Map<CFType, List<String>> cfGrouped = cfNames.stream().collect(groupingBy(this::cfType));
+
+        List<ColumnFamilyDescriptor> cfDescriptors = new ArrayList<>();
+
+        cfDescriptors.add(new ColumnFamilyDescriptor(
+            CF_META.getBytes(StandardCharsets.UTF_8),
+            addToCloseableResources(new ColumnFamilyOptions(addToCloseableResources(new Options().setCreateIfMissing(true))))
+        ));
+
+        for (String partitionCfName : cfGrouped.getOrDefault(CFType.PARTITION, List.of()))
+            cfDescriptors.add(new ColumnFamilyDescriptor(
+                partitionCfName.getBytes(StandardCharsets.UTF_8),
+                new ColumnFamilyOptions()
+            ));
+
+        TableView tableCfgView = tableCfg.value();
+
+        NamedListView<? extends TableIndexView> indicesCfgView = tableCfgView.indices();
+
+        for (String indexCfName : cfGrouped.getOrDefault(CFType.INDEX, List.of())) {
+            String indexName = indexCfName.substring(CF_INDEX_PREFIX.length());
+
+            TableIndexView indexCfgView = indicesCfgView.get(indexName);
+
+            assert indexCfgView != null : indexCfName; //TODO Bullshit!
+
+            Comparator<ByteBuffer> indexComparator = indexComparatorFactory.apply(tableCfgView, indexName);
+
+            cfDescriptors.add(new ColumnFamilyDescriptor(
+                indexCfName.getBytes(StandardCharsets.UTF_8),
+                new ColumnFamilyOptions()
+                    .setComparator(addToCloseableResources(
+                        new AbstractComparator(addToCloseableResources(new ComparatorOptions())) {
+                            /** {@inheritDoc} */
+                            @Override public String name() {
+                                return "index-comparator";
+                            }
+
+                            /** {@inheritDoc} */
+                            @Override public int compare(ByteBuffer a, ByteBuffer b) {
+                                return indexComparator.compare(a, b);
+                            }
+                        }))
+            ));
+        }
+
+        List<ColumnFamilyHandle> cfHandles = new ArrayList<>();
+
+        try {
+            db = addToCloseableResources(RocksDB.open(dbOptions, absolutePathStr, cfDescriptors, cfHandles));
+        }
+        catch (RocksDBException e) {
+            throw new StorageException("Failed to initialize RocksDB instance.", e);
+        }
+
+        partitionCfs = new ArrayList<>(tableCfgView.partitions());
+
+        partitionCfs.addAll(Collections.nCopies(tableCfgView.partitions(), null));
+
+        for (int cfListIndex = 0; cfListIndex < cfHandles.size(); cfListIndex++) {
+            ColumnFamilyHandle cfHandle = cfHandles.get(cfListIndex);
+
+            String handleName;
+            try {
+                handleName = new String(cfHandle.getName(), StandardCharsets.UTF_8);
+            }
+            catch (RocksDBException e) {
+                throw new StorageException("Failed to read RocksDB column family name.", e);
+            }
+
+            if (handleName.equals(CF_META))
+                this.metaCfHandle = addToCloseableResources(cfHandle);
+            else if (handleName.startsWith(CF_PARTITION_PREFIX)) {
+                int partId = partitionId(handleName);
+
+                ColumnFamilyDescriptor cfDescriptor = cfDescriptors.get(cfListIndex);
+
+                partitionCfs.set(partId, new ColumnFamily(db, cfHandle, handleName, cfDescriptor.getOptions(), null));
+            }
+            else {
+                String indexName = handleName.substring(CF_INDEX_PREFIX.length());
+
+                indicesCfHandles.put(indexName, cfHandle);
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void stop() throws StorageException {
+        try {
+            List<AutoCloseable> copy = new ArrayList<>(autoCloseables);
+
+            Collections.reverse(copy);
+
+            IgniteUtils.closeAll(concat(
+                concat(partitionCfs.stream(), indicesCfHandles.values().stream()),

Review comment:
       You already create a copy of the `autoCloseables` list; I would suggest adding all the other stuff there as well and getting rid of these `concat` shenanigans.
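
       A minimal sketch of that idea, not the PR's actual code: `ReverseCloser`, `register` and its `closeAll` are hypothetical names. It assumes every resource (options, the database, column family handles) is registered in one list in creation order and closed in reverse, with the first failure rethrown and later ones attached as suppressed, similar to a try-with-resources block.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical helper: keeps all closeable resources in one list and closes them in reverse order. */
final class ReverseCloser {
    /** Resources in the order they were created. */
    private final List<AutoCloseable> resources = new ArrayList<>();

    /** Registers a resource and returns it, so the call can wrap a constructor expression. */
    <R extends AutoCloseable> R register(R resource) {
        resources.add(resource);

        return resource;
    }

    /** Closes everything in reverse creation order; the first exception is rethrown at the end. */
    void closeAll() throws Exception {
        List<AutoCloseable> copy = new ArrayList<>(resources);

        Collections.reverse(copy);

        Exception ex = null;

        for (AutoCloseable closeable : copy) {
            try {
                closeable.close();
            }
            catch (Exception e) {
                if (ex == null)
                    ex = e;
                else
                    ex.addSuppressed(e);
            }
        }

        resources.clear();

        if (ex != null)
            throw ex;
    }

    public static void main(String[] args) throws Exception {
        List<String> log = new ArrayList<>();
        ReverseCloser closer = new ReverseCloser();

        AutoCloseable options = () -> log.add("options");
        AutoCloseable db = () -> log.add("db");
        AutoCloseable cfHandle = () -> log.add("cfHandle");

        closer.register(options);
        closer.register(db);
        closer.register(cfHandle);

        closer.closeAll();

        System.out.println(log);
    }
}
```

       With a single list there is nothing left to concatenate: the handles are closed before the database, and the database before its options, purely because of registration order.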



