You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ib...@apache.org on 2021/10/07 14:22:59 UTC

[ignite-3] branch ignite-3.0.0-alpha3 updated: IGNITE-15351 Implemented concepts of storage engines and data regions with basic integration into existing code. (#365)

This is an automated email from the ASF dual-hosted git repository.

ibessonov pushed a commit to branch ignite-3.0.0-alpha3
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/ignite-3.0.0-alpha3 by this push:
     new 33e6ed1  IGNITE-15351 Implemented concepts of storage engines and data regions with basic integration into existing code. (#365)
33e6ed1 is described below

commit 33e6ed1ca023999cabcd8c56e9f94c1d911dea65
Author: ibessonov <be...@gmail.com>
AuthorDate: Thu Oct 7 16:35:40 2021 +0300

    IGNITE-15351 Implemented concepts of storage engines and data regions with basic integration into existing code. (#365)
    
    Signed-off-by: ibessonov <be...@gmail.com>
---
 .../store/DataRegionConfigurationSchema.java       |  64 ++++
 .../store/DataStorageConfigurationSchema.java}     |  31 +-
 .../org/apache/ignite/cli/ITConfigCommandTest.java |  18 +-
 .../notifications/ConfigurationListener.java       |   1 -
 .../OneOf.java}                                    |  31 +-
 .../configuration/ConfigurationChanger.java        |  10 +-
 .../configuration/ConfigurationManager.java        |   3 +-
 .../configuration/ConfigurationRegistry.java       |   9 +-
 .../internal/configuration/DynamicProperty.java    |   6 +
 .../asm/ConfigurationAsmGenerator.java             |   5 +-
 .../configuration/util/ConfigurationUtil.java      |  17 +-
 .../configuration/validation/OneOfValidator.java   |  46 +++
 .../configuration/ConfigurationChangerTest.java    |   6 +-
 .../validation/OneOfValidatorTest.java             |  80 +++++
 .../apache/ignite/internal/util/IgniteUtils.java   |  40 ++-
 .../ignite/internal/rocksdb/ColumnFamily.java      |   7 +
 .../org/apache/ignite/internal/app/IgniteImpl.java |   5 +-
 modules/storage-api/pom.xml                        |   2 +-
 .../{Storage.java => PartitionStorage.java}        |   2 +-
 .../internal/storage/engine/DataRegion.java}       |  14 +-
 .../internal/storage/engine/StorageEngine.java     |  55 +++
 .../internal/storage/engine/TableStorage.java}     |  36 +-
 ...Test.java => AbstractPartitionStorageTest.java} |  16 +-
 ...java => ConcurrentHashMapPartitionStorage.java} |   4 +-
 .../basic/ConcurrentHashMapStorageTest.java        |   8 +-
 modules/storage-rocksdb/pom.xml                    |  19 +
 .../storage/rocksdb/RocksDbDataRegion.java         |  95 +++++
 ...DbStorage.java => RocksDbPartitionStorage.java} | 102 +-----
 .../storage/rocksdb/RocksDbStorageEngine.java      |  61 ++++
 .../storage/rocksdb/RocksDbTableStorage.java       | 384 +++++++++++++++++++++
 .../storage/rocksdb/RocksDbStorageTest.java        |  55 ++-
 .../ignite/distributed/ITDistributedTableTest.java |   6 +-
 .../distributed/ITInternalTableScanTest.java       |   6 +-
 .../ignite/distributed/ITTablePersistenceTest.java |   8 +-
 .../internal/table/distributed/TableManager.java   | 101 ++++--
 .../table/distributed/raft/PartitionListener.java  |  13 +-
 .../ignite/internal/table/TableManagerTest.java    |  16 +-
 .../raft/PartitionCommandListenerTest.java         |   4 +-
 .../vault/persistence/PersistentVaultService.java  |   2 +-
 39 files changed, 1127 insertions(+), 261 deletions(-)

diff --git a/modules/api/src/main/java/org/apache/ignite/configuration/schemas/store/DataRegionConfigurationSchema.java b/modules/api/src/main/java/org/apache/ignite/configuration/schemas/store/DataRegionConfigurationSchema.java
new file mode 100644
index 0000000..f32381f
--- /dev/null
+++ b/modules/api/src/main/java/org/apache/ignite/configuration/schemas/store/DataRegionConfigurationSchema.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.configuration.schemas.store;
+
+import org.apache.ignite.configuration.annotation.Config;
+import org.apache.ignite.configuration.annotation.Value;
+import org.apache.ignite.configuration.validation.Immutable;
+import org.apache.ignite.configuration.validation.Min;
+import org.apache.ignite.configuration.validation.OneOf;
+
+/**
+ * Configuration schema for data region. Currently it represents configuration for rocksdb storage engine only.
+ */
+@Config
+public class DataRegionConfigurationSchema {
+    /** Type of the RocksDB data region. */
+    public static final String ROCKSDB_DATA_REGION_TYPE = "rocksdb";
+
+    /** Cache type for the RocksDB LRU cache. */
+    public static final String ROCKSDB_LRU_CACHE = "lru";
+
+    /** Cache type for the RocksDB Clock cache. */
+    public static final String ROCKSDB_CLOCK_CACHE = "clock";
+
+    /** Type for the future polymorphic configuration schemas. */
+    @Immutable
+    @OneOf(ROCKSDB_DATA_REGION_TYPE)
+    @Value(hasDefault = true)
+    public String type = ROCKSDB_DATA_REGION_TYPE;
+
+    /** Size of the rocksdb offheap cache. */
+    @Value(hasDefault = true)
+    public long size = 256 * 1024 * 1024;
+
+    /** Size of rocksdb write buffer. */
+    @Value(hasDefault = true)
+    @Min(1)
+    public long writeBufferSize = 64 * 1024 * 1024;
+
+    /** Cache type - only {@code LRU} is supported at the moment. {@code Clock} implementation has known bugs. */
+    @OneOf({ROCKSDB_LRU_CACHE})
+    @Value(hasDefault = true)
+    public String cache = ROCKSDB_LRU_CACHE;
+
+    /** The cache is sharded to 2^numShardBits shards, by hash of the key. */
+    @Min(-1)
+    @Value(hasDefault = true)
+    public int numShardBits = -1;
+}
diff --git a/modules/configuration-api/src/main/java/org/apache/ignite/configuration/notifications/ConfigurationListener.java b/modules/api/src/main/java/org/apache/ignite/configuration/schemas/store/DataStorageConfigurationSchema.java
similarity index 54%
copy from modules/configuration-api/src/main/java/org/apache/ignite/configuration/notifications/ConfigurationListener.java
copy to modules/api/src/main/java/org/apache/ignite/configuration/schemas/store/DataStorageConfigurationSchema.java
index 534d5ec..0434d7b 100644
--- a/modules/configuration-api/src/main/java/org/apache/ignite/configuration/notifications/ConfigurationListener.java
+++ b/modules/api/src/main/java/org/apache/ignite/configuration/schemas/store/DataStorageConfigurationSchema.java
@@ -15,24 +15,23 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.configuration.notifications;
+package org.apache.ignite.configuration.schemas.store;
 
-import java.util.concurrent.CompletableFuture;
-import org.jetbrains.annotations.NotNull;
+import org.apache.ignite.configuration.annotation.ConfigValue;
+import org.apache.ignite.configuration.annotation.ConfigurationRoot;
+import org.apache.ignite.configuration.annotation.ConfigurationType;
+import org.apache.ignite.configuration.annotation.NamedConfigValue;
 
 /**
- * Configuration property change listener.
- *
- * @param <VIEW> VIEW type configuration.
+ * Root configuration for data storages.
  */
-@FunctionalInterface
-public interface ConfigurationListener<VIEW> {
-    /**
-     * Called on property value update.
-     *
-     * @param ctx Notification context.
-     * @return Future that signifies the end of the listener execution.
-     */
-    @NotNull CompletableFuture<?> onUpdate(@NotNull ConfigurationNotificationEvent<VIEW> ctx);
-}
+@ConfigurationRoot(rootName = "db", type = ConfigurationType.DISTRIBUTED)
+public class DataStorageConfigurationSchema {
+    /** Default data region. */
+    @ConfigValue
+    public DataRegionConfigurationSchema defaultRegion;
 
+    /** Other data regions. */
+    @NamedConfigValue
+    public DataRegionConfigurationSchema regions;
+}
diff --git a/modules/cli/src/integrationTest/java/org/apache/ignite/cli/ITConfigCommandTest.java b/modules/cli/src/integrationTest/java/org/apache/ignite/cli/ITConfigCommandTest.java
index a8e7a18..571a777 100644
--- a/modules/cli/src/integrationTest/java/org/apache/ignite/cli/ITConfigCommandTest.java
+++ b/modules/cli/src/integrationTest/java/org/apache/ignite/cli/ITConfigCommandTest.java
@@ -35,6 +35,7 @@ import picocli.CommandLine;
 
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /**
  * Integration test for {@code ignite config} commands.
@@ -117,13 +118,16 @@ public class ITConfigCommandTest extends AbstractCliTest {
         );
 
         assertEquals(0, exitCode);
-        assertEquals(
-            "\"{\"clientConnector\":{\"connectTimeout\":5000,\"port\":" + clientPort + ",\"portRange\":0}," +
-                "\"network\":{\"netClusterNodes\":[],\"port\":" + networkPort + "}," +
-                "\"node\":{\"metastorageNodes\":[\"localhost1\"]}," +
-                "\"rest\":{\"port\":" + restPort + ",\"portRange\":0}}\"" + nl,
-            unescapeQuotes(out.toString())
-        );
+
+        String unescapedOut = unescapeQuotes(out.toString());
+
+        assertTrue(unescapedOut.contains(
+            "\"clientConnector\":{\"connectTimeout\":5000,\"port\":" + clientPort + ",\"portRange\":0}"
+        ), unescapedOut);
+
+        assertTrue(unescapedOut.contains(
+            "\"rest\":{\"port\":" + restPort + ",\"portRange\":0}}\""
+        ), unescapedOut);
     }
 
     @Test
diff --git a/modules/configuration-api/src/main/java/org/apache/ignite/configuration/notifications/ConfigurationListener.java b/modules/configuration-api/src/main/java/org/apache/ignite/configuration/notifications/ConfigurationListener.java
index 534d5ec..36de3c8 100644
--- a/modules/configuration-api/src/main/java/org/apache/ignite/configuration/notifications/ConfigurationListener.java
+++ b/modules/configuration-api/src/main/java/org/apache/ignite/configuration/notifications/ConfigurationListener.java
@@ -35,4 +35,3 @@ public interface ConfigurationListener<VIEW> {
      */
     @NotNull CompletableFuture<?> onUpdate(@NotNull ConfigurationNotificationEvent<VIEW> ctx);
 }
