You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ib...@apache.org on 2021/10/07 14:22:59 UTC
[ignite-3] branch ignite-3.0.0-alpha3 updated: IGNITE-15351
Implemented concepts of storage engines and data regions with basic
integration into existing code. (#365)
This is an automated email from the ASF dual-hosted git repository.
ibessonov pushed a commit to branch ignite-3.0.0-alpha3
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/ignite-3.0.0-alpha3 by this push:
new 33e6ed1 IGNITE-15351 Implemented concepts of storage engines and data regions with basic integration into existing code. (#365)
33e6ed1 is described below
commit 33e6ed1ca023999cabcd8c56e9f94c1d911dea65
Author: ibessonov <be...@gmail.com>
AuthorDate: Thu Oct 7 16:35:40 2021 +0300
IGNITE-15351 Implemented concepts of storage engines and data regions with basic integration into existing code. (#365)
Signed-off-by: ibessonov <be...@gmail.com>
---
.../store/DataRegionConfigurationSchema.java | 64 ++++
.../store/DataStorageConfigurationSchema.java} | 31 +-
.../org/apache/ignite/cli/ITConfigCommandTest.java | 18 +-
.../notifications/ConfigurationListener.java | 1 -
.../OneOf.java} | 31 +-
.../configuration/ConfigurationChanger.java | 10 +-
.../configuration/ConfigurationManager.java | 3 +-
.../configuration/ConfigurationRegistry.java | 9 +-
.../internal/configuration/DynamicProperty.java | 6 +
.../asm/ConfigurationAsmGenerator.java | 5 +-
.../configuration/util/ConfigurationUtil.java | 17 +-
.../configuration/validation/OneOfValidator.java | 46 +++
.../configuration/ConfigurationChangerTest.java | 6 +-
.../validation/OneOfValidatorTest.java | 80 +++++
.../apache/ignite/internal/util/IgniteUtils.java | 40 ++-
.../ignite/internal/rocksdb/ColumnFamily.java | 7 +
.../org/apache/ignite/internal/app/IgniteImpl.java | 5 +-
modules/storage-api/pom.xml | 2 +-
.../{Storage.java => PartitionStorage.java} | 2 +-
.../internal/storage/engine/DataRegion.java} | 14 +-
.../internal/storage/engine/StorageEngine.java | 55 +++
.../internal/storage/engine/TableStorage.java} | 36 +-
...Test.java => AbstractPartitionStorageTest.java} | 16 +-
...java => ConcurrentHashMapPartitionStorage.java} | 4 +-
.../basic/ConcurrentHashMapStorageTest.java | 8 +-
modules/storage-rocksdb/pom.xml | 19 +
.../storage/rocksdb/RocksDbDataRegion.java | 95 +++++
...DbStorage.java => RocksDbPartitionStorage.java} | 102 +-----
.../storage/rocksdb/RocksDbStorageEngine.java | 61 ++++
.../storage/rocksdb/RocksDbTableStorage.java | 384 +++++++++++++++++++++
.../storage/rocksdb/RocksDbStorageTest.java | 55 ++-
.../ignite/distributed/ITDistributedTableTest.java | 6 +-
.../distributed/ITInternalTableScanTest.java | 6 +-
.../ignite/distributed/ITTablePersistenceTest.java | 8 +-
.../internal/table/distributed/TableManager.java | 101 ++++--
.../table/distributed/raft/PartitionListener.java | 13 +-
.../ignite/internal/table/TableManagerTest.java | 16 +-
.../raft/PartitionCommandListenerTest.java | 4 +-
.../vault/persistence/PersistentVaultService.java | 2 +-
39 files changed, 1127 insertions(+), 261 deletions(-)
diff --git a/modules/api/src/main/java/org/apache/ignite/configuration/schemas/store/DataRegionConfigurationSchema.java b/modules/api/src/main/java/org/apache/ignite/configuration/schemas/store/DataRegionConfigurationSchema.java
new file mode 100644
index 0000000..f32381f
--- /dev/null
+++ b/modules/api/src/main/java/org/apache/ignite/configuration/schemas/store/DataRegionConfigurationSchema.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.configuration.schemas.store;
+
+import org.apache.ignite.configuration.annotation.Config;
+import org.apache.ignite.configuration.annotation.Value;
+import org.apache.ignite.configuration.validation.Immutable;
+import org.apache.ignite.configuration.validation.Min;
+import org.apache.ignite.configuration.validation.OneOf;
+
+/**
+ * Configuration schema for data region. Currently it represents configuration for rocksdb storage engine only.
+ */
+@Config
+public class DataRegionConfigurationSchema {
+ /** Type of the RocksDB data region. */
+ public static final String ROCKSDB_DATA_REGION_TYPE = "rocksdb";
+
+ /** Cache type for the RocksDB LRU cache. */
+ public static final String ROCKSDB_LRU_CACHE = "lru";
+
+ /** Cache type for the RocksDB LRU cache. */
+ public static final String ROCKSDB_CLOCK_CACHE = "clock";
+
+ /** Type for the future polymorphic configuration schemas. */
+ @Immutable
+ @OneOf(ROCKSDB_DATA_REGION_TYPE)
+ @Value(hasDefault = true)
+ public String type = ROCKSDB_DATA_REGION_TYPE;
+
+ /** Size of the rocksdb offheap cache. */
+ @Value(hasDefault = true)
+ public long size = 256 * 1024 * 1024;
+
+ /** Size of rocksdb write buffer. */
+ @Value(hasDefault = true)
+ @Min(1)
+ public long writeBufferSize = 64 * 1024 * 1024;
+
+ /** Cache type - only {@code LRU} is supported at the moment. {@code Clock} implementation has known bugs. */
+ @OneOf({ROCKSDB_LRU_CACHE})
+ @Value(hasDefault = true)
+ public String cache = ROCKSDB_LRU_CACHE;
+
+ /** The cache is sharded to 2^numShardBits shards, by hash of the key. */
+ @Min(-1)
+ @Value(hasDefault = true)
+ public int numShardBits = -1;
+}
diff --git a/modules/configuration-api/src/main/java/org/apache/ignite/configuration/notifications/ConfigurationListener.java b/modules/api/src/main/java/org/apache/ignite/configuration/schemas/store/DataStorageConfigurationSchema.java
similarity index 54%
copy from modules/configuration-api/src/main/java/org/apache/ignite/configuration/notifications/ConfigurationListener.java
copy to modules/api/src/main/java/org/apache/ignite/configuration/schemas/store/DataStorageConfigurationSchema.java
index 534d5ec..0434d7b 100644
--- a/modules/configuration-api/src/main/java/org/apache/ignite/configuration/notifications/ConfigurationListener.java
+++ b/modules/api/src/main/java/org/apache/ignite/configuration/schemas/store/DataStorageConfigurationSchema.java
@@ -15,24 +15,23 @@
* limitations under the License.
*/
-package org.apache.ignite.configuration.notifications;
+package org.apache.ignite.configuration.schemas.store;
-import java.util.concurrent.CompletableFuture;
-import org.jetbrains.annotations.NotNull;
+import org.apache.ignite.configuration.annotation.ConfigValue;
+import org.apache.ignite.configuration.annotation.ConfigurationRoot;
+import org.apache.ignite.configuration.annotation.ConfigurationType;
+import org.apache.ignite.configuration.annotation.NamedConfigValue;
/**
- * Configuration property change listener.
- *
- * @param <VIEW> VIEW type configuration.
+ * Root configuration for data storages.
*/
-@FunctionalInterface
-public interface ConfigurationListener<VIEW> {
- /**
- * Called on property value update.
- *
- * @param ctx Notification context.
- * @return Future that signifies the end of the listener execution.
- */
- @NotNull CompletableFuture<?> onUpdate(@NotNull ConfigurationNotificationEvent<VIEW> ctx);
-}
+@ConfigurationRoot(rootName = "db", type = ConfigurationType.DISTRIBUTED)
+public class DataStorageConfigurationSchema {
+ /** Default data region. */
+ @ConfigValue
+ public DataRegionConfigurationSchema defaultRegion;
+ /** Other data regions. */
+ @NamedConfigValue
+ public DataRegionConfigurationSchema regions;
+}
diff --git a/modules/cli/src/integrationTest/java/org/apache/ignite/cli/ITConfigCommandTest.java b/modules/cli/src/integrationTest/java/org/apache/ignite/cli/ITConfigCommandTest.java
index a8e7a18..571a777 100644
--- a/modules/cli/src/integrationTest/java/org/apache/ignite/cli/ITConfigCommandTest.java
+++ b/modules/cli/src/integrationTest/java/org/apache/ignite/cli/ITConfigCommandTest.java
@@ -35,6 +35,7 @@ import picocli.CommandLine;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* Integration test for {@code ignite config} commands.
@@ -117,13 +118,16 @@ public class ITConfigCommandTest extends AbstractCliTest {
);
assertEquals(0, exitCode);
- assertEquals(
- "\"{\"clientConnector\":{\"connectTimeout\":5000,\"port\":" + clientPort + ",\"portRange\":0}," +
- "\"network\":{\"netClusterNodes\":[],\"port\":" + networkPort + "}," +
- "\"node\":{\"metastorageNodes\":[\"localhost1\"]}," +
- "\"rest\":{\"port\":" + restPort + ",\"portRange\":0}}\"" + nl,
- unescapeQuotes(out.toString())
- );
+
+ String unescapedOut = unescapeQuotes(out.toString());
+
+ assertTrue(unescapedOut.contains(
+ "\"clientConnector\":{\"connectTimeout\":5000,\"port\":" + clientPort + ",\"portRange\":0}"
+ ), unescapedOut);
+
+ assertTrue(unescapedOut.contains(
+ "\"rest\":{\"port\":" + restPort + ",\"portRange\":0}}\""
+ ), unescapedOut);
}
@Test
diff --git a/modules/configuration-api/src/main/java/org/apache/ignite/configuration/notifications/ConfigurationListener.java b/modules/configuration-api/src/main/java/org/apache/ignite/configuration/notifications/ConfigurationListener.java
index 534d5ec..36de3c8 100644
--- a/modules/configuration-api/src/main/java/org/apache/ignite/configuration/notifications/ConfigurationListener.java
+++ b/modules/configuration-api/src/main/java/org/apache/ignite/configuration/notifications/ConfigurationListener.java
@@ -35,4 +35,3 @@ public interface ConfigurationListener<VIEW> {
*/
@NotNull CompletableFuture<?> onUpdate(@NotNull ConfigurationNotificationEvent<VIEW> ctx);
}
-
diff --git a/modules/configuration-api/src/main/java/org/apache/ignite/configuration/notifications/ConfigurationListener.java b/modules/configuration-api/src/main/java/org/apache/ignite/configuration/validation/OneOf.java
similarity index 58%
copy from modules/configuration-api/src/main/java/org/apache/ignite/configuration/notifications/ConfigurationListener.java
copy to modules/configuration-api/src/main/java/org/apache/ignite/configuration/validation/OneOf.java
index 534d5ec..b6c2f8f 100644
--- a/modules/configuration-api/src/main/java/org/apache/ignite/configuration/notifications/ConfigurationListener.java
+++ b/modules/configuration-api/src/main/java/org/apache/ignite/configuration/validation/OneOf.java
@@ -15,24 +15,27 @@
* limitations under the License.
*/
-package org.apache.ignite.configuration.notifications;
+package org.apache.ignite.configuration.validation;
-import java.util.concurrent.CompletableFuture;
-import org.jetbrains.annotations.NotNull;
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
/**
- * Configuration property change listener.
- *
- * @param <VIEW> VIEW type configuration.
+ * Signifies that current {@code String} configuration values can only be equal to one of the listed values.
*/
-@FunctionalInterface
-public interface ConfigurationListener<VIEW> {
+@Target(FIELD)
+@Retention(RUNTIME)
+public @interface OneOf {
/**
- * Called on property value update.
- *
- * @param ctx Notification context.
- * @return Future that signifies the end of the listener execution.
+ * @return List of possible values.
*/
- @NotNull CompletableFuture<?> onUpdate(@NotNull ConfigurationNotificationEvent<VIEW> ctx);
-}
+ String[] value();
+ /**
+ * @return {@code true} if list is case sensitive.
+ */
+ boolean caseSensitive() default false;
+}
diff --git a/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/ConfigurationChanger.java b/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/ConfigurationChanger.java
index 154d8d0..2172243 100644
--- a/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/ConfigurationChanger.java
+++ b/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/ConfigurationChanger.java
@@ -205,6 +205,8 @@ public abstract class ConfigurationChanger implements DynamicConfigurationChange
superRoot.addRoot(rootKey, rootNode);
}
+ ConfigurationUtil.addDefaults(superRoot);
+
storageRoots = new StorageRoots(superRoot, data.changeId());
storage.registerConfigurationListener(this::updateFromListener);
@@ -312,13 +314,15 @@ public abstract class ConfigurationChanger implements DynamicConfigurationChange
Map<String, ? extends Serializable> storageData = storage.readAllLatest(ConfigurationUtil.join(storagePath));
- if (storageData.isEmpty())
- throw new NoSuchElementException(ConfigurationUtil.join(path));
-
InnerNode rootNode = new SuperRoot(rootCreator());
fillFromPrefixMap(rootNode, toPrefixMap(storageData));
+ if (storageData.isEmpty())
+ rootNode.construct(path.get(0), ConfigurationUtil.EMPTY_CFG_SRC, true);
+
+ addDefaults(rootNode);
+
try {
T result = ConfigurationUtil.find(path, rootNode, true);
diff --git a/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/ConfigurationManager.java b/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/ConfigurationManager.java
index 170594d..0a54d78 100644
--- a/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/ConfigurationManager.java
+++ b/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/ConfigurationManager.java
@@ -32,6 +32,7 @@ import org.apache.ignite.configuration.validation.Validator;
import org.apache.ignite.internal.configuration.hocon.HoconConverter;
import org.apache.ignite.internal.configuration.storage.ConfigurationStorage;
import org.apache.ignite.internal.manager.IgniteComponent;
+import org.intellij.lang.annotations.Language;
import static org.apache.ignite.internal.configuration.util.ConfigurationUtil.checkConfigurationType;
@@ -82,7 +83,7 @@ public class ConfigurationManager implements IgniteComponent {
* @throws InterruptedException If thread is interrupted during bootstrap.
* @throws ExecutionException If configuration update failed for some reason.
*/
- public void bootstrap(String hoconStr) throws InterruptedException, ExecutionException {
+ public void bootstrap(@Language("HOCON") String hoconStr) throws InterruptedException, ExecutionException {
ConfigObject hoconCfg = ConfigFactory.parseString(hoconStr).root();
registry.change(HoconConverter.hoconSource(hoconCfg)).get();
diff --git a/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/ConfigurationRegistry.java b/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/ConfigurationRegistry.java
index 09223bf..fd03942 100644
--- a/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/ConfigurationRegistry.java
+++ b/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/ConfigurationRegistry.java
@@ -36,6 +36,7 @@ import org.apache.ignite.configuration.annotation.InternalConfiguration;
import org.apache.ignite.configuration.validation.Immutable;
import org.apache.ignite.configuration.validation.Max;
import org.apache.ignite.configuration.validation.Min;
+import org.apache.ignite.configuration.validation.OneOf;
import org.apache.ignite.configuration.validation.Validator;
import org.apache.ignite.internal.configuration.asm.ConfigurationAsmGenerator;
import org.apache.ignite.internal.configuration.storage.ConfigurationStorage;
@@ -49,6 +50,7 @@ import org.apache.ignite.internal.configuration.util.KeyNotFoundException;
import org.apache.ignite.internal.configuration.validation.ImmutableValidator;
import org.apache.ignite.internal.configuration.validation.MaxValidator;
import org.apache.ignite.internal.configuration.validation.MinValidator;
+import org.apache.ignite.internal.configuration.validation.OneOfValidator;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.lang.IgniteLogger;
@@ -114,9 +116,10 @@ public class ConfigurationRegistry implements IgniteComponent {
Map<Class<? extends Annotation>, Set<Validator<?, ?>>> validators0 = new HashMap<>(validators);
- validators0.computeIfAbsent(Min.class, a -> new HashSet<>()).add(new MinValidator());
- validators0.computeIfAbsent(Max.class, a -> new HashSet<>()).add(new MaxValidator());
- validators0.computeIfAbsent(Immutable.class, a -> new HashSet<>()).add(new ImmutableValidator());
+ validators0.computeIfAbsent(Min.class, a -> new HashSet<>(1)).add(new MinValidator());
+ validators0.computeIfAbsent(Max.class, a -> new HashSet<>(1)).add(new MaxValidator());
+ validators0.computeIfAbsent(Immutable.class, a -> new HashSet<>(1)).add(new ImmutableValidator());
+ validators0.computeIfAbsent(OneOf.class, a -> new HashSet<>(1)).add(new OneOfValidator());
changer = new ConfigurationChanger(this::notificator, rootKeys, validators0, storage) {
/** {@inheritDoc} */
diff --git a/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/DynamicProperty.java b/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/DynamicProperty.java
index feb5de9..925cd09 100644
--- a/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/DynamicProperty.java
+++ b/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/DynamicProperty.java
@@ -26,6 +26,7 @@ import org.apache.ignite.configuration.ConfigurationValue;
import org.apache.ignite.configuration.RootKey;
import org.apache.ignite.internal.configuration.tree.ConfigurationSource;
import org.apache.ignite.internal.configuration.tree.ConstructableTreeNode;
+import org.apache.ignite.internal.tostring.S;
/**
* Holder for property value.
@@ -100,4 +101,9 @@ public class DynamicProperty<T extends Serializable> extends ConfigurationNode<T
@Override public String key() {
return key;
}
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(DynamicProperty.class, this, "key", key, "value", value());
+ }
}
diff --git a/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/asm/ConfigurationAsmGenerator.java b/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/asm/ConfigurationAsmGenerator.java
index ee9cdde..03edebe 100644
--- a/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/asm/ConfigurationAsmGenerator.java
+++ b/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/asm/ConfigurationAsmGenerator.java
@@ -27,6 +27,7 @@ import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.ArrayDeque;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -308,7 +309,9 @@ public class ConfigurationAsmGenerator {
assert schemasInfo.containsKey(schemaClass) : schemaClass;
- Field[] schemaFields = schemaClass.getDeclaredFields();
+ Field[] schemaFields = Arrays.stream(schemaClass.getDeclaredFields()).filter(
+ field -> isValue(field) || isConfigValue(field) || isNamedConfigValue(field)
+ ).toArray(Field[]::new);
Set<Class<?>> schemaExtensions = internalSchemaExtensions.getOrDefault(schemaClass, Set.of());
Set<Field> extensionsFields = extensionsFields(schemaExtensions);
diff --git a/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/util/ConfigurationUtil.java b/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/util/ConfigurationUtil.java
index 2ff720d..7efc849 100644
--- a/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/util/ConfigurationUtil.java
+++ b/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/util/ConfigurationUtil.java
@@ -57,7 +57,7 @@ import static java.util.stream.Collectors.toList;
/** */
public class ConfigurationUtil {
/** Configuration source that copies values without modifying tham. */
- static final ConfigurationSource EMPTY_CFG_SRC = new ConfigurationSource() {};
+ public static final ConfigurationSource EMPTY_CFG_SRC = new ConfigurationSource() {};
/**
* Replaces all {@code .} and {@code \} characters with {@code \.} and {@code \\} respectively.
@@ -445,19 +445,22 @@ public class ConfigurationUtil {
/** {@inheritDoc} */
@Override public Object visitInnerNode(String key, InnerNode innerNode) {
- // Instantiate field in destination node before doing something else or copy it if it wasn't null.
- node.construct(key, EMPTY_CFG_SRC, true);
+ InnerNode childNode = node.traverseChild(key, innerNodeVisitor(), true);
- addDefaults(node.traverseChild(key, innerNodeVisitor(), true));
+ // Instantiate field in destination node before doing something else.
+ if (childNode == null) {
+ node.construct(key, EMPTY_CFG_SRC, true);
+
+ childNode = node.traverseChild(key, innerNodeVisitor(), true);
+ }
+
+ addDefaults(childNode);
return null;
}
/** {@inheritDoc} */
@Override public Object visitNamedListNode(String key, NamedListNode<?> namedList) {
- // Copy internal map.
- node.construct(key, EMPTY_CFG_SRC, true);
-
namedList = node.traverseChild(key, namedListNodeVisitor(), true);
for (String namedListKey : namedList.namedListKeys()) {
diff --git a/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/validation/OneOfValidator.java b/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/validation/OneOfValidator.java
new file mode 100644
index 0000000..49b3c24
--- /dev/null
+++ b/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/validation/OneOfValidator.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.configuration.validation;
+
+import java.util.Arrays;
+import org.apache.ignite.configuration.validation.OneOf;
+import org.apache.ignite.configuration.validation.ValidationContext;
+import org.apache.ignite.configuration.validation.ValidationIssue;
+import org.apache.ignite.configuration.validation.Validator;
+
+/**
+ * {@link Validator} implementation for the {@link OneOf} annotation.
+ */
+public class OneOfValidator implements Validator<OneOf, String> {
+ /** {@inheritDoc} */
+ @Override public void validate(OneOf annotation, ValidationContext<String> ctx) {
+ String value = ctx.getNewValue();
+
+ boolean caseSensitive = annotation.caseSensitive();
+
+ for (String exp : annotation.value()) {
+ if (caseSensitive ? exp.equals(value) : exp.equalsIgnoreCase(value))
+ return;
+ }
+
+ String message = "'" + ctx.currentKey() + "' configuration value must be one of " +
+ Arrays.toString(annotation.value()) + (caseSensitive ? " (case sensitive)" : " (case insensitive)");
+
+ ctx.addIssue(new ValidationIssue(message));
+ }
+}
diff --git a/modules/configuration/src/test/java/org/apache/ignite/internal/configuration/ConfigurationChangerTest.java b/modules/configuration/src/test/java/org/apache/ignite/internal/configuration/ConfigurationChangerTest.java
index c83bdc2..d8b17f2 100644
--- a/modules/configuration/src/test/java/org/apache/ignite/internal/configuration/ConfigurationChangerTest.java
+++ b/modules/configuration/src/test/java/org/apache/ignite/internal/configuration/ConfigurationChangerTest.java
@@ -55,6 +55,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -236,7 +237,7 @@ public class ConfigurationChangerTest {
storage.fail(true);
assertThrows(ExecutionException.class, () -> changer.change(source(KEY, (AChange parent) -> parent
- .changeChild(child -> child.changeIntCfg(1))
+ .changeChild(child -> child.changeIntCfg(1).changeStrCfg("1"))
)).get(1, SECONDS));
storage.fail(false);
@@ -247,7 +248,8 @@ public class ConfigurationChangerTest {
assertEquals(0, dataMap.size());
AView newRoot = (AView)changer.getRootNode(KEY);
- assertNull(newRoot.child());
+ assertNotNull(newRoot.child());
+ assertNull(newRoot.child().strCfg());
}
/** */
diff --git a/modules/configuration/src/test/java/org/apache/ignite/internal/configuration/validation/OneOfValidatorTest.java b/modules/configuration/src/test/java/org/apache/ignite/internal/configuration/validation/OneOfValidatorTest.java
new file mode 100644
index 0000000..612842b
--- /dev/null
+++ b/modules/configuration/src/test/java/org/apache/ignite/internal/configuration/validation/OneOfValidatorTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.configuration.validation;
+
+import org.apache.ignite.configuration.validation.OneOf;
+import org.apache.ignite.configuration.validation.ValidationContext;
+import org.apache.ignite.configuration.validation.ValidationIssue;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import org.mockito.ArgumentCaptor;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/** */
+public class OneOfValidatorTest {
+ /** */
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void testValidate(boolean caseSensitive) {
+ // Prepare mocked annotation instance.
+ OneOf oneOfAnnotation = mock(OneOf.class);
+
+ when(oneOfAnnotation.value()).thenReturn(new String[] {"foo", "bar"});
+ when(oneOfAnnotation.caseSensitive()).thenReturn(caseSensitive);
+
+ // Prepare mocked validation context.
+ ValidationContext<String> ctx = mock(ValidationContext.class);
+
+ when(ctx.currentKey()).thenReturn("x");
+ when(ctx.getNewValue()).thenReturn("foo", "Bar", "no");
+
+ // Prepare issues captor.
+ ArgumentCaptor<ValidationIssue> issuesCaptor = ArgumentCaptor.forClass(ValidationIssue.class);
+ doNothing().when(ctx).addIssue(issuesCaptor.capture());
+
+ // Instantiate validator.
+ OneOfValidator oneOfValidator = new OneOfValidator();
+
+ // Assert that valid value produces no issues.
+ oneOfValidator.validate(oneOfAnnotation, ctx);
+
+ assertThat(issuesCaptor.getAllValues(), is(empty()));
+
+ // Assert that case sencitivity affects validation.
+ oneOfValidator.validate(oneOfAnnotation, ctx);
+
+ if (caseSensitive)
+ assertThat(issuesCaptor.getValue().message(), is("'x' configuration value must be one of [foo, bar] (case sensitive)"));
+ else
+ assertThat(issuesCaptor.getAllValues(), is(empty()));
+
+ // Assert that unacceptable value produces validation issue.
+ oneOfValidator.validate(oneOfAnnotation, ctx);
+
+ if (caseSensitive)
+ assertThat(issuesCaptor.getValue().message(), is("'x' configuration value must be one of [foo, bar] (case sensitive)"));
+ else
+ assertThat(issuesCaptor.getValue().message(), is("'x' configuration value must be one of [foo, bar] (case insensitive)"));
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index 50ca746..5b06fd9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -31,11 +31,14 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
+import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
+import java.util.stream.Stream;
import org.apache.ignite.lang.IgniteLogger;
import org.jetbrains.annotations.Nullable;
@@ -467,27 +470,36 @@ public class IgniteUtils {
* thrown exception will be propagated to the caller, after all other objects are closed, similar to
* the try-with-resources block.
*
- * @param closeables Collection of objects to close.
+ * @param closeables Stream of objects to close.
* @throws Exception If failed to close.
*/
- public static void closeAll(Collection<? extends AutoCloseable> closeables) throws Exception {
- Exception ex = null;
+ public static void closeAll(Stream<? extends AutoCloseable> closeables) throws Exception {
+ AtomicReference<Exception> ex = new AtomicReference<>();
- for (AutoCloseable closeable : closeables) {
+ closeables.filter(Objects::nonNull).forEach(closeable -> {
try {
- if (closeable != null)
- closeable.close();
+ closeable.close();
}
catch (Exception e) {
- if (ex == null)
- ex = e;
- else
- ex.addSuppressed(e);
+ if (!ex.compareAndSet(null, e))
+ ex.get().addSuppressed(e);
}
- }
+ });
- if (ex != null)
- throw ex;
+ if (ex.get() != null)
+ throw ex.get();
+ }
+
+ /**
+ * Closes all provided objects. If any of the {@link AutoCloseable#close} methods throw an exception, only the first
+ * thrown exception will be propagated to the caller, after all other objects are closed, similar to
+ * the try-with-resources block.
+ *
+ * @param closeables Collection of objects to close.
+ * @throws Exception If failed to close.
+ */
+ public static void closeAll(Collection<? extends AutoCloseable> closeables) throws Exception {
+ closeAll(closeables.stream());
}
/**
@@ -499,7 +511,7 @@ public class IgniteUtils {
* @see #closeAll(Collection)
*/
public static void closeAll(AutoCloseable... closeables) throws Exception {
- closeAll(Arrays.asList(closeables));
+ closeAll(Arrays.stream(closeables));
}
/**
diff --git a/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/ColumnFamily.java b/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/ColumnFamily.java
index fcdc676..5c15ba3 100644
--- a/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/ColumnFamily.java
+++ b/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/ColumnFamily.java
@@ -173,6 +173,13 @@ public class ColumnFamily implements AutoCloseable {
}
/**
+ * @return Column family handle.
+ */
+ public ColumnFamilyHandle handle() {
+ return cfHandle;
+ }
+
+ /**
* @return Name of the column family.
*/
public String name() {
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 93b24ae..f68ab1e 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -37,6 +37,7 @@ import org.apache.ignite.configuration.schemas.network.NetworkConfiguration;
import org.apache.ignite.configuration.schemas.rest.RestConfiguration;
import org.apache.ignite.configuration.schemas.runner.ClusterConfiguration;
import org.apache.ignite.configuration.schemas.runner.NodeConfiguration;
+import org.apache.ignite.configuration.schemas.store.DataStorageConfiguration;
import org.apache.ignite.configuration.schemas.table.TablesConfiguration;
import org.apache.ignite.internal.baseline.BaselineManager;
import org.apache.ignite.internal.configuration.ConfigurationManager;
@@ -180,7 +181,8 @@ public class IgniteImpl implements Ignite {
clusterCfgMgr = new ConfigurationManager(
Arrays.asList(
ClusterConfiguration.KEY,
- TablesConfiguration.KEY
+ TablesConfiguration.KEY,
+ DataStorageConfiguration.KEY
),
Map.of(),
new DistributedConfigurationStorage(metaStorageMgr, vaultMgr),
@@ -195,6 +197,7 @@ public class IgniteImpl implements Ignite {
distributedTblMgr = new TableManager(
clusterCfgMgr.configurationRegistry().getConfiguration(TablesConfiguration.KEY),
+ clusterCfgMgr.configurationRegistry().getConfiguration(DataStorageConfiguration.KEY),
raftMgr,
baselineMgr,
clusterSvc.topologyService(),
diff --git a/modules/storage-api/pom.xml b/modules/storage-api/pom.xml
index f891825..6eb9f03 100644
--- a/modules/storage-api/pom.xml
+++ b/modules/storage-api/pom.xml
@@ -35,7 +35,7 @@
<dependencies>
<dependency>
<groupId>org.apache.ignite</groupId>
- <artifactId>ignite-core</artifactId>
+ <artifactId>ignite-api</artifactId>
</dependency>
<!-- Test dependencies -->
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/Storage.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/PartitionStorage.java
similarity index 98%
rename from modules/storage-api/src/main/java/org/apache/ignite/internal/storage/Storage.java
rename to modules/storage-api/src/main/java/org/apache/ignite/internal/storage/PartitionStorage.java
index ee83006..9b9096d 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/Storage.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/PartitionStorage.java
@@ -31,7 +31,7 @@ import org.jetbrains.annotations.Nullable;
* Any locking is unnecessary as this storage is used within RAFT groups where all write operations are
* serialized.
*/
-public interface Storage extends AutoCloseable {
+public interface PartitionStorage extends AutoCloseable {
/**
* Reads a DataRow for a given key.
*
diff --git a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/basic/ConcurrentHashMapStorageTest.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/DataRegion.java
similarity index 67%
copy from modules/storage-api/src/test/java/org/apache/ignite/internal/storage/basic/ConcurrentHashMapStorageTest.java
copy to modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/DataRegion.java
index eaaed7e..7abf270 100644
--- a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/basic/ConcurrentHashMapStorageTest.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/DataRegion.java
@@ -15,17 +15,13 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.storage.basic;
+package org.apache.ignite.internal.storage.engine;
-import org.apache.ignite.internal.storage.AbstractStorageTest;
-import org.junit.jupiter.api.BeforeEach;
+import org.apache.ignite.internal.manager.IgniteComponent;
/**
- * Storage test implementation for {@link ConcurrentHashMapStorage}.
+ * Interface that represents Ignite data region. Data region is a memory segment of fixed size, usually located offheap,
+ * that caches user data in memory.
*/
-public class ConcurrentHashMapStorageTest extends AbstractStorageTest {
- @BeforeEach
- public void setUp() {
- storage = new ConcurrentHashMapStorage();
- }
+public interface DataRegion extends IgniteComponent {
}
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/StorageEngine.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/StorageEngine.java
new file mode 100644
index 0000000..4163915
--- /dev/null
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/StorageEngine.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.engine;
+
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.util.Comparator;
+import java.util.function.BiFunction;
+import org.apache.ignite.configuration.schemas.store.DataRegionConfiguration;
+import org.apache.ignite.configuration.schemas.table.TableConfiguration;
+import org.apache.ignite.configuration.schemas.table.TableView;
+
+/**
+ * General storageengine interface.
+ */
+public interface StorageEngine {
+ /**
+ * Creates new data resion.
+ *
+ * @param regionCfg Data region configuration.
+ * @return New data region.
+ */
+ DataRegion createDataRegion(DataRegionConfiguration regionCfg);
+
+ /**
+ * Creates new table storage.
+ *
+ * @param tablePath Path to store table data.
+ * @param tableCfg Table configuration.
+ * @param dataRegion Data region for the table.
+ * @param indexComparatorFactory Comparator factory for SQL indexes.
+ * @return New table storage.
+ */
+ TableStorage createTable(
+ Path tablePath,
+ TableConfiguration tableCfg,
+ DataRegion dataRegion,
+ BiFunction<TableView, String, Comparator<ByteBuffer>> indexComparatorFactory
+ );
+}
diff --git a/modules/configuration-api/src/main/java/org/apache/ignite/configuration/notifications/ConfigurationListener.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/TableStorage.java
similarity index 52%
copy from modules/configuration-api/src/main/java/org/apache/ignite/configuration/notifications/ConfigurationListener.java
copy to modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/TableStorage.java
index 534d5ec..7c557a6 100644
--- a/modules/configuration-api/src/main/java/org/apache/ignite/configuration/notifications/ConfigurationListener.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/TableStorage.java
@@ -15,24 +15,34 @@
* limitations under the License.
*/
-package org.apache.ignite.configuration.notifications;
+package org.apache.ignite.internal.storage.engine;
-import java.util.concurrent.CompletableFuture;
-import org.jetbrains.annotations.NotNull;
+import org.apache.ignite.internal.storage.PartitionStorage;
+import org.apache.ignite.internal.storage.StorageException;
/**
- * Configuration property change listener.
- *
- * @param <VIEW> VIEW type configuration.
+ * Table storage that contains meta, partitions and SQL indexes.
*/
-@FunctionalInterface
-public interface ConfigurationListener<VIEW> {
+public interface TableStorage {
/**
- * Called on property value update.
+ * Gets or creates a partition for current table.
*
- * @param ctx Notification context.
- * @return Future that signifies the end of the listener execution.
+ * @param partId Partition id.
+ * @return Partition storage.
*/
- @NotNull CompletableFuture<?> onUpdate(@NotNull ConfigurationNotificationEvent<VIEW> ctx);
-}
+ PartitionStorage getOrCreatePartition(int partId);
+
+ /**
+ * Starts the storage.
+ *
+ * @throws StorageException If something went wrong.
+ */
+ public void start() throws StorageException;
+ /**
+ * Stops the storage.
+ *
+ * @throws StorageException If something went wrong.
+ */
+ void stop() throws StorageException;
+}
diff --git a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/AbstractStorageTest.java b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/AbstractPartitionStorageTest.java
similarity index 96%
rename from modules/storage-api/src/test/java/org/apache/ignite/internal/storage/AbstractStorageTest.java
rename to modules/storage-api/src/test/java/org/apache/ignite/internal/storage/AbstractPartitionStorageTest.java
index 00c75df..cbbbc21 100644
--- a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/AbstractStorageTest.java
+++ b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/AbstractPartitionStorageTest.java
@@ -61,7 +61,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
* Abstract test that covers basic scenarios of the storage API.
*/
@ExtendWith(WorkDirectoryExtension.class)
-public abstract class AbstractStorageTest {
+public abstract class AbstractPartitionStorageTest {
/** Test key. */
private static final String KEY = "key";
@@ -69,7 +69,7 @@ public abstract class AbstractStorageTest {
private static final String VALUE = "value";
/** Storage instance. */
- protected Storage storage;
+ protected PartitionStorage storage;
/**
* Tests that read / write / remove work consistently on the same key.
@@ -465,7 +465,7 @@ public abstract class AbstractStorageTest {
}
/**
- * Tests the {@link Storage#readAll(List)} operation successfully reads data rows from the storage.
+ * Tests the {@link PartitionStorage#readAll(List)} operation successfully reads data rows from the storage.
*/
@Test
public void testReadAll() {
@@ -483,7 +483,7 @@ public abstract class AbstractStorageTest {
}
/**
- * Tests that {@link Storage#writeAll(List)} operation successfully writes a collection of data rows into the
+ * Tests that {@link PartitionStorage#writeAll(List)} operation successfully writes a collection of data rows into the
* storage.
*/
@Test
@@ -497,7 +497,7 @@ public abstract class AbstractStorageTest {
}
/**
- * Tests that {@link Storage#insertAll(List)} operation doesn't insert data rows which keys
+ * Tests that {@link PartitionStorage#insertAll(List)} operation doesn't insert data rows which keys
* are already present in the storage. This operation must also return the list of such data rows.
*/
@Test
@@ -517,7 +517,7 @@ public abstract class AbstractStorageTest {
}
/**
- * Tests that {@link Storage#removeAll(List)} operation successfully retrieves and removes a collection of
+ * Tests that {@link PartitionStorage#removeAll(List)} operation successfully retrieves and removes a collection of
* {@link SearchRow}s.
*/
@Test
@@ -547,7 +547,7 @@ public abstract class AbstractStorageTest {
}
/**
- * Tests that {@link Storage#removeAllExact(List)} operation successfully removes and retrieves a collection
+ * Tests that {@link PartitionStorage#removeAllExact(List)} operation successfully removes and retrieves a collection
* of data rows with the given exact keys and values from the storage.
*/
@Test
@@ -566,7 +566,7 @@ public abstract class AbstractStorageTest {
}
/**
- * Tests that {@link Storage#removeAllExact(List)} operation doesn't remove and retrieve a collection
+ * Tests that {@link PartitionStorage#removeAllExact(List)} operation doesn't remove and retrieve a collection
* of data rows with the given exact keys and values from the storage if the value in the storage doesn't match
* the given value.
*/
diff --git a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/basic/ConcurrentHashMapStorage.java b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/basic/ConcurrentHashMapPartitionStorage.java
similarity index 98%
rename from modules/storage-api/src/test/java/org/apache/ignite/internal/storage/basic/ConcurrentHashMapStorage.java
rename to modules/storage-api/src/test/java/org/apache/ignite/internal/storage/basic/ConcurrentHashMapPartitionStorage.java
index f76957c..b67e132 100644
--- a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/basic/ConcurrentHashMapStorage.java
+++ b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/basic/ConcurrentHashMapPartitionStorage.java
@@ -36,8 +36,8 @@ import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.ignite.internal.storage.DataRow;
import org.apache.ignite.internal.storage.InvokeClosure;
+import org.apache.ignite.internal.storage.PartitionStorage;
import org.apache.ignite.internal.storage.SearchRow;
-import org.apache.ignite.internal.storage.Storage;
import org.apache.ignite.internal.storage.StorageException;
import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.lang.ByteArray;
@@ -50,7 +50,7 @@ import static java.util.stream.Collectors.toList;
/**
* Storage implementation based on {@link ConcurrentHashMap}.
*/
-public class ConcurrentHashMapStorage implements Storage {
+public class ConcurrentHashMapPartitionStorage implements PartitionStorage {
/** Name of the snapshot file. */
private static final String SNAPSHOT_FILE = "snapshot_file";
diff --git a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/basic/ConcurrentHashMapStorageTest.java b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/basic/ConcurrentHashMapStorageTest.java
index eaaed7e..896d4ce 100644
--- a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/basic/ConcurrentHashMapStorageTest.java
+++ b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/basic/ConcurrentHashMapStorageTest.java
@@ -17,15 +17,15 @@
package org.apache.ignite.internal.storage.basic;
-import org.apache.ignite.internal.storage.AbstractStorageTest;
+import org.apache.ignite.internal.storage.AbstractPartitionStorageTest;
import org.junit.jupiter.api.BeforeEach;
/**
- * Storage test implementation for {@link ConcurrentHashMapStorage}.
+ * Storage test implementation for {@link ConcurrentHashMapPartitionStorage}.
*/
-public class ConcurrentHashMapStorageTest extends AbstractStorageTest {
+public class ConcurrentHashMapStorageTest extends AbstractPartitionStorageTest {
@BeforeEach
public void setUp() {
- storage = new ConcurrentHashMapStorage();
+ storage = new ConcurrentHashMapPartitionStorage();
}
}
diff --git a/modules/storage-rocksdb/pom.xml b/modules/storage-rocksdb/pom.xml
index 1327d41..b2d5e98 100644
--- a/modules/storage-rocksdb/pom.xml
+++ b/modules/storage-rocksdb/pom.xml
@@ -57,6 +57,18 @@
</dependency>
<dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-configuration</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-core</artifactId>
<type>test-jar</type>
@@ -65,6 +77,13 @@
<dependency>
<groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-configuration</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
<artifactId>ignite-storage-api</artifactId>
<type>test-jar</type>
<scope>test</scope>
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbDataRegion.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbDataRegion.java
new file mode 100644
index 0000000..2f1c469
--- /dev/null
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbDataRegion.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb;
+
+import java.util.Locale;
+import org.apache.ignite.configuration.schemas.store.DataRegionConfiguration;
+import org.apache.ignite.configuration.schemas.store.DataRegionView;
+import org.apache.ignite.internal.storage.engine.DataRegion;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.rocksdb.Cache;
+import org.rocksdb.ClockCache;
+import org.rocksdb.LRUCache;
+import org.rocksdb.WriteBufferManager;
+
+import static org.apache.ignite.configuration.schemas.store.DataRegionConfigurationSchema.ROCKSDB_CLOCK_CACHE;
+import static org.apache.ignite.configuration.schemas.store.DataRegionConfigurationSchema.ROCKSDB_DATA_REGION_TYPE;
+import static org.apache.ignite.configuration.schemas.store.DataRegionConfigurationSchema.ROCKSDB_LRU_CACHE;
+
+/**
+ * Data region implementation for {@link RocksDbStorageEngine}. Based on a {@link Cache}.
+ */
+public class RocksDbDataRegion implements DataRegion {
+ /** Region configuration. */
+ private final DataRegionConfiguration cfg;
+
+ /** RocksDB cache instance. */
+ private Cache cache;
+
+ /** Write buffer manager instance. */
+ private WriteBufferManager writeBufferManager;
+
+ /**
+ * Constructor.
+ *
+ * @param cfg Data region configuration.
+ */
+ public RocksDbDataRegion(DataRegionConfiguration cfg) {
+ this.cfg = cfg;
+
+ assert ROCKSDB_DATA_REGION_TYPE.equalsIgnoreCase(cfg.type().value());
+ }
+
+ /** {@inheritDoc} */
+ @Override public void start() {
+ DataRegionView dataRegionView = cfg.value();
+
+ long writeBufferSize = dataRegionView.writeBufferSize();
+
+ long totalCacheSize = dataRegionView.size() + writeBufferSize;
+
+ switch (dataRegionView.cache().toLowerCase(Locale.ROOT)) {
+ case ROCKSDB_CLOCK_CACHE:
+ cache = new ClockCache(totalCacheSize, dataRegionView.numShardBits(), false);
+
+ break;
+
+ case ROCKSDB_LRU_CACHE:
+ cache = new LRUCache(totalCacheSize, dataRegionView.numShardBits(), false);
+
+ break;
+
+ default:
+ assert false : dataRegionView.cache();
+ }
+
+ writeBufferManager = new WriteBufferManager(writeBufferSize, cache);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void stop() throws Exception {
+ IgniteUtils.closeAll(writeBufferManager, cache);
+ }
+
+ /**
+ * @return Write buffer manager associated withthe region.
+ */
+ public WriteBufferManager writeBufferManager() {
+ return writeBufferManager;
+ }
+}
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbPartitionStorage.java
similarity index 81%
rename from modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorage.java
rename to modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbPartitionStorage.java
index 24a01c3..41d47ff 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbPartitionStorage.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.storage.rocksdb;
import java.io.IOException;
-import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
@@ -26,7 +25,6 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
-import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
@@ -38,8 +36,8 @@ import java.util.function.Predicate;
import org.apache.ignite.internal.rocksdb.ColumnFamily;
import org.apache.ignite.internal.storage.DataRow;
import org.apache.ignite.internal.storage.InvokeClosure;
+import org.apache.ignite.internal.storage.PartitionStorage;
import org.apache.ignite.internal.storage.SearchRow;
-import org.apache.ignite.internal.storage.Storage;
import org.apache.ignite.internal.storage.StorageException;
import org.apache.ignite.internal.storage.basic.SimpleDataRow;
import org.apache.ignite.internal.util.Cursor;
@@ -47,15 +45,7 @@ import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.lang.IgniteInternalException;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
-import org.jetbrains.annotations.TestOnly;
-import org.rocksdb.AbstractComparator;
-import org.rocksdb.ColumnFamilyDescriptor;
-import org.rocksdb.ColumnFamilyHandle;
-import org.rocksdb.ColumnFamilyOptions;
-import org.rocksdb.ComparatorOptions;
-import org.rocksdb.DBOptions;
import org.rocksdb.IngestExternalFileOptions;
-import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
@@ -63,94 +53,34 @@ import org.rocksdb.Snapshot;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;
+import static java.util.Collections.nCopies;
import static org.apache.ignite.internal.rocksdb.RocksUtils.createSstFile;
/**
* Storage implementation based on a single RocksDB instance.
*/
-public class RocksDbStorage implements Storage {
+public class RocksDbPartitionStorage implements PartitionStorage {
/** Suffix for the temporary snapshot folder */
private static final String TMP_SUFFIX = ".tmp";
- /** Snapshot file name. */
- private static final String COLUMN_FAMILY_NAME = "data";
-
- static {
- RocksDB.loadLibrary();
- }
-
- /** RocksDB comparator options. */
- private final ComparatorOptions comparatorOptions;
-
- /** RocksDB comparator. */
- private final AbstractComparator comparator;
-
- /** RockDB options. */
- private final DBOptions options;
-
/** RocksDb instance. */
private final RocksDB db;
/** Data column family. */
private final ColumnFamily data;
- /** DB path. */
- private final Path dbPath;
-
/** Thread-pool for snapshot operations execution. */
private final ExecutorService snapshotExecutor = Executors.newSingleThreadExecutor();
/**
- * @param dbPath Path to the folder to store data.
- * @param comparator Keys comparator.
+ * @param db Rocks DB instance.
+ * @param columnFamily Column family to be used for all storage operations.
* @throws StorageException If failed to create RocksDB instance.
*/
- public RocksDbStorage(Path dbPath, Comparator<ByteBuffer> comparator) throws StorageException {
- try {
- this.dbPath = dbPath;
-
- comparatorOptions = new ComparatorOptions();
-
- this.comparator = new AbstractComparator(comparatorOptions) {
- /** {@inheritDoc} */
- @Override public String name() {
- return "comparator";
- }
-
- /** {@inheritDoc} */
- @Override public int compare(ByteBuffer a, ByteBuffer b) {
- return comparator.compare(a, b);
- }
- };
-
- options = new DBOptions()
- .setCreateMissingColumnFamilies(true)
- .setCreateIfMissing(true);
-
- Options dataOptions = new Options().setCreateIfMissing(true).setComparator(this.comparator);
-
- ColumnFamilyOptions dataFamilyOptions = new ColumnFamilyOptions(dataOptions);
+ public RocksDbPartitionStorage(RocksDB db, ColumnFamily columnFamily) throws StorageException {
+ this.db = db;
- List<ColumnFamilyDescriptor> descriptors = Collections.singletonList(
- new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, dataFamilyOptions)
- );
-
- var handles = new ArrayList<ColumnFamilyHandle>();
-
- db = RocksDB.open(options, dbPath.toAbsolutePath().toString(), descriptors, handles);
-
- data = new ColumnFamily(db, handles.get(0), COLUMN_FAMILY_NAME, dataFamilyOptions, dataOptions);
- }
- catch (RocksDBException e) {
- try {
- close();
- }
- catch (Exception ex) {
- e.addSuppressed(ex);
- }
-
- throw new StorageException("Failed to start the storage", e);
- }
+ this.data = columnFamily;
}
/** {@inheritDoc} */
@@ -173,7 +103,7 @@ public class RocksDbStorage implements Storage {
try {
List<byte[]> keysList = getKeys(keys);
- List<byte[]> valuesList = db.multiGetAsList(keysList);
+ List<byte[]> valuesList = db.multiGetAsList(nCopies(keys.size(), data.handle()), keysList);
assert keys.size() == valuesList.size();
@@ -298,7 +228,7 @@ public class RocksDbStorage implements Storage {
WriteOptions opts = new WriteOptions()) {
List<byte[]> keys = getKeys(keyValues);
- List<byte[]> values = db.multiGetAsList(keys);
+ List<byte[]> values = db.multiGetAsList(nCopies(keys.size(), data.handle()), keys);
assert values.size() == keyValues.size();
@@ -414,7 +344,7 @@ public class RocksDbStorage implements Storage {
/** {@inheritDoc} */
@Override public void restoreSnapshot(Path path) {
try (IngestExternalFileOptions ingestOptions = new IngestExternalFileOptions()) {
- Path snapshotPath = path.resolve(COLUMN_FAMILY_NAME);
+ Path snapshotPath = path.resolve(data.name());
if (!Files.exists(snapshotPath))
throw new IgniteInternalException("Snapshot not found: " + snapshotPath);
@@ -429,8 +359,6 @@ public class RocksDbStorage implements Storage {
/** {@inheritDoc} */
@Override public void close() throws Exception {
IgniteUtils.shutdownAndAwaitTermination(snapshotExecutor, 10, TimeUnit.SECONDS);
-
- IgniteUtils.closeAll(data, db, options, comparator, comparatorOptions);
}
/** Cursor wrapper over the RocksIterator object with custom filter. */
@@ -504,14 +432,6 @@ public class RocksDbStorage implements Storage {
}
/**
- * @return Path to the database.
- */
- @TestOnly
- public Path getDbPath() {
- return dbPath;
- }
-
- /**
* Gets a list of key byte arrays.
* @param keyValues Key rows.
* @return List of keys as byte arrays.
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageEngine.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageEngine.java
new file mode 100644
index 0000000..318026f
--- /dev/null
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageEngine.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb;
+
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.util.Comparator;
+import java.util.function.BiFunction;
+import org.apache.ignite.configuration.schemas.store.DataRegionConfiguration;
+import org.apache.ignite.configuration.schemas.table.TableConfiguration;
+import org.apache.ignite.configuration.schemas.table.TableView;
+import org.apache.ignite.internal.storage.engine.DataRegion;
+import org.apache.ignite.internal.storage.engine.StorageEngine;
+import org.apache.ignite.internal.storage.engine.TableStorage;
+import org.rocksdb.RocksDB;
+
+/**
+ * Storage engine implementation based on RocksDB.
+ */
+public class RocksDbStorageEngine implements StorageEngine {
+ static {
+ RocksDB.loadLibrary();
+ }
+
+ /** {@inheritDoc} */
+ @Override public DataRegion createDataRegion(DataRegionConfiguration regionCfg) {
+ return new RocksDbDataRegion(regionCfg);
+ }
+
+ /** {@inheritDoc} */
+ @Override public TableStorage createTable(
+ Path tablePath,
+ TableConfiguration tableCfg,
+ DataRegion dataRegion,
+ BiFunction<TableView, String, Comparator<ByteBuffer>> indexComparatorFactory
+ ) {
+ assert dataRegion instanceof RocksDbDataRegion : dataRegion;
+
+ return new RocksDbTableStorage(
+ tablePath,
+ tableCfg,
+ (RocksDbDataRegion)dataRegion,
+ indexComparatorFactory
+ );
+ }
+}
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
new file mode 100644
index 0000000..5f4fe6f
--- /dev/null
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
@@ -0,0 +1,384 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReferenceArray;
+import java.util.function.BiFunction;
+import java.util.stream.IntStream;
+import org.apache.ignite.configuration.NamedListView;
+import org.apache.ignite.configuration.schemas.table.TableConfiguration;
+import org.apache.ignite.configuration.schemas.table.TableIndexView;
+import org.apache.ignite.configuration.schemas.table.TableView;
+import org.apache.ignite.internal.rocksdb.ColumnFamily;
+import org.apache.ignite.internal.storage.PartitionStorage;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.engine.TableStorage;
+import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.jetbrains.annotations.NotNull;
+import org.rocksdb.AbstractComparator;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.ComparatorOptions;
+import org.rocksdb.DBOptions;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+
+import static java.lang.Integer.parseInt;
+import static java.util.stream.Collectors.groupingBy;
+import static java.util.stream.Stream.concat;
+
+/**
+ * Table storage implementation based on {@link RocksDB} instance.
+ */
+public class RocksDbTableStorage implements TableStorage {
+ /**
+ * Name of the meta column family matches default columns family, meaning that it always exist when new table is
+ * created.
+ */
+ private static final String CF_META = "default";
+
+ /** Prefix for partitions column families names. */
+ private static final String CF_PARTITION_PREFIX = "cf-part:";
+
+ /** Prefix for SQL indexes column family names. */
+ private static final String CF_INDEX_PREFIX = "cf-idx:";
+
+ /** Name of comparator used in indexes column family. */
+ private static final String INDEX_COMPARATOR_NAME = "index-comparator";
+
+ /** Path for the directory that stores table data. */
+ private final Path tablePath;
+
+ /** Table configuration. */
+ private final TableConfiguration tableCfg;
+
+ /** Data region for the table. */
+ private final RocksDbDataRegion dataRegion;
+
+ /** Comparators factory for indexes. */
+ private final BiFunction<TableView, String, Comparator<ByteBuffer>> indexComparatorFactory;
+
+ /** List of closeable resources to close on {@link #stop()}. Better than having a field for each one of them. */
+ private final List<AutoCloseable> autoCloseables = new ArrayList<>();
+
+ /** Rocks DB instance itself. */
+ private RocksDB db;
+
+ /** CF handle for meta information. */
+ @SuppressWarnings("unused")
+ private ColumnFamilyHandle metaCfHandle;
+
+ /** Column families for partitions. Stored as an array for the quick access by an index. */
+ private AtomicReferenceArray<ColumnFamily> partitionCfs;
+
+ /** Max number of partitions in the table. */
+ private int partitions;
+
+ /** Column families for indexes by their names. */
+ private Map<String, ColumnFamilyHandle> indicesCfHandles = new ConcurrentHashMap<>();
+
+ /** Utility enum to describe a type of the column family - meta, partition or index. */
+ private enum ColumnFamilyType {
+ META, PARTITION, INDEX
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param tablePath Path for the directory that stores table data.
+ * @param tableCfg Table configuration.
+ * @param dataRegion Data region for the table.
+ * @param indexComparatorFactory Comparators factory for indexes.
+ */
+ public RocksDbTableStorage(
+ Path tablePath,
+ TableConfiguration tableCfg,
+ RocksDbDataRegion dataRegion,
+ BiFunction<TableView, String, Comparator<ByteBuffer>> indexComparatorFactory
+ ) {
+ this.tablePath = tablePath;
+ this.tableCfg = tableCfg;
+ this.dataRegion = dataRegion;
+ this.indexComparatorFactory = indexComparatorFactory;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void start() throws StorageException {
+ Map<ColumnFamilyType, List<String>> cfNamesGrouped = getColumnFamiliesNames();
+
+ List<ColumnFamilyDescriptor> cfDescriptors = convertToColumnFamiliesDescriptors(cfNamesGrouped);
+
+ List<ColumnFamilyHandle> cfHandles = new ArrayList<>();
+
+ DBOptions dbOptions = addToCloseableResources(new DBOptions()
+ .setCreateIfMissing(true)
+ .setWriteBufferManager(dataRegion.writeBufferManager())
+ );
+
+ try {
+ db = addToCloseableResources(RocksDB.open(dbOptions, tablePath.toAbsolutePath().toString(), cfDescriptors, cfHandles));
+ }
+ catch (RocksDBException e) {
+ throw new StorageException("Failed to initialize RocksDB instance.", e);
+ }
+
+ partitions = tableCfg.value().partitions();
+
+ partitionCfs = new AtomicReferenceArray<>(partitions);
+
+ for (int cfListIndex = 0; cfListIndex < cfHandles.size(); cfListIndex++) {
+ ColumnFamilyHandle cfHandle = cfHandles.get(cfListIndex);
+
+ String handleName;
+ try {
+ handleName = new String(cfHandle.getName(), StandardCharsets.UTF_8);
+ }
+ catch (RocksDBException e) {
+ throw new StorageException("Failed to read RocksDB column family name.", e);
+ }
+
+ if (handleName.equals(CF_META))
+ this.metaCfHandle = addToCloseableResources(cfHandle);
+ else if (handleName.startsWith(CF_PARTITION_PREFIX)) {
+ int partId = partitionId(handleName);
+
+ ColumnFamilyDescriptor cfDescriptor = cfDescriptors.get(cfListIndex);
+
+ partitionCfs.set(partId, new ColumnFamily(db, cfHandle, handleName, cfDescriptor.getOptions(), null));
+ }
+ else {
+ String indexName = handleName.substring(CF_INDEX_PREFIX.length());
+
+ indicesCfHandles.put(indexName, cfHandle);
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void stop() throws StorageException {
+ try {
+ List<AutoCloseable> copy = new ArrayList<>(autoCloseables);
+
+ Collections.reverse(copy);
+
+ IgniteUtils.closeAll(concat(
+ concat(IntStream.range(0, partitions).mapToObj(partitionCfs::get), indicesCfHandles.values().stream()),
+ copy.stream()
+ ));
+ }
+ catch (Exception e) {
+ throw new StorageException("Failed to stop RocksDB table storage.", e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public PartitionStorage getOrCreatePartition(int partId) {
+ assert partId < partitions : S.toString(
+ "Attempt to create partition with id outside of configured range",
+ "partitionId", partId, false,
+ "partitions", partitions, false
+ );
+
+ ColumnFamily partitionCf = partitionCfs.get(partId);
+
+ if (partitionCf == null) {
+ String handleName = partitionColumnFamilyName(partId);
+
+ ColumnFamilyDescriptor cfDescriptor = new ColumnFamilyDescriptor(
+ handleName.getBytes(StandardCharsets.UTF_8),
+ new ColumnFamilyOptions()
+ );
+
+ try {
+ ColumnFamilyHandle cfHandle = db.createColumnFamily(cfDescriptor);
+
+ partitionCf = new ColumnFamily(db, cfHandle, handleName, cfDescriptor.getOptions(), null);
+ }
+ catch (RocksDBException e) {
+ cfDescriptor.getOptions().close();
+
+ throw new StorageException("Failed to create new RocksDB column family " + handleName, e);
+ }
+
+ partitionCfs.set(partId, partitionCf);
+ }
+
+ return new RocksDbPartitionStorage(db, partitionCf);
+ }
+
+ /**
+ * Returns list of column families names that belong to RocksDB instance in the given path, grouped by thier
+ * {@link ColumnFamilyType}.
+ *
+ * @return Map with column families names.
+ * @throws StorageException If something went wrong.
+ */
+ private Map<ColumnFamilyType, List<String>> getColumnFamiliesNames() {
+ String absolutePathStr = tablePath.toAbsolutePath().toString();
+
+ List<String> cfNames = new ArrayList<>();
+
+ try (Options opts = new Options()) {
+ List<byte[]> cfNamesBytes = RocksDB.listColumnFamilies(opts, absolutePathStr);
+
+ for (byte[] cfNameBytes : cfNamesBytes)
+ cfNames.add(new String(cfNameBytes, StandardCharsets.UTF_8));
+ }
+ catch (RocksDBException e) {
+ throw new StorageException(
+ "Failed to read list of column families names for the RocksDB instance located at path " + absolutePathStr,
+ e
+ );
+ }
+
+ return cfNames.stream().collect(groupingBy(this::columnFamilyType));
+ }
+
+ /**
+ * Returns list of CF descriptors by their names.
+ *
+ * @param cfGrouped Map from CF type to lists of names.
+ * @return List of CF descriptors.
+ */
+ @NotNull private List<ColumnFamilyDescriptor> convertToColumnFamiliesDescriptors(
+ Map<ColumnFamilyType,
+ List<String>> cfGrouped
+ ) {
+ List<ColumnFamilyDescriptor> cfDescriptors = new ArrayList<>();
+
+ Options cfOptions = addToCloseableResources(new Options().setCreateIfMissing(true));
+
+ cfDescriptors.add(new ColumnFamilyDescriptor(
+ CF_META.getBytes(StandardCharsets.UTF_8),
+ addToCloseableResources(new ColumnFamilyOptions(cfOptions))
+ ));
+
+ for (String partitionCfName : cfGrouped.getOrDefault(ColumnFamilyType.PARTITION, List.of())) {
+ cfDescriptors.add(new ColumnFamilyDescriptor(
+ partitionCfName.getBytes(StandardCharsets.UTF_8),
+ new ColumnFamilyOptions()
+ ));
+ }
+
+ NamedListView<? extends TableIndexView> indicesCfgView = tableCfg.value().indices();
+
+ for (String indexCfName : cfGrouped.getOrDefault(ColumnFamilyType.INDEX, List.of())) {
+ String indexName = indexCfName.substring(CF_INDEX_PREFIX.length());
+
+ TableIndexView indexCfgView = indicesCfgView.get(indexName);
+
+ assert indexCfgView != null : "Found index that is absent in configuration: " + indexCfName;
+
+ Comparator<ByteBuffer> indexComparator = indexComparatorFactory.apply(tableCfg.value(), indexName);
+
+ cfDescriptors.add(new ColumnFamilyDescriptor(
+ indexCfName.getBytes(StandardCharsets.UTF_8),
+ new ColumnFamilyOptions()
+ .setComparator(addToCloseableResources(
+ new AbstractComparator(addToCloseableResources(new ComparatorOptions())) {
+ /** {@inheritDoc} */
+ @Override public String name() {
+ return INDEX_COMPARATOR_NAME;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int compare(ByteBuffer a, ByteBuffer b) {
+ return indexComparator.compare(a, b);
+ }
+ }))
+ ));
+ }
+
+ return cfDescriptors;
+ }
+
+ /**
+ * Creates column family name by partition id.
+ *
+ * @param partId Partition id.
+ * @return Column family name.
+ */
+ private static String partitionColumnFamilyName(int partId) {
+ return CF_PARTITION_PREFIX + partId;
+ }
+
+ /**
+ * Gets partition id from column family name.
+ *
+ * @param cfName Column family name.
+ * @return Partition id.
+ */
+ private static int partitionId(String cfName) {
+ return parseInt(cfName.substring(CF_PARTITION_PREFIX.length()));
+ }
+
+ /**
+ * Creates column family name by index name.
+ *
+ * @param idxName Index name.
+ * @return Column family name.
+ */
+ private static String indexColumnFamilyName(String idxName) {
+ return CF_INDEX_PREFIX + idxName;
+ }
+
+ /**
+ * Determines column family type by its name.
+ *
+ * @param cfName Column family name.
+ * @return Column family type.
+ * @throws StorageException If column family name doesn't match any known pattern.
+ */
+ private ColumnFamilyType columnFamilyType(String cfName) throws StorageException {
+ if (CF_META.equals(cfName))
+ return ColumnFamilyType.META;
+
+ if (cfName.startsWith(CF_PARTITION_PREFIX))
+ return ColumnFamilyType.PARTITION;
+
+ if (cfName.startsWith(CF_INDEX_PREFIX))
+ return ColumnFamilyType.INDEX;
+
+ throw new StorageException("Unidentified column family [name=" + cfName + ", table=" + tableCfg.name() + ']');
+ }
+
+ /**
+ * Adds resource to the {@link #autoCloseables} list.
+ *
+ * @param autoCloseable Closeable resource.
+ * @param <R> Type of the resource.
+ * @return Passed resource with the same type.
+ */
+ private <R extends AutoCloseable> R addToCloseableResources(R autoCloseable) {
+ autoCloseables.add(autoCloseable);
+
+ return autoCloseable;
+ }
+}
diff --git a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageTest.java b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageTest.java
index 2a1e7cb..1559bba 100644
--- a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageTest.java
+++ b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageTest.java
@@ -17,9 +17,14 @@
package org.apache.ignite.internal.storage.rocksdb;
-import java.nio.ByteBuffer;
import java.nio.file.Path;
-import org.apache.ignite.internal.storage.AbstractStorageTest;
+import org.apache.ignite.configuration.schemas.store.DataRegionConfiguration;
+import org.apache.ignite.configuration.schemas.table.TableConfiguration;
+import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.storage.AbstractPartitionStorageTest;
+import org.apache.ignite.internal.storage.engine.DataRegion;
+import org.apache.ignite.internal.storage.engine.TableStorage;
import org.apache.ignite.internal.testframework.WorkDirectory;
import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
import org.apache.ignite.internal.util.IgniteUtils;
@@ -27,20 +32,56 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.extension.ExtendWith;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
/**
- * Storage test implementation for {@link RocksDbStorage}.
+ * Storage test implementation for {@link RocksDbPartitionStorage}.
*/
@ExtendWith(WorkDirectoryExtension.class)
-public class RocksDbStorageTest extends AbstractStorageTest {
+@ExtendWith(ConfigurationExtension.class)
+public class RocksDbStorageTest extends AbstractPartitionStorageTest {
+ /** */
+ private TableStorage table;
+
+ /** */
+ private DataRegion dataRegion;
+
/** */
@BeforeEach
- public void setUp(@WorkDirectory Path workDir) {
- storage = new RocksDbStorage(workDir, ByteBuffer::compareTo);
+ public void setUp(
+ @WorkDirectory Path workDir,
+ @InjectConfiguration DataRegionConfiguration dataRegionCfg,
+ @InjectConfiguration TableConfiguration tableCfg
+ ) throws Exception {
+ dataRegionCfg.change(cfg -> cfg.changeSize(16 * 1024).changeWriteBufferSize(16 * 1024)).get();
+
+ RocksDbStorageEngine engine = new RocksDbStorageEngine();
+
+ dataRegion = engine.createDataRegion(dataRegionCfg);
+
+ assertThat(dataRegion, is(instanceOf(RocksDbDataRegion.class)));
+
+ dataRegion.start();
+
+ table = engine.createTable(workDir, tableCfg, dataRegion, (tableView, indexName) -> null);
+
+ assertThat(table, is(instanceOf(RocksDbTableStorage.class)));
+
+ table.start();
+
+ storage = table.getOrCreatePartition(0);
+
+ assertThat(storage, is(instanceOf(RocksDbPartitionStorage.class)));
}
/** */
@AfterEach
public void tearDown() throws Exception {
- IgniteUtils.closeAll(storage);
+ IgniteUtils.closeAll(
+ table == null ? null : table::stop,
+ dataRegion == null ? null : dataRegion::stop
+ );
}
}
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITDistributedTableTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITDistributedTableTest.java
index ddd7e8c..56cf6ae 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITDistributedTableTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITDistributedTableTest.java
@@ -42,7 +42,7 @@ import org.apache.ignite.internal.schema.SchemaDescriptor;
import org.apache.ignite.internal.schema.SchemaRegistry;
import org.apache.ignite.internal.schema.row.Row;
import org.apache.ignite.internal.schema.row.RowAssembler;
-import org.apache.ignite.internal.storage.basic.ConcurrentHashMapStorage;
+import org.apache.ignite.internal.storage.basic.ConcurrentHashMapPartitionStorage;
import org.apache.ignite.internal.table.TableImpl;
import org.apache.ignite.internal.table.distributed.command.GetCommand;
import org.apache.ignite.internal.table.distributed.command.InsertCommand;
@@ -189,7 +189,7 @@ public class ITDistributedTableTest {
partSrv.startRaftGroup(
grpId,
- new PartitionListener(new ConcurrentHashMapStorage()),
+ new PartitionListener(new ConcurrentHashMapPartitionStorage()),
conf
);
@@ -280,7 +280,7 @@ public class ITDistributedTableTest {
rs.startRaftGroup(
grpId,
- new PartitionListener(new ConcurrentHashMapStorage()),
+ new PartitionListener(new ConcurrentHashMapPartitionStorage()),
conf
);
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITInternalTableScanTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITInternalTableScanTest.java
index 17d71c2..384ac2c 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITInternalTableScanTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITInternalTableScanTest.java
@@ -40,7 +40,7 @@ import org.apache.ignite.internal.raft.server.RaftServer;
import org.apache.ignite.internal.raft.server.impl.RaftServerImpl;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.storage.DataRow;
-import org.apache.ignite.internal.storage.Storage;
+import org.apache.ignite.internal.storage.PartitionStorage;
import org.apache.ignite.internal.storage.StorageException;
import org.apache.ignite.internal.storage.basic.SimpleDataRow;
import org.apache.ignite.internal.table.InternalTable;
@@ -102,7 +102,7 @@ public class ITInternalTableScanTest {
/** Mock partition storage. */
@Mock
- private Storage mockStorage;
+ private PartitionStorage mockStorage;
/** */
private ClusterService network;
@@ -150,7 +150,7 @@ public class ITInternalTableScanTest {
List<Peer> conf = List.of(new Peer(nodeNetworkAddress));
- mockStorage = mock(Storage.class);
+ mockStorage = mock(PartitionStorage.class);
raftSrv.startRaftGroup(
grpName,
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITTablePersistenceTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITTablePersistenceTest.java
index 28b3a4d..4090f70 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITTablePersistenceTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITTablePersistenceTest.java
@@ -32,8 +32,8 @@ import org.apache.ignite.internal.schema.SchemaDescriptor;
import org.apache.ignite.internal.schema.row.Row;
import org.apache.ignite.internal.schema.row.RowAssembler;
import org.apache.ignite.internal.storage.DataRow;
-import org.apache.ignite.internal.storage.Storage;
-import org.apache.ignite.internal.storage.basic.ConcurrentHashMapStorage;
+import org.apache.ignite.internal.storage.PartitionStorage;
+import org.apache.ignite.internal.storage.basic.ConcurrentHashMapPartitionStorage;
import org.apache.ignite.internal.storage.basic.SimpleDataRow;
import org.apache.ignite.internal.table.distributed.raft.PartitionListener;
import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
@@ -114,7 +114,7 @@ public class ITTablePersistenceTest extends ITAbstractListenerSnapshotTest<Parti
/** {@inheritDoc} */
@Override public BooleanSupplier snapshotCheckClosure(JRaftServerImpl restarted, boolean interactedAfterSnapshot) {
- Storage storage = getListener(restarted, raftGroupId()).getStorage();
+ PartitionStorage storage = getListener(restarted, raftGroupId()).getStorage();
Row key = interactedAfterSnapshot ? SECOND_KEY : FIRST_KEY;
Row value = interactedAfterSnapshot ? SECOND_VALUE : FIRST_VALUE;
@@ -144,7 +144,7 @@ public class ITTablePersistenceTest extends ITAbstractListenerSnapshotTest<Parti
.map(Map.Entry::getKey)
.findAny()
.orElseGet(() -> {
- PartitionListener listener = new PartitionListener(new ConcurrentHashMapStorage());
+ PartitionListener listener = new PartitionListener(new ConcurrentHashMapPartitionStorage());
paths.put(listener, workDir);
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 26ebde4..73d4bdc 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.table.distributed;
import java.io.IOException;
-import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
@@ -34,10 +33,9 @@ import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.function.Function;
-import java.util.stream.IntStream;
-
import org.apache.ignite.configuration.notifications.ConfigurationNamedListListener;
import org.apache.ignite.configuration.notifications.ConfigurationNotificationEvent;
+import org.apache.ignite.configuration.schemas.store.DataStorageConfiguration;
import org.apache.ignite.configuration.schemas.table.TableChange;
import org.apache.ignite.configuration.schemas.table.TableView;
import org.apache.ignite.configuration.schemas.table.TablesConfiguration;
@@ -59,7 +57,10 @@ import org.apache.ignite.internal.schema.SchemaDescriptor;
import org.apache.ignite.internal.schema.SchemaUtils;
import org.apache.ignite.internal.schema.marshaller.schema.SchemaSerializerImpl;
import org.apache.ignite.internal.schema.registry.SchemaRegistryImpl;
-import org.apache.ignite.internal.storage.rocksdb.RocksDbStorage;
+import org.apache.ignite.internal.storage.engine.DataRegion;
+import org.apache.ignite.internal.storage.engine.StorageEngine;
+import org.apache.ignite.internal.storage.engine.TableStorage;
+import org.apache.ignite.internal.storage.rocksdb.RocksDbStorageEngine;
import org.apache.ignite.internal.table.IgniteTablesInternal;
import org.apache.ignite.internal.table.TableImpl;
import org.apache.ignite.internal.table.distributed.raft.PartitionListener;
@@ -107,6 +108,9 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
/** Tables configuration. */
private final TablesConfiguration tablesCfg;
+ /** Data storage configuration. */
+ private final DataStorageConfiguration dataStorageCfg;
+
/** Raft manager. */
private final Loza raftMgr;
@@ -118,6 +122,9 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
/** Meta storage service. */
private final MetaStorageManager metaStorageMgr;
+ /** Storage engine instance. Only one type is available right now, which is the {@link RocksDbStorageEngine}. */
+ private final StorageEngine engine;
+
/** Partitions store directory. */
private final Path partitionsStoreDir;
@@ -130,10 +137,18 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
/** Resolver that resolves a network address to node id. */
private final Function<NetworkAddress, String> netAddrResolver;
+ /** Default data region instance. */
+ private DataRegion defaultDataRegion;
+
+ //TODO: IGNITE-15161 These should go into TableImpl instances.
+ /** Instances of table storages that need to be stopped on component stop. */
+ private final Set<TableStorage> tableStorages = ConcurrentHashMap.newKeySet();
+
/**
* Creates a new table manager.
*
* @param tablesCfg Tables configuration.
+ * @param dataStorageCfg Data storage configuration.
* @param raftMgr Raft manager.
* @param baselineMgr Baseline manager.
* @param metaStorageMgr Meta storage manager.
@@ -141,6 +156,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
*/
public TableManager(
TablesConfiguration tablesCfg,
+ DataStorageConfiguration dataStorageCfg,
Loza raftMgr,
BaselineManager baselineMgr,
TopologyService topologyService,
@@ -148,6 +164,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
Path partitionsStoreDir
) {
this.tablesCfg = tablesCfg;
+ this.dataStorageCfg = dataStorageCfg;
this.raftMgr = raftMgr;
this.baselineMgr = baselineMgr;
this.metaStorageMgr = metaStorageMgr;
@@ -161,6 +178,8 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
return node.id();
};
+
+ engine = new RocksDbStorageEngine();
}
/** {@inheritDoc} */
@@ -248,10 +267,30 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
return CompletableFuture.completedFuture(null);
}
});
+
+ this.defaultDataRegion = engine.createDataRegion(dataStorageCfg.defaultRegion());
+
+ defaultDataRegion.start();
}
/** {@inheritDoc} */
@Override public void stop() {
+ for (TableStorage tableStorage : tableStorages) {
+ try {
+ tableStorage.stop();
+ }
+ catch (Exception e) {
+ LOG.error("Failed to stop table storage " + tableStorage, e);
+ }
+ }
+
+ try {
+ if (defaultDataRegion != null)
+ defaultDataRegion.stop();
+ }
+ catch (Exception e) {
+ LOG.error("Failed to stop data region " + defaultDataRegion, e);
+ }
// TODO: IGNITE-15161 Implement component's stop.
}
@@ -272,34 +311,42 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
var partitionsGroupsFutures = new ArrayList<CompletableFuture<RaftGroupService>>();
- IntStream.range(0, partitions).forEach(p ->
+ Path storageDir = partitionsStoreDir.resolve(name);
+
+ try {
+ Files.createDirectories(storageDir);
+ }
+ catch (IOException e) {
+ throw new IgniteInternalException(
+ "Failed to create partitions store directory for " + name + ": " + e.getMessage(),
+ e
+ );
+ }
+
+ TableStorage tableStorage = engine.createTable(
+ storageDir,
+ tablesCfg.tables().get(name),
+ defaultDataRegion,
+ (tableCfgView, indexName) -> {
+ throw new UnsupportedOperationException("Not implemented yet.");
+ }
+ );
+
+ tableStorage.start();
+
+ tableStorages.add(tableStorage);
+
+ for (int p = 0; p < partitions; p++) {
+ int partId = p;
+
partitionsGroupsFutures.add(
raftMgr.prepareRaftGroup(
raftGroupName(tblId, p),
assignment.get(p),
- () -> {
- Path storageDir = partitionsStoreDir.resolve(name);
-
- try {
- Files.createDirectories(storageDir);
- }
- catch (IOException e) {
- throw new IgniteInternalException(
- "Failed to create partitions store directory for " + name + ": " + e.getMessage(),
- e
- );
- }
-
- return new PartitionListener(
- new RocksDbStorage(
- storageDir.resolve(String.valueOf(p)),
- ByteBuffer::compareTo
- )
- );
- }
+ () -> new PartitionListener(tableStorage.getOrCreatePartition(partId))
)
- )
- );
+ );
+ }
CompletableFuture.allOf(partitionsGroupsFutures.toArray(CompletableFuture[]::new)).thenRun(() -> {
try {
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
index 077183d..04214b4 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
@@ -28,12 +28,11 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.stream.Collectors;
-
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.ByteBufferRow;
import org.apache.ignite.internal.storage.DataRow;
+import org.apache.ignite.internal.storage.PartitionStorage;
import org.apache.ignite.internal.storage.SearchRow;
-import org.apache.ignite.internal.storage.Storage;
import org.apache.ignite.internal.storage.StorageException;
import org.apache.ignite.internal.storage.basic.DeleteExactInvokeClosure;
import org.apache.ignite.internal.storage.basic.GetAndRemoveInvokeClosure;
@@ -78,7 +77,7 @@ import org.jetbrains.annotations.TestOnly;
*/
public class PartitionListener implements RaftGroupListener {
/** Partition storage. */
- private final Storage storage;
+ private final PartitionStorage storage;
/** Cursors map. */
private final Map<IgniteUuid, CursorMeta> cursors;
@@ -86,10 +85,10 @@ public class PartitionListener implements RaftGroupListener {
/**
* Constructor.
*
- * @param storage Storage.
+ * @param partitionStorage Storage.
*/
- public PartitionListener(Storage storage) {
- this.storage = storage;
+ public PartitionListener(PartitionStorage partitionStorage) {
+ this.storage = partitionStorage;
this.cursors = new ConcurrentHashMap<>();
}
@@ -591,7 +590,7 @@ public class PartitionListener implements RaftGroupListener {
* @return Underlying storage.
*/
@TestOnly
- public Storage getStorage() {
+ public PartitionStorage getStorage() {
return storage;
}
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/TableManagerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/TableManagerTest.java
index cf13516..3e36f64 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/TableManagerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/TableManagerTest.java
@@ -29,6 +29,7 @@ import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.configuration.schemas.runner.ClusterConfiguration;
import org.apache.ignite.configuration.schemas.runner.NodeConfiguration;
+import org.apache.ignite.configuration.schemas.store.DataStorageConfiguration;
import org.apache.ignite.configuration.schemas.table.TablesConfiguration;
import org.apache.ignite.internal.affinity.AffinityUtils;
import org.apache.ignite.internal.baseline.BaselineManager;
@@ -156,22 +157,19 @@ public class TableManagerTest {
);
clusterCfgMgr = new ConfigurationManager(
- List.of(ClusterConfiguration.KEY, TablesConfiguration.KEY),
+ List.of(ClusterConfiguration.KEY, TablesConfiguration.KEY, DataStorageConfiguration.KEY),
Map.of(),
new TestConfigurationStorage(DISTRIBUTED),
Collections.singletonList(ExtendedTableConfigurationSchema.class)
);
nodeCfgMgr.start();
+
+ nodeCfgMgr.bootstrap("node.metastorageNodes = [" + NODE_NAME + "]");
+
clusterCfgMgr.start();
- nodeCfgMgr.bootstrap("{\n" +
- " \"node\":{\n" +
- " \"metastorageNodes\":[\n" +
- " \"" + NODE_NAME + "\"\n" +
- " ]\n" +
- " }\n" +
- "}");
+ clusterCfgMgr.configurationRegistry().initializeDefaults();
}
catch (Exception e) {
LOG.error("Failed to bootstrap the test configuration manager.", e);
@@ -195,6 +193,7 @@ public class TableManagerTest {
public void testStaticTableConfigured() {
TableManager tableManager = new TableManager(
clusterCfgMgr.configurationRegistry().getConfiguration(TablesConfiguration.KEY),
+ clusterCfgMgr.configurationRegistry().getConfiguration(DataStorageConfiguration.KEY),
rm,
bm,
ts,
@@ -365,6 +364,7 @@ public class TableManagerTest {
TableManager tableManager = new TableManager(
clusterCfgMgr.configurationRegistry().getConfiguration(TablesConfiguration.KEY),
+ clusterCfgMgr.configurationRegistry().getConfiguration(DataStorageConfiguration.KEY),
rm,
bm,
ts,
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
index 76d4945..396c298 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
@@ -29,7 +29,7 @@ import org.apache.ignite.internal.schema.NativeTypes;
import org.apache.ignite.internal.schema.SchemaDescriptor;
import org.apache.ignite.internal.schema.row.Row;
import org.apache.ignite.internal.schema.row.RowAssembler;
-import org.apache.ignite.internal.storage.basic.ConcurrentHashMapStorage;
+import org.apache.ignite.internal.storage.basic.ConcurrentHashMapPartitionStorage;
import org.apache.ignite.internal.table.distributed.command.DeleteAllCommand;
import org.apache.ignite.internal.table.distributed.command.DeleteCommand;
import org.apache.ignite.internal.table.distributed.command.DeleteExactAllCommand;
@@ -84,7 +84,7 @@ public class PartitionCommandListenerTest {
*/
@BeforeEach
public void before() {
- commandListener = new PartitionListener(new ConcurrentHashMapStorage());
+ commandListener = new PartitionListener(new ConcurrentHashMapPartitionStorage());
}
/**
diff --git a/modules/vault/src/main/java/org/apache/ignite/internal/vault/persistence/PersistentVaultService.java b/modules/vault/src/main/java/org/apache/ignite/internal/vault/persistence/PersistentVaultService.java
index 0de6d2b..df2d958 100644
--- a/modules/vault/src/main/java/org/apache/ignite/internal/vault/persistence/PersistentVaultService.java
+++ b/modules/vault/src/main/java/org/apache/ignite/internal/vault/persistence/PersistentVaultService.java
@@ -84,7 +84,7 @@ public class PersistentVaultService implements VaultService {
.setCompressionType(CompressionType.LZ4_COMPRESSION)
.setBottommostCompressionType(CompressionType.ZSTD_COMPRESSION)
.setLevelCompactionDynamicLevelBytes(true)
- .setBytesPerSync(1048576)
+ .setBytesPerSync(1024 * 1024)
.setCompactionPriority(CompactionPriority.MinOverlappingRatio)
.setTableFormatConfig(
new BlockBasedTableConfig()