-
diff --git a/modules/configuration-api/src/main/java/org/apache/ignite/configuration/notifications/ConfigurationListener.java b/modules/configuration-api/src/main/java/org/apache/ignite/configuration/validation/OneOf.java
similarity index 58%
copy from modules/configuration-api/src/main/java/org/apache/ignite/configuration/notifications/ConfigurationListener.java
copy to modules/configuration-api/src/main/java/org/apache/ignite/configuration/validation/OneOf.java
index 534d5ec..b6c2f8f 100644
--- a/modules/configuration-api/src/main/java/org/apache/ignite/configuration/notifications/ConfigurationListener.java
+++ b/modules/configuration-api/src/main/java/org/apache/ignite/configuration/validation/OneOf.java
@@ -15,24 +15,27 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.configuration.notifications;
+package org.apache.ignite.configuration.validation;
 
-import java.util.concurrent.CompletableFuture;
-import org.jetbrains.annotations.NotNull;
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
 
 /**
- * Configuration property change listener.
- *
- * @param <VIEW> VIEW type configuration.
+ * Signifies that the annotated {@code String} configuration value must be equal to one of the listed values.
  */
-@FunctionalInterface
-public interface ConfigurationListener<VIEW> {
+@Target(FIELD)
+@Retention(RUNTIME)
+public @interface OneOf {
     /**
-     * Called on property value update.
-     *
-     * @param ctx Notification context.
-     * @return Future that signifies the end of the listener execution.
+     * @return List of possible values.
      */
-    @NotNull CompletableFuture<?> onUpdate(@NotNull ConfigurationNotificationEvent<VIEW> ctx);
-}
+    String[] value();
 
+    /**
+     * @return {@code true} if list is case sensitive.
+     */
+    boolean caseSensitive() default false;
+}
diff --git a/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/ConfigurationChanger.java b/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/ConfigurationChanger.java
index 154d8d0..2172243 100644
--- a/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/ConfigurationChanger.java
+++ b/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/ConfigurationChanger.java
@@ -205,6 +205,8 @@ public abstract class ConfigurationChanger implements DynamicConfigurationChange
             superRoot.addRoot(rootKey, rootNode);
         }
 
+        ConfigurationUtil.addDefaults(superRoot);
+
         storageRoots = new StorageRoots(superRoot, data.changeId());
 
         storage.registerConfigurationListener(this::updateFromListener);
@@ -312,13 +314,15 @@ public abstract class ConfigurationChanger implements DynamicConfigurationChange
 
         Map<String, ? extends Serializable> storageData = storage.readAllLatest(ConfigurationUtil.join(storagePath));
 
-        if (storageData.isEmpty())
-            throw new NoSuchElementException(ConfigurationUtil.join(path));
-
         InnerNode rootNode = new SuperRoot(rootCreator());
 
         fillFromPrefixMap(rootNode, toPrefixMap(storageData));
 
+        if (storageData.isEmpty())
+            rootNode.construct(path.get(0), ConfigurationUtil.EMPTY_CFG_SRC, true);
+
+        addDefaults(rootNode);
+
         try {
             T result = ConfigurationUtil.find(path, rootNode, true);
 
diff --git a/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/ConfigurationManager.java b/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/ConfigurationManager.java
index 170594d..0a54d78 100644
--- a/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/ConfigurationManager.java
+++ b/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/ConfigurationManager.java
@@ -32,6 +32,7 @@ import org.apache.ignite.configuration.validation.Validator;
 import org.apache.ignite.internal.configuration.hocon.HoconConverter;
 import org.apache.ignite.internal.configuration.storage.ConfigurationStorage;
 import org.apache.ignite.internal.manager.IgniteComponent;
+import org.intellij.lang.annotations.Language;
 
 import static org.apache.ignite.internal.configuration.util.ConfigurationUtil.checkConfigurationType;
 
@@ -82,7 +83,7 @@ public class ConfigurationManager implements IgniteComponent {
      * @throws InterruptedException If thread is interrupted during bootstrap.
      * @throws ExecutionException If configuration update failed for some reason.
      */
-    public void bootstrap(String hoconStr) throws InterruptedException, ExecutionException {
+    public void bootstrap(@Language("HOCON") String hoconStr) throws InterruptedException, ExecutionException {
         ConfigObject hoconCfg = ConfigFactory.parseString(hoconStr).root();
 
         registry.change(HoconConverter.hoconSource(hoconCfg)).get();
diff --git a/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/ConfigurationRegistry.java b/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/ConfigurationRegistry.java
index 09223bf..fd03942 100644
--- a/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/ConfigurationRegistry.java
+++ b/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/ConfigurationRegistry.java
@@ -36,6 +36,7 @@ import org.apache.ignite.configuration.annotation.InternalConfiguration;
 import org.apache.ignite.configuration.validation.Immutable;
 import org.apache.ignite.configuration.validation.Max;
 import org.apache.ignite.configuration.validation.Min;
+import org.apache.ignite.configuration.validation.OneOf;
 import org.apache.ignite.configuration.validation.Validator;
 import org.apache.ignite.internal.configuration.asm.ConfigurationAsmGenerator;
 import org.apache.ignite.internal.configuration.storage.ConfigurationStorage;
@@ -49,6 +50,7 @@ import org.apache.ignite.internal.configuration.util.KeyNotFoundException;
 import org.apache.ignite.internal.configuration.validation.ImmutableValidator;
 import org.apache.ignite.internal.configuration.validation.MaxValidator;
 import org.apache.ignite.internal.configuration.validation.MinValidator;
+import org.apache.ignite.internal.configuration.validation.OneOfValidator;
 import org.apache.ignite.internal.manager.IgniteComponent;
 import org.apache.ignite.lang.IgniteLogger;
 
@@ -114,9 +116,10 @@ public class ConfigurationRegistry implements IgniteComponent {
 
         Map<Class<? extends Annotation>, Set<Validator<?, ?>>> validators0 = new HashMap<>(validators);
 
-        validators0.computeIfAbsent(Min.class, a -> new HashSet<>()).add(new MinValidator());
-        validators0.computeIfAbsent(Max.class, a -> new HashSet<>()).add(new MaxValidator());
-        validators0.computeIfAbsent(Immutable.class, a -> new HashSet<>()).add(new ImmutableValidator());
+        validators0.computeIfAbsent(Min.class, a -> new HashSet<>(1)).add(new MinValidator());
+        validators0.computeIfAbsent(Max.class, a -> new HashSet<>(1)).add(new MaxValidator());
+        validators0.computeIfAbsent(Immutable.class, a -> new HashSet<>(1)).add(new ImmutableValidator());
+        validators0.computeIfAbsent(OneOf.class, a -> new HashSet<>(1)).add(new OneOfValidator());
 
         changer = new ConfigurationChanger(this::notificator, rootKeys, validators0, storage) {
             /** {@inheritDoc} */
diff --git a/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/DynamicProperty.java b/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/DynamicProperty.java
index feb5de9..925cd09 100644
--- a/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/DynamicProperty.java
+++ b/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/DynamicProperty.java
@@ -26,6 +26,7 @@ import org.apache.ignite.configuration.ConfigurationValue;
 import org.apache.ignite.configuration.RootKey;
 import org.apache.ignite.internal.configuration.tree.ConfigurationSource;
 import org.apache.ignite.internal.configuration.tree.ConstructableTreeNode;
+import org.apache.ignite.internal.tostring.S;
 
 /**
  * Holder for property value.
@@ -100,4 +101,9 @@ public class DynamicProperty<T extends Serializable> extends ConfigurationNode<T
     @Override public String key() {
         return key;
     }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(DynamicProperty.class, this, "key", key, "value", value());
+    }
 }
diff --git a/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/asm/ConfigurationAsmGenerator.java b/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/asm/ConfigurationAsmGenerator.java
index ee9cdde..03edebe 100644
--- a/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/asm/ConfigurationAsmGenerator.java
+++ b/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/asm/ConfigurationAsmGenerator.java
@@ -27,6 +27,7 @@ import java.lang.reflect.Field;
 import java.lang.reflect.Method;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -308,7 +309,9 @@ public class ConfigurationAsmGenerator {
 
             assert schemasInfo.containsKey(schemaClass) : schemaClass;
 
-            Field[] schemaFields = schemaClass.getDeclaredFields();
+            Field[] schemaFields = Arrays.stream(schemaClass.getDeclaredFields()).filter(
+                field -> isValue(field) || isConfigValue(field) || isNamedConfigValue(field)
+            ).toArray(Field[]::new);
 
             Set<Class<?>> schemaExtensions = internalSchemaExtensions.getOrDefault(schemaClass, Set.of());
             Set<Field> extensionsFields = extensionsFields(schemaExtensions);
diff --git a/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/util/ConfigurationUtil.java b/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/util/ConfigurationUtil.java
index 2ff720d..7efc849 100644
--- a/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/util/ConfigurationUtil.java
+++ b/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/util/ConfigurationUtil.java
@@ -57,7 +57,7 @@ import static java.util.stream.Collectors.toList;
 /** */
 public class ConfigurationUtil {
     /** Configuration source that copies values without modifying tham. */
-    static final ConfigurationSource EMPTY_CFG_SRC = new ConfigurationSource() {};
+    public static final ConfigurationSource EMPTY_CFG_SRC = new ConfigurationSource() {};
 
     /**
      * Replaces all {@code .} and {@code \} characters with {@code \.} and {@code \\} respectively.
@@ -445,19 +445,22 @@ public class ConfigurationUtil {
 
             /** {@inheritDoc} */
             @Override public Object visitInnerNode(String key, InnerNode innerNode) {
-                // Instantiate field in destination node before doing something else or copy it if it wasn't null.
-                node.construct(key, EMPTY_CFG_SRC, true);
+                InnerNode childNode = node.traverseChild(key, innerNodeVisitor(), true);
 
-                addDefaults(node.traverseChild(key, innerNodeVisitor(), true));
+                // Instantiate field in destination node before doing something else.
+                if (childNode == null) {
+                    node.construct(key, EMPTY_CFG_SRC, true);
+
+                    childNode = node.traverseChild(key, innerNodeVisitor(), true);
+                }
+
+                addDefaults(childNode);
 
                 return null;
             }
 
             /** {@inheritDoc} */
             @Override public Object visitNamedListNode(String key, NamedListNode<?> namedList) {
-                // Copy internal map.
-                node.construct(key, EMPTY_CFG_SRC, true);
-
                 namedList = node.traverseChild(key, namedListNodeVisitor(), true);
 
                 for (String namedListKey : namedList.namedListKeys()) {
diff --git a/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/validation/OneOfValidator.java b/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/validation/OneOfValidator.java
new file mode 100644
index 0000000..49b3c24
--- /dev/null
+++ b/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/validation/OneOfValidator.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.configuration.validation;
+
+import java.util.Arrays;
+import org.apache.ignite.configuration.validation.OneOf;
+import org.apache.ignite.configuration.validation.ValidationContext;
+import org.apache.ignite.configuration.validation.ValidationIssue;
+import org.apache.ignite.configuration.validation.Validator;
+
+/**
+ * {@link Validator} implementation for the {@link OneOf} annotation.
+ */
+public class OneOfValidator implements Validator<OneOf, String> {
+    /** {@inheritDoc} Reports an issue when the new value matches none of the values listed in the annotation. */
+    @Override public void validate(OneOf annotation, ValidationContext<String> ctx) {
+        String value = ctx.getNewValue();
+
+        boolean caseSensitive = annotation.caseSensitive();
+
+        for (String exp : annotation.value()) {
+            if (caseSensitive ? exp.equals(value) : exp.equalsIgnoreCase(value))
+                return;
+        }
+
+        String message = "'" + ctx.currentKey() + "' configuration value must be one of " +
+            Arrays.toString(annotation.value()) + (caseSensitive ? " (case sensitive)" : " (case insensitive)");
+
+        ctx.addIssue(new ValidationIssue(message));
+    }
+}
diff --git a/modules/configuration/src/test/java/org/apache/ignite/internal/configuration/ConfigurationChangerTest.java b/modules/configuration/src/test/java/org/apache/ignite/internal/configuration/ConfigurationChangerTest.java
index c83bdc2..d8b17f2 100644
--- a/modules/configuration/src/test/java/org/apache/ignite/internal/configuration/ConfigurationChangerTest.java
+++ b/modules/configuration/src/test/java/org/apache/ignite/internal/configuration/ConfigurationChangerTest.java
@@ -55,6 +55,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsString;
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
@@ -236,7 +237,7 @@ public class ConfigurationChangerTest {
         storage.fail(true);
 
         assertThrows(ExecutionException.class, () -> changer.change(source(KEY, (AChange parent) -> parent
-            .changeChild(child -> child.changeIntCfg(1))
+            .changeChild(child -> child.changeIntCfg(1).changeStrCfg("1"))
         )).get(1, SECONDS));
 
         storage.fail(false);
@@ -247,7 +248,8 @@ public class ConfigurationChangerTest {
         assertEquals(0, dataMap.size());
 
         AView newRoot = (AView)changer.getRootNode(KEY);
-        assertNull(newRoot.child());
+        assertNotNull(newRoot.child());
+        assertNull(newRoot.child().strCfg());
     }
 
     /** */
diff --git a/modules/configuration/src/test/java/org/apache/ignite/internal/configuration/validation/OneOfValidatorTest.java b/modules/configuration/src/test/java/org/apache/ignite/internal/configuration/validation/OneOfValidatorTest.java
new file mode 100644
index 0000000..612842b
--- /dev/null
+++ b/modules/configuration/src/test/java/org/apache/ignite/internal/configuration/validation/OneOfValidatorTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.configuration.validation;
+
+import org.apache.ignite.configuration.validation.OneOf;
+import org.apache.ignite.configuration.validation.ValidationContext;
+import org.apache.ignite.configuration.validation.ValidationIssue;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import org.mockito.ArgumentCaptor;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/** Tests for {@link OneOfValidator}. */
+public class OneOfValidatorTest {
+    /** Checks a matching value, a value differing only by case, and a non-matching value in both modes. */
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void testValidate(boolean caseSensitive) {
+        // Prepare mocked annotation instance.
+        OneOf oneOfAnnotation = mock(OneOf.class);
+
+        when(oneOfAnnotation.value()).thenReturn(new String[] {"foo", "bar"});
+        when(oneOfAnnotation.caseSensitive()).thenReturn(caseSensitive);
+
+        // Prepare mocked validation context.
+        ValidationContext<String> ctx = mock(ValidationContext.class);
+
+        when(ctx.currentKey()).thenReturn("x");
+        when(ctx.getNewValue()).thenReturn("foo", "Bar", "no");
+
+        // Prepare issues captor.
+        ArgumentCaptor<ValidationIssue> issuesCaptor = ArgumentCaptor.forClass(ValidationIssue.class);
+        doNothing().when(ctx).addIssue(issuesCaptor.capture());
+
+        // Instantiate validator.
+        OneOfValidator oneOfValidator = new OneOfValidator();
+
+        // Assert that valid value produces no issues.
+        oneOfValidator.validate(oneOfAnnotation, ctx);
+
+        assertThat(issuesCaptor.getAllValues(), is(empty()));
+
+        // Assert that case sensitivity affects validation.
+        oneOfValidator.validate(oneOfAnnotation, ctx);
+
+        if (caseSensitive)
+            assertThat(issuesCaptor.getValue().message(), is("'x' configuration value must be one of [foo, bar] (case sensitive)"));
+        else
+            assertThat(issuesCaptor.getAllValues(), is(empty()));
+
+        // Assert that unacceptable value produces validation issue.
+        oneOfValidator.validate(oneOfAnnotation, ctx);
+
+        if (caseSensitive)
+            assertThat(issuesCaptor.getValue().message(), is("'x' configuration value must be one of [foo, bar] (case sensitive)"));
+        else
+            assertThat(issuesCaptor.getValue().message(), is("'x' configuration value must be one of [foo, bar] (case insensitive)"));
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index 50ca746..5b06fd9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -31,11 +31,14 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.Map;
+import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Predicate;
+import java.util.stream.Stream;
 import org.apache.ignite.lang.IgniteLogger;
 import org.jetbrains.annotations.Nullable;
 
@@ -467,27 +470,36 @@ public class IgniteUtils {
      * thrown exception will be propagated to the caller, after all other objects are closed, similar to
      * the try-with-resources block.
      *
-     * @param closeables Collection of objects to close.
+     * @param closeables Stream of objects to close.
      * @throws Exception If failed to close.
      */
-    public static void closeAll(Collection<? extends AutoCloseable> closeables) throws Exception {
-        Exception ex = null;
+    public static void closeAll(Stream<? extends AutoCloseable> closeables) throws Exception {
+        AtomicReference<Exception> ex = new AtomicReference<>();
 
-        for (AutoCloseable closeable : closeables) {
+        closeables.filter(Objects::nonNull).forEach(closeable -> {
             try {
-                if (closeable != null)
-                    closeable.close();
+                closeable.close();
             }
             catch (Exception e) {
-                if (ex == null)
-                    ex = e;
-                else
-                    ex.addSuppressed(e);
+                if (!ex.compareAndSet(null, e))
+                    ex.get().addSuppressed(e);
             }
-        }
+        });
 
-        if (ex != null)
-            throw ex;
+        if (ex.get() != null)
+            throw ex.get();
+    }
+
+    /**
+     * Closes all provided objects. If any of the {@link AutoCloseable#close} methods throw an exception, only the first
+     * thrown exception will be propagated to the caller, after all other objects are closed, similar to
+     * the try-with-resources block.
+     *
+     * @param closeables Collection of objects to close.
+     * @throws Exception If failed to close.
+     */
+    public static void closeAll(Collection<? extends AutoCloseable> closeables) throws Exception {
+        closeAll(closeables.stream());
     }
 
     /**
@@ -499,7 +511,7 @@ public class IgniteUtils {
      * @see #closeAll(Collection)
      */
     public static void closeAll(AutoCloseable... closeables) throws Exception {
-        closeAll(Arrays.asList(closeables));
+        closeAll(Arrays.stream(closeables));
     }
 
     /**
diff --git a/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/ColumnFamily.java b/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/ColumnFamily.java
index fcdc676..5c15ba3 100644
--- a/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/ColumnFamily.java
+++ b/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/ColumnFamily.java
@@ -173,6 +173,13 @@ public class ColumnFamily implements AutoCloseable {
     }
 
     /**
+     * @return Column family handle.
+     */
+    public ColumnFamilyHandle handle() {
+        return cfHandle;
+    }
+
+    /**
      * @return Name of the column family.
      */
     public String name() {
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 93b24ae..f68ab1e 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -37,6 +37,7 @@ import org.apache.ignite.configuration.schemas.network.NetworkConfiguration;
 import org.apache.ignite.configuration.schemas.rest.RestConfiguration;
 import org.apache.ignite.configuration.schemas.runner.ClusterConfiguration;
 import org.apache.ignite.configuration.schemas.runner.NodeConfiguration;
+import org.apache.ignite.configuration.schemas.store.DataStorageConfiguration;
 import org.apache.ignite.configuration.schemas.table.TablesConfiguration;
 import org.apache.ignite.internal.baseline.BaselineManager;
 import org.apache.ignite.internal.configuration.ConfigurationManager;
@@ -180,7 +181,8 @@ public class IgniteImpl implements Ignite {
         clusterCfgMgr = new ConfigurationManager(
             Arrays.asList(
                 ClusterConfiguration.KEY,
-                TablesConfiguration.KEY
+                TablesConfiguration.KEY,
+                DataStorageConfiguration.KEY
             ),
             Map.of(),
             new DistributedConfigurationStorage(metaStorageMgr, vaultMgr),
@@ -195,6 +197,7 @@ public class IgniteImpl implements Ignite {
 
         distributedTblMgr = new TableManager(
             clusterCfgMgr.configurationRegistry().getConfiguration(TablesConfiguration.KEY),
+            clusterCfgMgr.configurationRegistry().getConfiguration(DataStorageConfiguration.KEY),
             raftMgr,
             baselineMgr,
             clusterSvc.topologyService(),
diff --git a/modules/storage-api/pom.xml b/modules/storage-api/pom.xml
index f891825..6eb9f03 100644
--- a/modules/storage-api/pom.xml
+++ b/modules/storage-api/pom.xml
@@ -35,7 +35,7 @@
     <dependencies>
         <dependency>
             <groupId>org.apache.ignite</groupId>
-            <artifactId>ignite-core</artifactId>
+            <artifactId>ignite-api</artifactId>
         </dependency>
 
         <!-- Test dependencies -->
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/Storage.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/PartitionStorage.java
similarity index 98%
rename from modules/storage-api/src/main/java/org/apache/ignite/internal/storage/Storage.java
rename to modules/storage-api/src/main/java/org/apache/ignite/internal/storage/PartitionStorage.java
index ee83006..9b9096d 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/Storage.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/PartitionStorage.java
@@ -31,7 +31,7 @@ import org.jetbrains.annotations.Nullable;
  * Any locking is unnecessary as this storage is used within RAFT groups where all write operations are
  * serialized.
  */
-public interface Storage extends AutoCloseable {
+public interface PartitionStorage extends AutoCloseable {
     /**
      * Reads a DataRow for a given key.
      *
diff --git a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/basic/ConcurrentHashMapStorageTest.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/DataRegion.java
similarity index 67%
copy from modules/storage-api/src/test/java/org/apache/ignite/internal/storage/basic/ConcurrentHashMapStorageTest.java
copy to modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/DataRegion.java
index eaaed7e..7abf270 100644
--- a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/basic/ConcurrentHashMapStorageTest.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/DataRegion.java
@@ -15,17 +15,13 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.storage.basic;
+package org.apache.ignite.internal.storage.engine;
 
-import org.apache.ignite.internal.storage.AbstractStorageTest;
-import org.junit.jupiter.api.BeforeEach;
+import org.apache.ignite.internal.manager.IgniteComponent;
 
 /**
- * Storage test implementation for {@link ConcurrentHashMapStorage}.
+ * Interface that represents Ignite data region. Data region is a memory segment of fixed size, usually located offheap,
+ * that caches user data in memory.
  */
-public class ConcurrentHashMapStorageTest extends AbstractStorageTest {
-    @BeforeEach
-    public void setUp() {
-        storage = new ConcurrentHashMapStorage();
-    }
+public interface DataRegion extends IgniteComponent {
 }
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/StorageEngine.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/StorageEngine.java
new file mode 100644
index 0000000..4163915
--- /dev/null
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/StorageEngine.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.engine;
+
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.util.Comparator;
+import java.util.function.BiFunction;
+import org.apache.ignite.configuration.schemas.store.DataRegionConfiguration;
+import org.apache.ignite.configuration.schemas.table.TableConfiguration;
+import org.apache.ignite.configuration.schemas.table.TableView;
+
+/**
+ * General storage engine interface.
+ */
+public interface StorageEngine {
+    /**
+     * Creates new data region.
+     *
+     * @param regionCfg Data region configuration.
+     * @return New data region.
+     */
+    DataRegion createDataRegion(DataRegionConfiguration regionCfg);
+
+    /**
+     * Creates new table storage.
+     *
+     * @param tablePath Path to store table data.
+     * @param tableCfg Table configuration.
+     * @param dataRegion Data region for the table.
+     * @param indexComparatorFactory Comparator factory for SQL indexes.
+     * @return New table storage.
+     */
+    TableStorage createTable(
+        Path tablePath,
+        TableConfiguration tableCfg,
+        DataRegion dataRegion,
+        BiFunction<TableView, String, Comparator<ByteBuffer>> indexComparatorFactory
+    );
+}
diff --git a/modules/configuration-api/src/main/java/org/apache/ignite/configuration/notifications/ConfigurationListener.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/TableStorage.java
similarity index 52%
copy from modules/configuration-api/src/main/java/org/apache/ignite/configuration/notifications/ConfigurationListener.java
copy to modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/TableStorage.java
index 534d5ec..7c557a6 100644
--- a/modules/configuration-api/src/main/java/org/apache/ignite/configuration/notifications/ConfigurationListener.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/TableStorage.java
@@ -15,24 +15,34 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.configuration.notifications;
+package org.apache.ignite.internal.storage.engine;
 
-import java.util.concurrent.CompletableFuture;
-import org.jetbrains.annotations.NotNull;
+import org.apache.ignite.internal.storage.PartitionStorage;
+import org.apache.ignite.internal.storage.StorageException;
 
 /**
- * Configuration property change listener.
- *
- * @param <VIEW> VIEW type configuration.
+ * Table storage that contains meta, partitions and SQL indexes.
  */
-@FunctionalInterface
-public interface ConfigurationListener<VIEW> {
+public interface TableStorage {
     /**
-     * Called on property value update.
+     * Gets or creates a partition for current table.
      *
-     * @param ctx Notification context.
-     * @return Future that signifies the end of the listener execution.
+     * @param partId Partition id.
+     * @return Partition storage.
      */
-    @NotNull CompletableFuture<?> onUpdate(@NotNull ConfigurationNotificationEvent<VIEW> ctx);
-}
+    PartitionStorage getOrCreatePartition(int partId);
+
+    /**
+     * Starts the storage.
+     *
+     * @throws StorageException If something went wrong.
+     */
+    public void start() throws StorageException;
 
+    /**
+     * Stops the storage.
+     *
+     * @throws StorageException If something went wrong.
+     */
+    void stop() throws StorageException;
+}
diff --git a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/AbstractStorageTest.java b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/AbstractPartitionStorageTest.java
similarity index 96%
rename from modules/storage-api/src/test/java/org/apache/ignite/internal/storage/AbstractStorageTest.java
rename to modules/storage-api/src/test/java/org/apache/ignite/internal/storage/AbstractPartitionStorageTest.java
index 00c75df..cbbbc21 100644
--- a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/AbstractStorageTest.java
+++ b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/AbstractPartitionStorageTest.java
@@ -61,7 +61,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
  * Abstract test that covers basic scenarios of the storage API.
  */
 @ExtendWith(WorkDirectoryExtension.class)
-public abstract class AbstractStorageTest {
+public abstract class AbstractPartitionStorageTest {
     /** Test key. */
     private static final String KEY = "key";
 
@@ -69,7 +69,7 @@ public abstract class AbstractStorageTest {
     private static final String VALUE = "value";
 
     /** Storage instance. */
-    protected Storage storage;
+    protected PartitionStorage storage;
 
     /**
      * Tests that read / write / remove work consistently on the same key.
@@ -465,7 +465,7 @@ public abstract class AbstractStorageTest {
     }
 
     /**
-     * Tests the {@link Storage#readAll(List)} operation successfully reads data rows from the storage.
+     * Tests the {@link PartitionStorage#readAll(List)} operation successfully reads data rows from the storage.
      */
     @Test
     public void testReadAll() {
@@ -483,7 +483,7 @@ public abstract class AbstractStorageTest {
     }
 
     /**
-     * Tests that {@link Storage#writeAll(List)} operation successfully writes a collection of data rows into the
+     * Tests that {@link PartitionStorage#writeAll(List)} operation successfully writes a collection of data rows into the
      * storage.
      */
     @Test
@@ -497,7 +497,7 @@ public abstract class AbstractStorageTest {
     }
 
     /**
-     * Tests that {@link Storage#insertAll(List)} operation doesn't insert data rows which keys
+     * Tests that {@link PartitionStorage#insertAll(List)} operation doesn't insert data rows which keys
      * are already present in the storage. This operation must also return the list of such data rows.
      */
     @Test
@@ -517,7 +517,7 @@ public abstract class AbstractStorageTest {
     }
 
     /**
-     * Tests that {@link Storage#removeAll(List)} operation successfully retrieves and removes a collection of
+     * Tests that {@link PartitionStorage#removeAll(List)} operation successfully retrieves and removes a collection of
      * {@link SearchRow}s.
      */
     @Test
@@ -547,7 +547,7 @@ public abstract class AbstractStorageTest {
     }
 
     /**
-     * Tests that {@link Storage#removeAllExact(List)} operation successfully removes and retrieves a collection
+     * Tests that {@link PartitionStorage#removeAllExact(List)} operation successfully removes and retrieves a collection
      * of data rows with the given exact keys and values from the storage.
      */
     @Test
@@ -566,7 +566,7 @@ public abstract class AbstractStorageTest {
     }
 
     /**
-     * Tests that {@link Storage#removeAllExact(List)} operation doesn't remove and retrieve a collection
+     * Tests that {@link PartitionStorage#removeAllExact(List)} operation doesn't remove and retrieve a collection
      * of data rows with the given exact keys and values from the storage if the value in the storage doesn't match
      * the given value.
      */
diff --git a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/basic/ConcurrentHashMapStorage.java b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/basic/ConcurrentHashMapPartitionStorage.java
similarity index 98%
rename from modules/storage-api/src/test/java/org/apache/ignite/internal/storage/basic/ConcurrentHashMapStorage.java
rename to modules/storage-api/src/test/java/org/apache/ignite/internal/storage/basic/ConcurrentHashMapPartitionStorage.java
index f76957c..b67e132 100644
--- a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/basic/ConcurrentHashMapStorage.java
+++ b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/basic/ConcurrentHashMapPartitionStorage.java
@@ -36,8 +36,8 @@ import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import org.apache.ignite.internal.storage.DataRow;
 import org.apache.ignite.internal.storage.InvokeClosure;
+import org.apache.ignite.internal.storage.PartitionStorage;
 import org.apache.ignite.internal.storage.SearchRow;
-import org.apache.ignite.internal.storage.Storage;
 import org.apache.ignite.internal.storage.StorageException;
 import org.apache.ignite.internal.util.Cursor;
 import org.apache.ignite.lang.ByteArray;
@@ -50,7 +50,7 @@ import static java.util.stream.Collectors.toList;
 /**
  * Storage implementation based on {@link ConcurrentHashMap}.
  */
-public class ConcurrentHashMapStorage implements Storage {
+public class ConcurrentHashMapPartitionStorage implements PartitionStorage {
     /** Name of the snapshot file. */
     private static final String SNAPSHOT_FILE = "snapshot_file";
 
diff --git a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/basic/ConcurrentHashMapStorageTest.java b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/basic/ConcurrentHashMapStorageTest.java
index eaaed7e..896d4ce 100644
--- a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/basic/ConcurrentHashMapStorageTest.java
+++ b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/basic/ConcurrentHashMapStorageTest.java
@@ -17,15 +17,15 @@
 
 package org.apache.ignite.internal.storage.basic;
 
-import org.apache.ignite.internal.storage.AbstractStorageTest;
+import org.apache.ignite.internal.storage.AbstractPartitionStorageTest;
 import org.junit.jupiter.api.BeforeEach;
 
 /**
- * Storage test implementation for {@link ConcurrentHashMapStorage}.
+ * Storage test implementation for {@link ConcurrentHashMapPartitionStorage}.
  */
-public class ConcurrentHashMapStorageTest extends AbstractStorageTest {
+public class ConcurrentHashMapStorageTest extends AbstractPartitionStorageTest {
     @BeforeEach
     public void setUp() {
-        storage = new ConcurrentHashMapStorage();
+        storage = new ConcurrentHashMapPartitionStorage();
     }
 }
diff --git a/modules/storage-rocksdb/pom.xml b/modules/storage-rocksdb/pom.xml
index 1327d41..b2d5e98 100644
--- a/modules/storage-rocksdb/pom.xml
+++ b/modules/storage-rocksdb/pom.xml
@@ -57,6 +57,18 @@
         </dependency>
 
         <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-configuration</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
             <groupId>org.apache.ignite</groupId>
             <artifactId>ignite-core</artifactId>
             <type>test-jar</type>
@@ -65,6 +77,13 @@
 
         <dependency>
             <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-configuration</artifactId>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.ignite</groupId>
             <artifactId>ignite-storage-api</artifactId>
             <type>test-jar</type>
             <scope>test</scope>
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbDataRegion.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbDataRegion.java
new file mode 100644
index 0000000..2f1c469
--- /dev/null
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbDataRegion.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb;
+
+import java.util.Locale;
+import org.apache.ignite.configuration.schemas.store.DataRegionConfiguration;
+import org.apache.ignite.configuration.schemas.store.DataRegionView;
+import org.apache.ignite.internal.storage.engine.DataRegion;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.rocksdb.Cache;
+import org.rocksdb.ClockCache;
+import org.rocksdb.LRUCache;
+import org.rocksdb.WriteBufferManager;
+
+import static org.apache.ignite.configuration.schemas.store.DataRegionConfigurationSchema.ROCKSDB_CLOCK_CACHE;
+import static org.apache.ignite.configuration.schemas.store.DataRegionConfigurationSchema.ROCKSDB_DATA_REGION_TYPE;
+import static org.apache.ignite.configuration.schemas.store.DataRegionConfigurationSchema.ROCKSDB_LRU_CACHE;
+
+/**
+ * Data region implementation for {@link RocksDbStorageEngine}. Based on a {@link Cache}.
+ */
+public class RocksDbDataRegion implements DataRegion {
+    /** Region configuration. */
+    private final DataRegionConfiguration cfg;
+
+    /** RocksDB cache instance. */
+    private Cache cache;
+
+    /** Write buffer manager instance. */
+    private WriteBufferManager writeBufferManager;
+
+    /**
+     * Constructor.
+     *
+     * @param cfg Data region configuration.
+     */
+    public RocksDbDataRegion(DataRegionConfiguration cfg) {
+        this.cfg = cfg;
+
+        assert ROCKSDB_DATA_REGION_TYPE.equalsIgnoreCase(cfg.type().value());
+    }
+
+    /** {@inheritDoc} */
+    @Override public void start() {
+        DataRegionView dataRegionView = cfg.value();
+
+        long writeBufferSize = dataRegionView.writeBufferSize();
+
+        long totalCacheSize = dataRegionView.size() + writeBufferSize;
+
+        switch (dataRegionView.cache().toLowerCase(Locale.ROOT)) {
+            case ROCKSDB_CLOCK_CACHE:
+                cache = new ClockCache(totalCacheSize, dataRegionView.numShardBits(), false);
+
+                break;
+
+            case ROCKSDB_LRU_CACHE:
+                cache = new LRUCache(totalCacheSize, dataRegionView.numShardBits(), false);
+
+                break;
+
+            default:
+                assert false : dataRegionView.cache();
+        }
+
+        writeBufferManager = new WriteBufferManager(writeBufferSize, cache);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void stop() throws Exception {
+        IgniteUtils.closeAll(writeBufferManager, cache);
+    }
+
+    /**
+     * @return Write buffer manager associated with the region.
+     */
+    public WriteBufferManager writeBufferManager() {
+        return writeBufferManager;
+    }
+}
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbPartitionStorage.java
similarity index 81%
rename from modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorage.java
rename to modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbPartitionStorage.java
index 24a01c3..41d47ff 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbPartitionStorage.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.storage.rocksdb;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
@@ -26,7 +25,6 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
 import java.util.NoSuchElementException;
@@ -38,8 +36,8 @@ import java.util.function.Predicate;
 import org.apache.ignite.internal.rocksdb.ColumnFamily;
 import org.apache.ignite.internal.storage.DataRow;
 import org.apache.ignite.internal.storage.InvokeClosure;
+import org.apache.ignite.internal.storage.PartitionStorage;
 import org.apache.ignite.internal.storage.SearchRow;
-import org.apache.ignite.internal.storage.Storage;
 import org.apache.ignite.internal.storage.StorageException;
 import org.apache.ignite.internal.storage.basic.SimpleDataRow;
 import org.apache.ignite.internal.util.Cursor;
@@ -47,15 +45,7 @@ import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.lang.IgniteInternalException;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
-import org.jetbrains.annotations.TestOnly;
-import org.rocksdb.AbstractComparator;
-import org.rocksdb.ColumnFamilyDescriptor;
-import org.rocksdb.ColumnFamilyHandle;
-import org.rocksdb.ColumnFamilyOptions;
-import org.rocksdb.ComparatorOptions;
-import org.rocksdb.DBOptions;
 import org.rocksdb.IngestExternalFileOptions;
-import org.rocksdb.Options;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
 import org.rocksdb.RocksIterator;
@@ -63,94 +53,34 @@ import org.rocksdb.Snapshot;
 import org.rocksdb.WriteBatch;
 import org.rocksdb.WriteOptions;
 
+import static java.util.Collections.nCopies;
 import static org.apache.ignite.internal.rocksdb.RocksUtils.createSstFile;
 
 /**
  * Storage implementation based on a single RocksDB instance.
  */
-public class RocksDbStorage implements Storage {
+public class RocksDbPartitionStorage implements PartitionStorage {
     /** Suffix for the temporary snapshot folder */
     private static final String TMP_SUFFIX = ".tmp";
 
-    /** Snapshot file name. */
-    private static final String COLUMN_FAMILY_NAME = "data";
-
-    static {
-        RocksDB.loadLibrary();
-    }
-
-    /** RocksDB comparator options. */
-    private final ComparatorOptions comparatorOptions;
-
-    /** RocksDB comparator. */
-    private final AbstractComparator comparator;
-
-    /** RockDB options. */
-    private final DBOptions options;
-
     /** RocksDb instance. */
     private final RocksDB db;
 
     /** Data column family. */
     private final ColumnFamily data;
 
-    /** DB path. */
-    private final Path dbPath;
-
     /** Thread-pool for snapshot operations execution. */
     private final ExecutorService snapshotExecutor = Executors.newSingleThreadExecutor();
 
     /**
-     * @param dbPath Path to the folder to store data.
-     * @param comparator Keys comparator.
+     * @param db RocksDB instance.
+     * @param columnFamily Column family to be used for all storage operations.
      * @throws StorageException If failed to create RocksDB instance.
      */
-    public RocksDbStorage(Path dbPath, Comparator<ByteBuffer> comparator) throws StorageException {
-        try {
-            this.dbPath = dbPath;
-
-            comparatorOptions = new ComparatorOptions();
-
-            this.comparator = new AbstractComparator(comparatorOptions) {
-                /** {@inheritDoc} */
-                @Override public String name() {
-                    return "comparator";
-                }
-
-                /** {@inheritDoc} */
-                @Override public int compare(ByteBuffer a, ByteBuffer b) {
-                    return comparator.compare(a, b);
-                }
-            };
-
-            options = new DBOptions()
-                .setCreateMissingColumnFamilies(true)
-                .setCreateIfMissing(true);
-
-            Options dataOptions = new Options().setCreateIfMissing(true).setComparator(this.comparator);
-
-            ColumnFamilyOptions dataFamilyOptions = new ColumnFamilyOptions(dataOptions);
+    public RocksDbPartitionStorage(RocksDB db, ColumnFamily columnFamily) throws StorageException {
+        this.db = db;
 
-            List<ColumnFamilyDescriptor> descriptors = Collections.singletonList(
-                new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, dataFamilyOptions)
-            );
-
-            var handles = new ArrayList<ColumnFamilyHandle>();
-
-            db = RocksDB.open(options, dbPath.toAbsolutePath().toString(), descriptors, handles);
-
-            data = new ColumnFamily(db, handles.get(0), COLUMN_FAMILY_NAME, dataFamilyOptions, dataOptions);
-        }
-        catch (RocksDBException e) {
-            try {
-                close();
-            }
-            catch (Exception ex) {
-                e.addSuppressed(ex);
-            }
-
-            throw new StorageException("Failed to start the storage", e);
-        }
+        this.data = columnFamily;
     }
 
     /** {@inheritDoc} */
@@ -173,7 +103,7 @@ public class RocksDbStorage implements Storage {
 
         try {
             List<byte[]> keysList = getKeys(keys);
-            List<byte[]> valuesList = db.multiGetAsList(keysList);
+            List<byte[]> valuesList = db.multiGetAsList(nCopies(keys.size(), data.handle()), keysList);
 
             assert keys.size() == valuesList.size();
 
@@ -298,7 +228,7 @@ public class RocksDbStorage implements Storage {
              WriteOptions opts = new WriteOptions()) {
 
             List<byte[]> keys = getKeys(keyValues);
-            List<byte[]> values = db.multiGetAsList(keys);
+            List<byte[]> values = db.multiGetAsList(nCopies(keys.size(), data.handle()), keys);
 
             assert values.size() == keyValues.size();
 
@@ -414,7 +344,7 @@ public class RocksDbStorage implements Storage {
     /** {@inheritDoc} */
     @Override public void restoreSnapshot(Path path) {
         try (IngestExternalFileOptions ingestOptions = new IngestExternalFileOptions()) {
-            Path snapshotPath = path.resolve(COLUMN_FAMILY_NAME);
+            Path snapshotPath = path.resolve(data.name());
 
             if (!Files.exists(snapshotPath))
                 throw new IgniteInternalException("Snapshot not found: " + snapshotPath);
@@ -429,8 +359,6 @@ public class RocksDbStorage implements Storage {
     /** {@inheritDoc} */
     @Override public void close() throws Exception {
         IgniteUtils.shutdownAndAwaitTermination(snapshotExecutor, 10, TimeUnit.SECONDS);
-
-        IgniteUtils.closeAll(data, db, options, comparator, comparatorOptions);
     }
 
     /** Cursor wrapper over the RocksIterator object with custom filter. */
@@ -504,14 +432,6 @@ public class RocksDbStorage implements Storage {
     }
 
     /**
-     * @return Path to the database.
-     */
-    @TestOnly
-    public Path getDbPath() {
-        return dbPath;
-    }
-
-    /**
      * Gets a list of key byte arrays.
      * @param keyValues Key rows.
      * @return List of keys as byte arrays.
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageEngine.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageEngine.java
new file mode 100644
index 0000000..318026f
--- /dev/null
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageEngine.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb;
+
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.util.Comparator;
+import java.util.function.BiFunction;
+import org.apache.ignite.configuration.schemas.store.DataRegionConfiguration;
+import org.apache.ignite.configuration.schemas.table.TableConfiguration;
+import org.apache.ignite.configuration.schemas.table.TableView;
+import org.apache.ignite.internal.storage.engine.DataRegion;
+import org.apache.ignite.internal.storage.engine.StorageEngine;
+import org.apache.ignite.internal.storage.engine.TableStorage;
+import org.rocksdb.RocksDB;
+
+/**
+ * Storage engine implementation based on RocksDB.
+ */
+public class RocksDbStorageEngine implements StorageEngine {
+    static {
+        RocksDB.loadLibrary();
+    }
+
+    /** {@inheritDoc} */
+    @Override public DataRegion createDataRegion(DataRegionConfiguration regionCfg) {
+        return new RocksDbDataRegion(regionCfg);
+    }
+
+    /**
+     * {@inheritDoc}
+     * Only {@link RocksDbDataRegion} instances are accepted as {@code dataRegion}.
+     */
+    @Override public TableStorage createTable(
+        Path tablePath,
+        TableConfiguration tableCfg,
+        DataRegion dataRegion,
+        BiFunction<TableView, String, Comparator<ByteBuffer>> indexComparatorFactory
+    ) {
+        assert dataRegion instanceof RocksDbDataRegion : dataRegion;
+
+        RocksDbDataRegion rocksDbRegion = (RocksDbDataRegion)dataRegion;
+
+        return new RocksDbTableStorage(tablePath, tableCfg, rocksDbRegion, indexComparatorFactory);
+    }
+}
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
new file mode 100644
index 0000000..5f4fe6f
--- /dev/null
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
@@ -0,0 +1,384 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReferenceArray;
+import java.util.function.BiFunction;
+import java.util.stream.IntStream;
+import org.apache.ignite.configuration.NamedListView;
+import org.apache.ignite.configuration.schemas.table.TableConfiguration;
+import org.apache.ignite.configuration.schemas.table.TableIndexView;
+import org.apache.ignite.configuration.schemas.table.TableView;
+import org.apache.ignite.internal.rocksdb.ColumnFamily;
+import org.apache.ignite.internal.storage.PartitionStorage;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.engine.TableStorage;
+import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.jetbrains.annotations.NotNull;
+import org.rocksdb.AbstractComparator;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.ComparatorOptions;
+import org.rocksdb.DBOptions;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+
+import static java.lang.Integer.parseInt;
+import static java.util.stream.Collectors.groupingBy;
+import static java.util.stream.Stream.concat;
+
+/**
+ * Table storage implementation based on {@link RocksDB} instance.
+ */
+public class RocksDbTableStorage implements TableStorage {
+    /**
+     * Name of the meta column family. It matches the name of the default column family, meaning that it always exists
+     * when a new table is created.
+     */
+    private static final String CF_META = "default";
+
+    /** Prefix for partitions column families names. */
+    private static final String CF_PARTITION_PREFIX = "cf-part:";
+
+    /** Prefix for SQL indexes column family names. */
+    private static final String CF_INDEX_PREFIX = "cf-idx:";
+
+    /** Name of comparator used in indexes column family. */
+    private static final String INDEX_COMPARATOR_NAME = "index-comparator";
+
+    /** Path for the directory that stores table data. */
+    private final Path tablePath;
+
+    /** Table configuration. */
+    private final TableConfiguration tableCfg;
+
+    /** Data region for the table. */
+    private final RocksDbDataRegion dataRegion;
+
+    /** Comparators factory for indexes. */
+    private final BiFunction<TableView, String, Comparator<ByteBuffer>> indexComparatorFactory;
+
+    /** List of closeable resources to close on {@link #stop()}. Better than having a field for each one of them. */
+    private final List<AutoCloseable> autoCloseables = new ArrayList<>();
+
+    /** Rocks DB instance itself. */
+    private RocksDB db;
+
+    /** CF handle for meta information. */
+    @SuppressWarnings("unused")
+    private ColumnFamilyHandle metaCfHandle;
+
+    /** Column families for partitions. Stored as an array for the quick access by an index. */
+    private AtomicReferenceArray<ColumnFamily> partitionCfs;
+
+    /** Max number of partitions in the table. */
+    private int partitions;
+
+    /** Column families for indexes by their names. */
+    private Map<String, ColumnFamilyHandle> indicesCfHandles = new ConcurrentHashMap<>();
+
+    /** Kind of a column family, determined from its name (see {@code columnFamilyType}): meta, partition or index. */
+    private enum ColumnFamilyType {
+        META, PARTITION, INDEX
+    }
+
+    /**
+     * Creates the storage object. Only stores the arguments; RocksDB itself is opened in {@link #start()}.
+     *
+     * @param tablePath Path for the directory that stores table data.
+     * @param tableCfg Table configuration.
+     * @param dataRegion Data region for the table.
+     * @param indexComparatorFactory Comparators factory for indexes.
+     */
+    public RocksDbTableStorage(
+        Path tablePath,
+        TableConfiguration tableCfg,
+        RocksDbDataRegion dataRegion,
+        BiFunction<TableView, String, Comparator<ByteBuffer>> indexComparatorFactory
+    ) {
+        this.tablePath = tablePath;
+        this.tableCfg = tableCfg;
+        this.dataRegion = dataRegion;
+        this.indexComparatorFactory = indexComparatorFactory;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void start() throws StorageException {
+        Map<ColumnFamilyType, List<String>> cfNamesGrouped = getColumnFamiliesNames();
+
+        List<ColumnFamilyDescriptor> cfDescriptors = convertToColumnFamiliesDescriptors(cfNamesGrouped);
+        // Populated by RocksDB.open() below.
+        List<ColumnFamilyHandle> cfHandles = new ArrayList<>();
+
+        DBOptions dbOptions = addToCloseableResources(new DBOptions()
+            .setCreateIfMissing(true)
+            .setWriteBufferManager(dataRegion.writeBufferManager())
+        );
+
+        try {
+            db = addToCloseableResources(RocksDB.open(dbOptions, tablePath.toAbsolutePath().toString(), cfDescriptors, cfHandles));
+        }
+        catch (RocksDBException e) {
+            throw new StorageException("Failed to initialize RocksDB instance.", e);
+        }
+
+        partitions = tableCfg.value().partitions();
+        // NOTE(review): partition ids parsed from CF names below are not checked against this size.
+        partitionCfs = new AtomicReferenceArray<>(partitions);
+
+        for (int cfListIndex = 0; cfListIndex < cfHandles.size(); cfListIndex++) {
+            ColumnFamilyHandle cfHandle = cfHandles.get(cfListIndex);
+
+            String handleName;
+            try {
+                handleName = new String(cfHandle.getName(), StandardCharsets.UTF_8);
+            }
+            catch (RocksDBException e) {
+                throw new StorageException("Failed to read RocksDB column family name.", e);
+            }
+
+            if (handleName.equals(CF_META))
+                this.metaCfHandle = addToCloseableResources(cfHandle);
+            else if (handleName.startsWith(CF_PARTITION_PREFIX)) {
+                int partId = partitionId(handleName);
+                // Relies on cfHandles being in the same order as cfDescriptors.
+                ColumnFamilyDescriptor cfDescriptor = cfDescriptors.get(cfListIndex);
+
+                partitionCfs.set(partId, new ColumnFamily(db, cfHandle, handleName, cfDescriptor.getOptions(), null));
+            }
+            else {
+                String indexName = handleName.substring(CF_INDEX_PREFIX.length());
+
+                indicesCfHandles.put(indexName, cfHandle);
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void stop() throws StorageException {
+        // Shared resources are passed to closeAll after all column families, in reversed registration order.
+        List<AutoCloseable> reversedResources = new ArrayList<>(autoCloseables);
+        Collections.reverse(reversedResources);
+
+        try {
+            IgniteUtils.closeAll(concat(
+                concat(IntStream.range(0, partitions).mapToObj(partitionCfs::get), indicesCfHandles.values().stream()),
+                reversedResources.stream()
+            ));
+        }
+        catch (Exception e) {
+            throw new StorageException("Failed to stop RocksDB table storage.", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public PartitionStorage getOrCreatePartition(int partId) {
+        assert partId < partitions : S.toString(
+            "Attempt to create partition with id outside of configured range",
+            "partitionId", partId, false,
+            "partitions", partitions, false
+        );
+        // NOTE(review): only the upper bound is asserted; a negative partId reaches partitionCfs.get() unchecked.
+        ColumnFamily partitionCf = partitionCfs.get(partId);
+        // NOTE(review): get/create/set below is not atomic - confirm callers never race on the same partId.
+        if (partitionCf == null) {
+            String handleName = partitionColumnFamilyName(partId);
+
+            ColumnFamilyDescriptor cfDescriptor = new ColumnFamilyDescriptor(
+                handleName.getBytes(StandardCharsets.UTF_8),
+                new ColumnFamilyOptions()
+            );
+            // The options are closed in the catch block if the column family could not be created.
+            try {
+                ColumnFamilyHandle cfHandle = db.createColumnFamily(cfDescriptor);
+
+                partitionCf = new ColumnFamily(db, cfHandle, handleName, cfDescriptor.getOptions(), null);
+            }
+            catch (RocksDBException e) {
+                cfDescriptor.getOptions().close();
+
+                throw new StorageException("Failed to create new RocksDB column family " + handleName, e);
+            }
+
+            partitionCfs.set(partId, partitionCf);
+        }
+
+        return new RocksDbPartitionStorage(db, partitionCf);
+    }
+
+    /**
+     * Returns list of column families names that belong to RocksDB instance in the given path, grouped by their
+     * {@link ColumnFamilyType}.
+     *
+     * @return Map with column families names.
+     * @throws StorageException If something went wrong.
+     */
+    private Map<ColumnFamilyType, List<String>> getColumnFamiliesNames() {
+        String dbPath = tablePath.toAbsolutePath().toString();
+
+        List<byte[]> rawNames;
+        // Options are only needed for the listing call itself.
+        try (Options opts = new Options()) {
+            rawNames = RocksDB.listColumnFamilies(opts, dbPath);
+        }
+        catch (RocksDBException e) {
+            throw new StorageException(
+                "Failed to read list of column families names for the RocksDB instance located at path " + dbPath,
+                e
+            );
+        }
+
+        // Decode the names and classify them; columnFamilyType() throws on a name it does not recognize.
+        return rawNames.stream()
+            .map(raw -> new String(raw, StandardCharsets.UTF_8))
+            .collect(groupingBy(this::columnFamilyType));
+    }
+
+    /**
+     * Returns list of CF descriptors by their names. Meta and index CF resources are registered to be closed on stop.
+     *
+     * @param cfGrouped Map from CF type to lists of names.
+     * @return List of CF descriptors.
+     */
+    @NotNull private List<ColumnFamilyDescriptor> convertToColumnFamiliesDescriptors(
+        Map<ColumnFamilyType, List<String>> cfGrouped
+    ) {
+        List<ColumnFamilyDescriptor> cfDescriptors = new ArrayList<>();
+
+        Options cfOptions = addToCloseableResources(new Options().setCreateIfMissing(true));
+
+        cfDescriptors.add(new ColumnFamilyDescriptor(
+            CF_META.getBytes(StandardCharsets.UTF_8),
+            addToCloseableResources(new ColumnFamilyOptions(cfOptions))
+        ));
+        // Partition CF options are not registered here: start() passes them to the ColumnFamily objects it creates.
+        for (String partitionCfName : cfGrouped.getOrDefault(ColumnFamilyType.PARTITION, List.of())) {
+            cfDescriptors.add(new ColumnFamilyDescriptor(
+                partitionCfName.getBytes(StandardCharsets.UTF_8),
+                new ColumnFamilyOptions()
+            ));
+        }
+
+        NamedListView<? extends TableIndexView> indicesCfgView = tableCfg.value().indices();
+
+        for (String indexCfName : cfGrouped.getOrDefault(ColumnFamilyType.INDEX, List.of())) {
+            String indexName = indexCfName.substring(CF_INDEX_PREFIX.length());
+
+            TableIndexView indexCfgView = indicesCfgView.get(indexName);
+
+            assert indexCfgView != null : "Found index that is absent in configuration: " + indexCfName;
+
+            Comparator<ByteBuffer> indexComparator = indexComparatorFactory.apply(tableCfg.value(), indexName);
+
+            // Registered as well: stop() closes index CF handles, but nothing else would close these options.
+            cfDescriptors.add(new ColumnFamilyDescriptor(
+                indexCfName.getBytes(StandardCharsets.UTF_8),
+                addToCloseableResources(new ColumnFamilyOptions()
+                    .setComparator(addToCloseableResources(
+                        new AbstractComparator(addToCloseableResources(new ComparatorOptions())) {
+                            /** {@inheritDoc} */
+                            @Override public String name() {
+                                return INDEX_COMPARATOR_NAME;
+                            }
+
+                            /** {@inheritDoc} */
+                            @Override public int compare(ByteBuffer a, ByteBuffer b) {
+                                return indexComparator.compare(a, b);
+                            }
+                        })))
+            ));
+        }
+
+        return cfDescriptors;
+    }
+
+    /**
+     * Builds the name of the column family that stores the given partition, e.g. {@code cf-part:5}.
+     *
+     * @param partId Partition id.
+     * @return Column family name, that is {@link #CF_PARTITION_PREFIX} followed by the decimal partition id.
+     */
+    private static String partitionColumnFamilyName(int partId) {
+        return CF_PARTITION_PREFIX + partId;
+    }
+
+    /**
+     * Gets partition id from column family name by parsing everything after {@link #CF_PARTITION_PREFIX}.
+     *
+     * @param cfName Column family name; the only caller in this class checks the prefix before calling.
+     * @return Partition id; {@link Integer#parseInt(String)} throws if the rest of the name is not an integer.
+     */
+    private static int partitionId(String cfName) {
+        return parseInt(cfName.substring(CF_PARTITION_PREFIX.length()));
+    }
+
+    /**
+     * Creates column family name by index name, that is {@link #CF_INDEX_PREFIX} followed by the index name.
+     *
+     * @param idxName Index name.
+     * @return Column family name. NOTE(review): this method has no caller in this class at the moment.
+     */
+    private static String indexColumnFamilyName(String idxName) {
+        return CF_INDEX_PREFIX + idxName;
+    }
+
+    /**
+     * Determines column family type by its name.
+     *
+     * @param cfName Column family name.
+     * @return Column family type.
+     * @throws StorageException If column family name doesn't match any known pattern.
+     */
+    private ColumnFamilyType columnFamilyType(String cfName) throws StorageException {
+        // The meta CF is recognized by its exact name, partition and index CFs by their name prefixes.
+        if (CF_META.equals(cfName))
+            return ColumnFamilyType.META;
+        else if (cfName.startsWith(CF_PARTITION_PREFIX))
+            return ColumnFamilyType.PARTITION;
+        else if (cfName.startsWith(CF_INDEX_PREFIX))
+            return ColumnFamilyType.INDEX;
+
+        // Unknown names are reported rather than ignored.
+        throw new StorageException("Unidentified column family [name=" + cfName + ", table=" + tableCfg.name() + ']');
+    }
+
+    /**
+     * Adds resource to the {@link #autoCloseables} list, which {@link #stop()} passes to closeAll in reversed order.
+     *
+     * @param autoCloseable Closeable resource.
+     * @param <R> Type of the resource.
+     * @return Passed resource with the same type.
+     */
+    private <R extends AutoCloseable> R addToCloseableResources(R autoCloseable) {
+        autoCloseables.add(autoCloseable);
+
+        return autoCloseable;
+    }
+}
diff --git a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageTest.java b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageTest.java
index 2a1e7cb..1559bba 100644
--- a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageTest.java
+++ b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageTest.java
@@ -17,9 +17,14 @@
 
 package org.apache.ignite.internal.storage.rocksdb;
 
-import java.nio.ByteBuffer;
 import java.nio.file.Path;
-import org.apache.ignite.internal.storage.AbstractStorageTest;
+import org.apache.ignite.configuration.schemas.store.DataRegionConfiguration;
+import org.apache.ignite.configuration.schemas.table.TableConfiguration;
+import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.storage.AbstractPartitionStorageTest;
+import org.apache.ignite.internal.storage.engine.DataRegion;
+import org.apache.ignite.internal.storage.engine.TableStorage;
 import org.apache.ignite.internal.testframework.WorkDirectory;
 import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
 import org.apache.ignite.internal.util.IgniteUtils;
@@ -27,20 +32,56 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.extension.ExtendWith;
 
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
 /**
- * Storage test implementation for {@link RocksDbStorage}.
+ * Storage test implementation for {@link RocksDbPartitionStorage}.
  */
 @ExtendWith(WorkDirectoryExtension.class)
-public class RocksDbStorageTest extends AbstractStorageTest {
+@ExtendWith(ConfigurationExtension.class)
+public class RocksDbStorageTest extends AbstractPartitionStorageTest {
+    /** */
+    private TableStorage table;
+
+    /** */
+    private DataRegion dataRegion;
+
     /** */
     @BeforeEach
-    public void setUp(@WorkDirectory Path workDir) {
-        storage = new RocksDbStorage(workDir, ByteBuffer::compareTo);
+    public void setUp(
+        @WorkDirectory Path workDir,
+        @InjectConfiguration DataRegionConfiguration dataRegionCfg,
+        @InjectConfiguration TableConfiguration tableCfg
+    ) throws Exception {
+        dataRegionCfg.change(cfg -> cfg.changeSize(16 * 1024).changeWriteBufferSize(16 * 1024)).get();
+
+        RocksDbStorageEngine engine = new RocksDbStorageEngine();
+
+        dataRegion = engine.createDataRegion(dataRegionCfg);
+
+        assertThat(dataRegion, is(instanceOf(RocksDbDataRegion.class)));
+
+        dataRegion.start();
+
+        table = engine.createTable(workDir, tableCfg, dataRegion, (tableView, indexName) -> null);
+
+        assertThat(table, is(instanceOf(RocksDbTableStorage.class)));
+
+        table.start();
+
+        storage = table.getOrCreatePartition(0);
+
+        assertThat(storage, is(instanceOf(RocksDbPartitionStorage.class)));
     }
 
     /** */
     @AfterEach
     public void tearDown() throws Exception {
-        IgniteUtils.closeAll(storage);
+        IgniteUtils.closeAll(
+            table == null ? null : table::stop,
+            dataRegion == null ? null : dataRegion::stop
+        );
     }
 }
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITDistributedTableTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITDistributedTableTest.java
index ddd7e8c..56cf6ae 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITDistributedTableTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITDistributedTableTest.java
@@ -42,7 +42,7 @@ import org.apache.ignite.internal.schema.SchemaDescriptor;
 import org.apache.ignite.internal.schema.SchemaRegistry;
 import org.apache.ignite.internal.schema.row.Row;
 import org.apache.ignite.internal.schema.row.RowAssembler;
-import org.apache.ignite.internal.storage.basic.ConcurrentHashMapStorage;
+import org.apache.ignite.internal.storage.basic.ConcurrentHashMapPartitionStorage;
 import org.apache.ignite.internal.table.TableImpl;
 import org.apache.ignite.internal.table.distributed.command.GetCommand;
 import org.apache.ignite.internal.table.distributed.command.InsertCommand;
@@ -189,7 +189,7 @@ public class ITDistributedTableTest {
 
         partSrv.startRaftGroup(
             grpId,
-            new PartitionListener(new ConcurrentHashMapStorage()),
+            new PartitionListener(new ConcurrentHashMapPartitionStorage()),
             conf
         );
 
@@ -280,7 +280,7 @@ public class ITDistributedTableTest {
 
             rs.startRaftGroup(
                 grpId,
-                new PartitionListener(new ConcurrentHashMapStorage()),
+                new PartitionListener(new ConcurrentHashMapPartitionStorage()),
                 conf
             );
 
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITInternalTableScanTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITInternalTableScanTest.java
index 17d71c2..384ac2c 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITInternalTableScanTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITInternalTableScanTest.java
@@ -40,7 +40,7 @@ import org.apache.ignite.internal.raft.server.RaftServer;
 import org.apache.ignite.internal.raft.server.impl.RaftServerImpl;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.storage.DataRow;
-import org.apache.ignite.internal.storage.Storage;
+import org.apache.ignite.internal.storage.PartitionStorage;
 import org.apache.ignite.internal.storage.StorageException;
 import org.apache.ignite.internal.storage.basic.SimpleDataRow;
 import org.apache.ignite.internal.table.InternalTable;
@@ -102,7 +102,7 @@ public class ITInternalTableScanTest {
 
     /** Mock partition storage. */
     @Mock
-    private Storage mockStorage;
+    private PartitionStorage mockStorage;
 
     /** */
     private ClusterService network;
@@ -150,7 +150,7 @@ public class ITInternalTableScanTest {
 
         List<Peer> conf = List.of(new Peer(nodeNetworkAddress));
 
-        mockStorage = mock(Storage.class);
+        mockStorage = mock(PartitionStorage.class);
 
         raftSrv.startRaftGroup(
             grpName,
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITTablePersistenceTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITTablePersistenceTest.java
index 28b3a4d..4090f70 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITTablePersistenceTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITTablePersistenceTest.java
@@ -32,8 +32,8 @@ import org.apache.ignite.internal.schema.SchemaDescriptor;
 import org.apache.ignite.internal.schema.row.Row;
 import org.apache.ignite.internal.schema.row.RowAssembler;
 import org.apache.ignite.internal.storage.DataRow;
-import org.apache.ignite.internal.storage.Storage;
-import org.apache.ignite.internal.storage.basic.ConcurrentHashMapStorage;
+import org.apache.ignite.internal.storage.PartitionStorage;
+import org.apache.ignite.internal.storage.basic.ConcurrentHashMapPartitionStorage;
 import org.apache.ignite.internal.storage.basic.SimpleDataRow;
 import org.apache.ignite.internal.table.distributed.raft.PartitionListener;
 import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
@@ -114,7 +114,7 @@ public class ITTablePersistenceTest extends ITAbstractListenerSnapshotTest<Parti
 
     /** {@inheritDoc} */
     @Override public BooleanSupplier snapshotCheckClosure(JRaftServerImpl restarted, boolean interactedAfterSnapshot) {
-        Storage storage = getListener(restarted, raftGroupId()).getStorage();
+        PartitionStorage storage = getListener(restarted, raftGroupId()).getStorage();
 
         Row key = interactedAfterSnapshot ? SECOND_KEY : FIRST_KEY;
         Row value = interactedAfterSnapshot ? SECOND_VALUE : FIRST_VALUE;
@@ -144,7 +144,7 @@ public class ITTablePersistenceTest extends ITAbstractListenerSnapshotTest<Parti
             .map(Map.Entry::getKey)
             .findAny()
             .orElseGet(() -> {
-                PartitionListener listener = new PartitionListener(new ConcurrentHashMapStorage());
+                PartitionListener listener = new PartitionListener(new ConcurrentHashMapPartitionStorage());
 
                 paths.put(listener, workDir);
 
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 26ebde4..73d4bdc 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.table.distributed;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.ArrayList;
@@ -34,10 +33,9 @@ import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Consumer;
 import java.util.function.Function;
-import java.util.stream.IntStream;
-
 import org.apache.ignite.configuration.notifications.ConfigurationNamedListListener;
 import org.apache.ignite.configuration.notifications.ConfigurationNotificationEvent;
+import org.apache.ignite.configuration.schemas.store.DataStorageConfiguration;
 import org.apache.ignite.configuration.schemas.table.TableChange;
 import org.apache.ignite.configuration.schemas.table.TableView;
 import org.apache.ignite.configuration.schemas.table.TablesConfiguration;
@@ -59,7 +57,10 @@ import org.apache.ignite.internal.schema.SchemaDescriptor;
 import org.apache.ignite.internal.schema.SchemaUtils;
 import org.apache.ignite.internal.schema.marshaller.schema.SchemaSerializerImpl;
 import org.apache.ignite.internal.schema.registry.SchemaRegistryImpl;
-import org.apache.ignite.internal.storage.rocksdb.RocksDbStorage;
+import org.apache.ignite.internal.storage.engine.DataRegion;
+import org.apache.ignite.internal.storage.engine.StorageEngine;
+import org.apache.ignite.internal.storage.engine.TableStorage;
+import org.apache.ignite.internal.storage.rocksdb.RocksDbStorageEngine;
 import org.apache.ignite.internal.table.IgniteTablesInternal;
 import org.apache.ignite.internal.table.TableImpl;
 import org.apache.ignite.internal.table.distributed.raft.PartitionListener;
@@ -107,6 +108,9 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
     /** Tables configuration. */
     private final TablesConfiguration tablesCfg;
 
+    /** Data storage configuration. */
+    private final DataStorageConfiguration dataStorageCfg;
+
     /** Raft manager. */
     private final Loza raftMgr;
 
@@ -118,6 +122,9 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
     /** Meta storage service. */
     private final MetaStorageManager metaStorageMgr;
 
+    /** Storage engine instance. Only one type is available right now, which is the {@link RocksDbStorageEngine}. */
+    private final StorageEngine engine;
+
     /** Partitions store directory. */
     private final Path partitionsStoreDir;
 
@@ -130,10 +137,18 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
     /** Resolver that resolves a network address to node id. */
     private final Function<NetworkAddress, String> netAddrResolver;
 
+    /** Default data region instance. */
+    private DataRegion defaultDataRegion;
+
+    //TODO: IGNITE-15161 These should go into TableImpl instances.
+    /** Instances of table storages that need to be stopped on component stop. */
+    private final Set<TableStorage> tableStorages = ConcurrentHashMap.newKeySet();
+
     /**
      * Creates a new table manager.
      *
      * @param tablesCfg Tables configuration.
+     * @param dataStorageCfg Data storage configuration.
      * @param raftMgr Raft manager.
      * @param baselineMgr Baseline manager.
      * @param metaStorageMgr Meta storage manager.
@@ -141,6 +156,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
      */
     public TableManager(
         TablesConfiguration tablesCfg,
+        DataStorageConfiguration dataStorageCfg,
         Loza raftMgr,
         BaselineManager baselineMgr,
         TopologyService topologyService,
@@ -148,6 +164,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
         Path partitionsStoreDir
     ) {
         this.tablesCfg = tablesCfg;
+        this.dataStorageCfg = dataStorageCfg;
         this.raftMgr = raftMgr;
         this.baselineMgr = baselineMgr;
         this.metaStorageMgr = metaStorageMgr;
@@ -161,6 +178,8 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
 
             return node.id();
         };
+
+        engine = new RocksDbStorageEngine();
     }
 
     /** {@inheritDoc} */
@@ -248,10 +267,30 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
                 return CompletableFuture.completedFuture(null);
             }
         });
+
+        this.defaultDataRegion = engine.createDataRegion(dataStorageCfg.defaultRegion());
+
+        defaultDataRegion.start();
     }
 
     /** {@inheritDoc} */
     @Override public void stop() {
+        for (TableStorage tableStorage : tableStorages) {
+            try {
+                tableStorage.stop();
+            }
+            catch (Exception e) {
+                LOG.error("Failed to stop table storage " + tableStorage, e);
+            }
+        }
+
+        try {
+            if (defaultDataRegion != null)
+                defaultDataRegion.stop();
+        }
+        catch (Exception e) {
+            LOG.error("Failed to stop data region " + defaultDataRegion, e);
+        }
         // TODO: IGNITE-15161 Implement component's stop.
     }
 
@@ -272,34 +311,42 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
 
         var partitionsGroupsFutures = new ArrayList<CompletableFuture<RaftGroupService>>();
 
-        IntStream.range(0, partitions).forEach(p ->
+        Path storageDir = partitionsStoreDir.resolve(name);
+
+        try {
+            Files.createDirectories(storageDir);
+        }
+        catch (IOException e) {
+            throw new IgniteInternalException(
+                "Failed to create partitions store directory for " + name + ": " + e.getMessage(),
+                e
+            );
+        }
+
+        TableStorage tableStorage = engine.createTable(
+            storageDir,
+            tablesCfg.tables().get(name),
+            defaultDataRegion,
+            (tableCfgView, indexName) -> {
+                throw new UnsupportedOperationException("Not implemented yet.");
+            }
+        );
+
+        tableStorage.start();
+
+        tableStorages.add(tableStorage);
+
+        for (int p = 0; p < partitions; p++) {
+            int partId = p;
+
             partitionsGroupsFutures.add(
                 raftMgr.prepareRaftGroup(
                     raftGroupName(tblId, p),
                     assignment.get(p),
-                    () -> {
-                        Path storageDir = partitionsStoreDir.resolve(name);
-
-                        try {
-                            Files.createDirectories(storageDir);
-                        }
-                        catch (IOException e) {
-                            throw new IgniteInternalException(
-                                "Failed to create partitions store directory for " + name + ": " + e.getMessage(),
-                                e
-                            );
-                        }
-
-                        return new PartitionListener(
-                            new RocksDbStorage(
-                                storageDir.resolve(String.valueOf(p)),
-                                ByteBuffer::compareTo
-                            )
-                        );
-                    }
+                    () -> new PartitionListener(tableStorage.getOrCreatePartition(partId))
                 )
-            )
-        );
+            );
+        }
 
         CompletableFuture.allOf(partitionsGroupsFutures.toArray(CompletableFuture[]::new)).thenRun(() -> {
             try {
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
index 077183d..04214b4 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
@@ -28,12 +28,11 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
-
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.ByteBufferRow;
 import org.apache.ignite.internal.storage.DataRow;
+import org.apache.ignite.internal.storage.PartitionStorage;
 import org.apache.ignite.internal.storage.SearchRow;
-import org.apache.ignite.internal.storage.Storage;
 import org.apache.ignite.internal.storage.StorageException;
 import org.apache.ignite.internal.storage.basic.DeleteExactInvokeClosure;
 import org.apache.ignite.internal.storage.basic.GetAndRemoveInvokeClosure;
@@ -78,7 +77,7 @@ import org.jetbrains.annotations.TestOnly;
  */
 public class PartitionListener implements RaftGroupListener {
     /** Partition storage. */
-    private final Storage storage;
+    private final PartitionStorage storage;
 
     /** Cursors map. */
     private final Map<IgniteUuid, CursorMeta> cursors;
@@ -86,10 +85,10 @@ public class PartitionListener implements RaftGroupListener {
     /**
      * Constructor.
      *
-     * @param storage Storage.
+     * @param partitionStorage Storage.
      */
-    public PartitionListener(Storage storage) {
-        this.storage = storage;
+    public PartitionListener(PartitionStorage partitionStorage) {
+        this.storage = partitionStorage;
         this.cursors = new ConcurrentHashMap<>();
     }
 
@@ -591,7 +590,7 @@ public class PartitionListener implements RaftGroupListener {
      * @return Underlying storage.
      */
     @TestOnly
-    public Storage getStorage() {
+    public PartitionStorage getStorage() {
         return storage;
     }
 
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/TableManagerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/TableManagerTest.java
index cf13516..3e36f64 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/TableManagerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/TableManagerTest.java
@@ -29,6 +29,7 @@ import java.util.concurrent.Phaser;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.ignite.configuration.schemas.runner.ClusterConfiguration;
 import org.apache.ignite.configuration.schemas.runner.NodeConfiguration;
+import org.apache.ignite.configuration.schemas.store.DataStorageConfiguration;
 import org.apache.ignite.configuration.schemas.table.TablesConfiguration;
 import org.apache.ignite.internal.affinity.AffinityUtils;
 import org.apache.ignite.internal.baseline.BaselineManager;
@@ -156,22 +157,19 @@ public class TableManagerTest {
             );
 
             clusterCfgMgr = new ConfigurationManager(
-                List.of(ClusterConfiguration.KEY, TablesConfiguration.KEY),
+                List.of(ClusterConfiguration.KEY, TablesConfiguration.KEY, DataStorageConfiguration.KEY),
                 Map.of(),
                 new TestConfigurationStorage(DISTRIBUTED),
                 Collections.singletonList(ExtendedTableConfigurationSchema.class)
             );
 
             nodeCfgMgr.start();
+
+            nodeCfgMgr.bootstrap("node.metastorageNodes = [" + NODE_NAME + "]");
+
             clusterCfgMgr.start();
 
-            nodeCfgMgr.bootstrap("{\n" +
-                "   \"node\":{\n" +
-                "      \"metastorageNodes\":[\n" +
-                "         \"" + NODE_NAME + "\"\n" +
-                "      ]\n" +
-                "   }\n" +
-                "}");
+            clusterCfgMgr.configurationRegistry().initializeDefaults();
         }
         catch (Exception e) {
             LOG.error("Failed to bootstrap the test configuration manager.", e);
@@ -195,6 +193,7 @@ public class TableManagerTest {
     public void testStaticTableConfigured() {
         TableManager tableManager = new TableManager(
             clusterCfgMgr.configurationRegistry().getConfiguration(TablesConfiguration.KEY),
+            clusterCfgMgr.configurationRegistry().getConfiguration(DataStorageConfiguration.KEY),
             rm,
             bm,
             ts,
@@ -365,6 +364,7 @@ public class TableManagerTest {
 
         TableManager tableManager = new TableManager(
             clusterCfgMgr.configurationRegistry().getConfiguration(TablesConfiguration.KEY),
+            clusterCfgMgr.configurationRegistry().getConfiguration(DataStorageConfiguration.KEY),
             rm,
             bm,
             ts,
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
index 76d4945..396c298 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
@@ -29,7 +29,7 @@ import org.apache.ignite.internal.schema.NativeTypes;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
 import org.apache.ignite.internal.schema.row.Row;
 import org.apache.ignite.internal.schema.row.RowAssembler;
-import org.apache.ignite.internal.storage.basic.ConcurrentHashMapStorage;
+import org.apache.ignite.internal.storage.basic.ConcurrentHashMapPartitionStorage;
 import org.apache.ignite.internal.table.distributed.command.DeleteAllCommand;
 import org.apache.ignite.internal.table.distributed.command.DeleteCommand;
 import org.apache.ignite.internal.table.distributed.command.DeleteExactAllCommand;
@@ -84,7 +84,7 @@ public class PartitionCommandListenerTest {
      */
     @BeforeEach
     public void before() {
-        commandListener = new PartitionListener(new ConcurrentHashMapStorage());
+        commandListener = new PartitionListener(new ConcurrentHashMapPartitionStorage());
     }
 
     /**
diff --git a/modules/vault/src/main/java/org/apache/ignite/internal/vault/persistence/PersistentVaultService.java b/modules/vault/src/main/java/org/apache/ignite/internal/vault/persistence/PersistentVaultService.java
index 0de6d2b..df2d958 100644
--- a/modules/vault/src/main/java/org/apache/ignite/internal/vault/persistence/PersistentVaultService.java
+++ b/modules/vault/src/main/java/org/apache/ignite/internal/vault/persistence/PersistentVaultService.java
@@ -84,7 +84,7 @@ public class PersistentVaultService implements VaultService {
             .setCompressionType(CompressionType.LZ4_COMPRESSION)
             .setBottommostCompressionType(CompressionType.ZSTD_COMPRESSION)
             .setLevelCompactionDynamicLevelBytes(true)
-            .setBytesPerSync(1048576)
+            .setBytesPerSync(1024 * 1024)
             .setCompactionPriority(CompactionPriority.MinOverlappingRatio)
             .setTableFormatConfig(
                 new BlockBasedTableConfig()