You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by as...@apache.org on 2021/05/21 08:35:00 UTC
[ignite-3] branch main updated: IGNITE-14741 Extend test coverage
for Affinity and Table managers - Fixes #134.
This is an automated email from the ASF dual-hosted git repository.
ascherbakov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new f8eaa36 IGNITE-14741 Extend test coverage for Affinity and Table managers - Fixes #134.
f8eaa36 is described below
commit f8eaa3636e1249c89fa3a5feb439c3f6219a84b4
Author: Vladislav Pyatkov <vl...@gmail.com>
AuthorDate: Fri May 21 11:33:00 2021 +0300
IGNITE-14741 Extend test coverage for Affinity and Table managers - Fixes #134.
Signed-off-by: Alexey Scherbakov <al...@gmail.com>
---
modules/affinity/pom.xml | 6 +
.../ignite/internal/affinity/AffinityManager.java | 9 +-
.../internal/affinity/AffinityManagerTest.java | 251 ++++++++++++++
.../affinity/TestConfigurationStorage.java | 79 +++++
.../internal/ConfigurationManager.java | 21 +-
.../apache/ignite/internal/app/IgnitionImpl.java | 2 +-
modules/table/pom.xml | 6 +
.../internal/table/distributed/TableManager.java | 31 +-
.../ignite/internal/table/TableManagerTest.java | 374 +++++++++++++++++++++
.../internal/table/TestConfigurationStorage.java | 79 +++++
10 files changed, 839 insertions(+), 19 deletions(-)
diff --git a/modules/affinity/pom.xml b/modules/affinity/pom.xml
index 981b8e7..6549e70 100644
--- a/modules/affinity/pom.xml
+++ b/modules/affinity/pom.xml
@@ -70,5 +70,11 @@
<artifactId>junit-jupiter-engine</artifactId>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
diff --git a/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/AffinityManager.java b/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/AffinityManager.java
index 57f795a..73a80d1 100644
--- a/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/AffinityManager.java
+++ b/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/AffinityManager.java
@@ -88,7 +88,9 @@ public class AffinityManager extends Producer<AffinityEvent, AffinityEventParame
metaStorageMgr.registerWatchByPrefix(new ByteArray(INTERNAL_PREFIX), new WatchListener() {
@Override public boolean onUpdate(@NotNull WatchEvent watchEvt) {
for (EntryEvent evt : watchEvt.entryEvents()) {
- String tabIdVal = evt.newEntry().key().toString().substring(INTERNAL_PREFIX.length());
+ String keyAsString = new ByteArray(evt.newEntry().key().bytes()).toString();
+
+ String tabIdVal = keyAsString.substring(INTERNAL_PREFIX.length());
UUID tblId = UUID.fromString(tabIdVal);
@@ -127,9 +129,8 @@ public class AffinityManager extends Producer<AffinityEvent, AffinityEventParame
return vaultMgr
.get(ByteArray.fromString(INTERNAL_PREFIX + tblId))
.thenCompose(entry -> {
-
- TableConfiguration tblConfig = configurationMgr.configurationRegistry()
- .getConfiguration(TablesConfiguration.KEY).tables().get(new String(entry.value(), StandardCharsets.UTF_8));
+ TableConfiguration tblConfig = configurationMgr.configurationRegistry()
+ .getConfiguration(TablesConfiguration.KEY).tables().get(new String(entry.value(), StandardCharsets.UTF_8));
var key = new ByteArray(INTERNAL_PREFIX + tblId);
diff --git a/modules/affinity/src/test/java/org/apache/ignite/internal/affinity/AffinityManagerTest.java b/modules/affinity/src/test/java/org/apache/ignite/internal/affinity/AffinityManagerTest.java
new file mode 100644
index 0000000..08d9778
--- /dev/null
+++ b/modules/affinity/src/test/java/org/apache/ignite/internal/affinity/AffinityManagerTest.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.affinity;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.configuration.RootKey;
+import org.apache.ignite.configuration.internal.ConfigurationManager;
+import org.apache.ignite.configuration.schemas.table.TablesConfiguration;
+import org.apache.ignite.configuration.storage.ConfigurationType;
+import org.apache.ignite.internal.affinity.event.AffinityEvent;
+import org.apache.ignite.internal.baseline.BaselineManager;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.metastorage.client.Condition;
+import org.apache.ignite.metastorage.client.Entry;
+import org.apache.ignite.metastorage.client.EntryEvent;
+import org.apache.ignite.metastorage.client.Operation;
+import org.apache.ignite.metastorage.client.WatchEvent;
+import org.apache.ignite.metastorage.client.WatchListener;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests scenarios for affinity manager.
+ */
+public class AffinityManagerTest {
+ /** The logger. */
+ private static final IgniteLogger LOG = IgniteLogger.forClass(AffinityManagerTest.class);
+
+ /** Internal prefix for the metasorage. */
+ private static final String INTERNAL_PREFIX = "internal.tables.assignment.";
+
+ /** Node name. */
+ public static final String NODE_NAME = "node1";
+
+ /** The name of the table which is statically configured. */
+ private static final String STATIC_TABLE_NAME = "t1";
+
+ /** Configuration manager. */
+ private ConfigurationManager cfrMgr;
+
+ /** Before all test scenarios. */
+ @BeforeEach
+ private void before() {
+ try {
+ cfrMgr = new ConfigurationManager(rootConfigurationKeys(), Arrays.asList(
+ new TestConfigurationStorage(ConfigurationType.DISTRIBUTED)));
+
+ cfrMgr.bootstrap("{\n" +
+ " \"table\":{\n" +
+ " \"tables\":{\n" +
+ " \"" + STATIC_TABLE_NAME + "\":{\n" +
+ " \"name\":\"TestTable\",\n" +
+ " \"partitions\":16,\n" +
+ " \"replicas\":1,\n" +
+ " \"columns\":{\n" +
+ " \"id\":{\n" +
+ " \"name\":\"id\",\n" +
+ " \"type\":{\n" +
+ " \"type\":\"Int64\"\n" +
+ " },\n" +
+ " \"nullable\":false\n" +
+ " }\n" +
+ " },\n" +
+ " \"indices\":{\n" +
+ " \"pk\":{\n" +
+ " \"name\":\"pk\",\n" +
+ " \"type\":\"primary\",\n" +
+ " \"uniq\":true,\n" +
+ " \"columns\":{\n" +
+ " \"id\":{\n" +
+ " \"name\":\"id\",\n" +
+ " \"asc\":true\n" +
+ " }\n" +
+ " }\n" +
+ " }\n" +
+ " }\n" +
+ " }\n" +
+ " }\n" +
+ " }\n" +
+ "}", ConfigurationType.DISTRIBUTED);
+ }
+ catch (Exception e) {
+ LOG.error("Failed to bootstrap the test configuration manager.", e);
+
+ fail("Failed to configure manager [err=" + e.getMessage() + ']');
+ }
+
+ }
+
+ /**
+ * Gets a list of configuration keys to use in the test scenario.
+ *
+ * @return List of root configuration keys.
+ */
+ private static List<RootKey<?, ?>> rootConfigurationKeys() {
+ return Arrays.asList(
+ TablesConfiguration.KEY
+ );
+ }
+
+ /**
+ * The test calculates assignment by predefined table configuration and checks assignment calculated event.
+ */
+ @Test
+ public void testCalculatedAssignment() {
+ MetaStorageManager mm = mock(MetaStorageManager.class);
+ BaselineManager bm = mock(BaselineManager.class);
+ VaultManager vm = mock(VaultManager.class);
+
+ UUID tblId = UUID.randomUUID();
+
+ when(vm.get(any())).thenAnswer(invocation -> {
+ ByteArray key = invocation.getArgument(0);
+
+ assertEquals(INTERNAL_PREFIX + tblId, new String(key.bytes(), StandardCharsets.UTF_8));
+
+ return CompletableFuture.completedFuture(new org.apache.ignite.internal.vault.common.Entry(key, STATIC_TABLE_NAME.getBytes(StandardCharsets.UTF_8)));
+ });
+
+ CompletableFuture<WatchListener> watchFut = new CompletableFuture<>();
+
+ when(mm.registerWatchByPrefix(any(), any())).thenAnswer(invocation -> {
+ ByteArray metastoreKeyPrefix = invocation.getArgument(0);
+
+ assertEquals(INTERNAL_PREFIX, new String(metastoreKeyPrefix.bytes(), StandardCharsets.UTF_8));
+
+ watchFut.complete(invocation.getArgument(1));
+
+ return CompletableFuture.completedFuture(42L);
+ });
+
+ when(mm.invoke((Condition)any(), (Operation)any(), (Operation)any())).thenAnswer(invocation -> {
+ assertTrue(watchFut.isDone());
+
+ ByteArray key = new ByteArray(INTERNAL_PREFIX + tblId);
+
+ Entry oldEntry = mock(Entry.class);
+
+ when(oldEntry.key()).thenReturn(key);
+
+ Entry newEntry = mock(Entry.class);
+
+ when(newEntry.key()).thenReturn(key);
+ when(newEntry.value()).thenReturn(ByteUtils.toBytes(Collections.EMPTY_LIST));
+
+ WatchListener lsnr = watchFut.join();
+
+ CompletableFuture.supplyAsync(() ->
+ lsnr.onUpdate(new WatchEvent(new EntryEvent(oldEntry, newEntry))));
+
+ return CompletableFuture.completedFuture(true);
+ });
+
+ AffinityManager affinityManager = new AffinityManager(cfrMgr, mm, bm, vm);
+
+ CompletableFuture<Boolean> assignmentCalculated = new CompletableFuture<>();
+
+ affinityManager.listen(AffinityEvent.CALCULATED, (parameters, e) -> assignmentCalculated.complete(e == null));
+
+ affinityManager.calculateAssignments(tblId);
+
+ assertTrue(assignmentCalculated.join());
+ }
+
+ /**
+ * The test removes an assignment and checks assignment removed event.
+ */
+ @Test
+ public void testRemovedAssignment() {
+ MetaStorageManager mm = mock(MetaStorageManager.class);
+ BaselineManager bm = mock(BaselineManager.class);
+ VaultManager vm = mock(VaultManager.class);
+
+ UUID tblId = UUID.randomUUID();
+
+ CompletableFuture<WatchListener> watchFut = new CompletableFuture<>();
+
+ when(mm.registerWatchByPrefix(any(), any())).thenAnswer(invocation -> {
+ ByteArray metastoreKeyPrefix = invocation.getArgument(0);
+
+ assertEquals(INTERNAL_PREFIX, new String(metastoreKeyPrefix.bytes(), StandardCharsets.UTF_8));
+
+ watchFut.complete(invocation.getArgument(1));
+
+ return CompletableFuture.completedFuture(42L);
+ });
+
+ when(mm.invoke((Condition)any(), (Operation)any(), (Operation)any())).thenAnswer(invocation -> {
+ assertTrue(watchFut.isDone());
+
+ ByteArray key = new ByteArray(INTERNAL_PREFIX + tblId);
+
+ Entry oldEntry = mock(Entry.class);
+
+ when(oldEntry.key()).thenReturn(key);
+ when(oldEntry.value()).thenReturn(ByteUtils.toBytes(Collections.EMPTY_LIST));
+
+ Entry newEntry = mock(Entry.class);
+
+ when(newEntry.key()).thenReturn(key);
+
+ WatchListener lsnr = watchFut.join();
+
+ CompletableFuture.supplyAsync(() ->
+ lsnr.onUpdate(new WatchEvent(new EntryEvent(oldEntry, newEntry))));
+
+ return CompletableFuture.completedFuture(true);
+ });
+
+ AffinityManager affinityManager = new AffinityManager(cfrMgr, mm, bm, vm);
+
+ CompletableFuture<Boolean> assignmentRemoved = new CompletableFuture<>();
+
+ affinityManager.listen(AffinityEvent.REMOVED, (parameters, e) -> assignmentRemoved.complete(e == null));
+
+ affinityManager.removeAssignment(tblId);
+
+ assertTrue(assignmentRemoved.join());
+ }
+}
diff --git a/modules/affinity/src/test/java/org/apache/ignite/internal/affinity/TestConfigurationStorage.java b/modules/affinity/src/test/java/org/apache/ignite/internal/affinity/TestConfigurationStorage.java
new file mode 100644
index 0000000..df8c551
--- /dev/null
+++ b/modules/affinity/src/test/java/org/apache/ignite/internal/affinity/TestConfigurationStorage.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.affinity;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.configuration.storage.ConfigurationStorage;
+import org.apache.ignite.configuration.storage.ConfigurationStorageListener;
+import org.apache.ignite.configuration.storage.ConfigurationType;
+import org.apache.ignite.configuration.storage.Data;
+import org.apache.ignite.configuration.storage.StorageException;
+
+/**
+ * Test configurationstorage.
+ */
+public class TestConfigurationStorage implements ConfigurationStorage {
+ /** Listeners. */
+ private final Set<ConfigurationStorageListener> listeners = new HashSet<>();
+
+ /** Configuration type. */
+ private final ConfigurationType type;
+
+ /**
+ * @param type Configuration type.
+ */
+ public TestConfigurationStorage(ConfigurationType type) {
+ this.type = type;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Data readAll() throws StorageException {
+ return new Data(Collections.emptyMap(), 0);
+ }
+
+ /** {@inheritDoc} */
+ @Override public CompletableFuture<Boolean> write(Map<String, Serializable> newValues, long version) {
+ for (ConfigurationStorageListener listener : listeners)
+ listener.onEntriesChanged(new Data(newValues, version + 1));
+
+ return CompletableFuture.completedFuture(true);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void addListener(ConfigurationStorageListener listener) {
+ listeners.add(listener);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void removeListener(ConfigurationStorageListener listener) {
+ listeners.remove(listener);
+ }
+
+ @Override public void notifyApplied(long storageRevision) {
+ }
+
+ /** {@inheritDoc} */
+ @Override public ConfigurationType type() {
+ return type;
+ }
+}
diff --git a/modules/configuration/src/main/java/org/apache/ignite/configuration/internal/ConfigurationManager.java b/modules/configuration/src/main/java/org/apache/ignite/configuration/internal/ConfigurationManager.java
index 669eeb1..ba97824 100644
--- a/modules/configuration/src/main/java/org/apache/ignite/configuration/internal/ConfigurationManager.java
+++ b/modules/configuration/src/main/java/org/apache/ignite/configuration/internal/ConfigurationManager.java
@@ -22,6 +22,7 @@ import com.google.gson.JsonParser;
import java.lang.annotation.Annotation;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
@@ -29,6 +30,7 @@ import org.apache.ignite.configuration.ConfigurationRegistry;
import org.apache.ignite.configuration.RootKey;
import org.apache.ignite.configuration.internal.rest.JsonConverter;
import org.apache.ignite.configuration.storage.ConfigurationStorage;
+import org.apache.ignite.configuration.storage.ConfigurationType;
import org.apache.ignite.configuration.validation.Validator;
/**
@@ -39,8 +41,8 @@ import org.apache.ignite.configuration.validation.Validator;
/** Configuration registry. */
private final ConfigurationRegistry confRegistry;
- /** Set of configuration storages. */
- private final Set<ConfigurationStorage> configurationStorages;
+ /** Type mapped to configuration storage. */
+ private final Map<ConfigurationType, ConfigurationStorage> configurationStorages;
/**
* The constructor.
@@ -54,7 +56,15 @@ import org.apache.ignite.configuration.validation.Validator;
Map<Class<? extends Annotation>, Set<Validator<? extends Annotation, ?>>> validators,
Collection<ConfigurationStorage> configurationStorages
) {
- this.configurationStorages = Set.copyOf(configurationStorages);
+ HashMap<ConfigurationType, ConfigurationStorage> storageByType = new HashMap<>();
+
+ for (ConfigurationStorage storage : configurationStorages) {
+ assert !storageByType.containsKey(storage.type()) : "Two or more storage have the same configuration type [type=" + storage.type() + ']';
+
+ storageByType.put(storage.type(), storage);
+ }
+
+ this.configurationStorages = Map.copyOf(storageByType);
confRegistry = new ConfigurationRegistry(rootKeys, validators, configurationStorages);
}
@@ -78,11 +88,10 @@ import org.apache.ignite.configuration.validation.Validator;
* @throws InterruptedException If thread is interrupted during bootstrap.
* @throws ExecutionException If configuration update failed for some reason.
*/
- public void bootstrap(String jsonStr) throws InterruptedException, ExecutionException {
+ public void bootstrap(String jsonStr, ConfigurationType type) throws InterruptedException, ExecutionException {
JsonObject jsonCfg = JsonParser.parseString(jsonStr).getAsJsonObject();
- for (ConfigurationStorage configurationStorage : configurationStorages)
- confRegistry.change(JsonConverter.jsonSource(jsonCfg), configurationStorage).get();
+ confRegistry.change(JsonConverter.jsonSource(jsonCfg), configurationStorages.get(type)).get();
}
/**
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java
index eb65743..d50551a 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java
@@ -104,7 +104,7 @@ public class IgnitionImpl implements Ignition {
if (!cfgBootstrappedFromPds && jsonStrBootstrapCfg != null)
try {
- locConfigurationMgr.bootstrap(jsonStrBootstrapCfg);
+ locConfigurationMgr.bootstrap(jsonStrBootstrapCfg, ConfigurationType.LOCAL);
}
catch (Exception e) {
LOG.warn("Unable to parse user specific configuration, default configuration will be used: " + e.getMessage());
diff --git a/modules/table/pom.xml b/modules/table/pom.xml
index a9a22bd..43b8701 100644
--- a/modules/table/pom.xml
+++ b/modules/table/pom.xml
@@ -97,6 +97,12 @@
</dependency>
<dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-network</artifactId>
<type>test-jar</type>
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 9b0f6b6..b04ac1b 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -28,6 +28,7 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
import java.util.function.BiPredicate;
import java.util.function.Consumer;
import org.apache.ignite.configuration.internal.ConfigurationManager;
@@ -331,9 +332,16 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
return true;
});
- configurationMgr.configurationRegistry()
- .getConfiguration(TablesConfiguration.KEY).tables().change(change ->
- change.create(name, tableInitChange));
+ try {
+ configurationMgr.configurationRegistry()
+ .getConfiguration(TablesConfiguration.KEY).tables().change(change ->
+ change.create(name, tableInitChange)).get();
+ }
+ catch (InterruptedException | ExecutionException e) {
+ LOG.error("Table wasn't created [name=" + name + ']', e);
+
+ tblFut.completeExceptionally(e);
+ }
return tblFut.join();
}
@@ -363,11 +371,18 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
}
});
- configurationMgr
- .configurationRegistry()
- .getConfiguration(TablesConfiguration.KEY)
- .tables()
- .change(change -> change.delete(name));
+ try {
+ configurationMgr
+ .configurationRegistry()
+ .getConfiguration(TablesConfiguration.KEY)
+ .tables()
+ .change(change -> change.delete(name)).get();
+ }
+ catch (InterruptedException | ExecutionException e) {
+ LOG.error("Table wasn't dropped [name=" + name + ']', e);
+
+ dropTblFut.completeExceptionally(e);
+ }
dropTblFut.join();
}
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/TableManagerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/TableManagerTest.java
new file mode 100644
index 0000000..266fb10
--- /dev/null
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/TableManagerTest.java
@@ -0,0 +1,374 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table;
+
+import java.lang.reflect.Method;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiPredicate;
+import org.apache.ignite.configuration.RootKey;
+import org.apache.ignite.configuration.internal.ConfigurationManager;
+import org.apache.ignite.configuration.schemas.runner.ClusterConfiguration;
+import org.apache.ignite.configuration.schemas.runner.NodeConfiguration;
+import org.apache.ignite.configuration.schemas.table.TablesConfiguration;
+import org.apache.ignite.configuration.storage.ConfigurationType;
+import org.apache.ignite.internal.affinity.AffinityManager;
+import org.apache.ignite.internal.affinity.event.AffinityEvent;
+import org.apache.ignite.internal.affinity.event.AffinityEventParameters;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.raft.Loza;
+import org.apache.ignite.internal.schema.SchemaManager;
+import org.apache.ignite.internal.schema.SchemaRegistry;
+import org.apache.ignite.internal.schema.event.SchemaEvent;
+import org.apache.ignite.internal.schema.event.SchemaEventParameters;
+import org.apache.ignite.internal.table.distributed.TableManager;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.metastorage.client.Condition;
+import org.apache.ignite.metastorage.client.Operation;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.table.Table;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.platform.commons.util.ReflectionUtils;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.same;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests scenarios for table manager.
+ */
+public class TableManagerTest {
+ /** The logger. */
+ private static final IgniteLogger LOG = IgniteLogger.forClass(TableManagerTest.class);
+
+ /** Internal prefix for the metasorage. */
+ private static final String INTERNAL_PREFIX = "internal.tables.";
+
+ /** The name of the table which is statically configured. */
+ private static final String STATIC_TABLE_NAME = "t1";
+
+ /** The name of the table which will be configured dynamically. */
+ private static final String DYNAMIC_TABLE_NAME = "t2";
+
+ /** The name of table to drop it. */
+ private static final String DYNAMIC_TABLE_FOR_DROP_NAME = "t3";
+
+ /** Table partitions. */
+ public static final int PARTITIONS = 32;
+
+ /** Node port. */
+ public static final int PORT = 2245;
+
+ /** Node name. */
+ public static final String NODE_NAME = "node1";
+
+ /** Configuration manager. */
+ private ConfigurationManager cfrMgr;
+
+ /** Before all test scenarios. */
+ @BeforeEach
+ private void before() {
+ try {
+ cfrMgr = new ConfigurationManager(rootConfigurationKeys(), Arrays.asList(
+ new TestConfigurationStorage(ConfigurationType.LOCAL),
+ new TestConfigurationStorage(ConfigurationType.DISTRIBUTED)));
+
+ cfrMgr.bootstrap("{\n" +
+ " \"node\":{\n" +
+ " \"name\":\"node1\",\n" +
+ " \"metastorageNodes\":[\n" +
+ " \"" + NODE_NAME + "\"\n" +
+ " ]\n" +
+ " }\n" +
+ "}", ConfigurationType.LOCAL);
+
+ cfrMgr.bootstrap("{\n" +
+ " \"cluster\":{\n" +
+ " \"metastorageNodes\":[\n" +
+ " \"" + NODE_NAME + "\"\n" +
+ " ]\n" +
+ "},\n" +
+ " \"table\":{\n" +
+ " \"tables\":{\n" +
+ " \"" + STATIC_TABLE_NAME + "\":{\n" +
+ " \"name\":\"TestTable\",\n" +
+ " \"partitions\":16,\n" +
+ " \"replicas\":1,\n" +
+ " \"columns\":{\n" +
+ " \"id\":{\n" +
+ " \"name\":\"id\",\n" +
+ " \"type\":{\n" +
+ " \"type\":\"Int64\"\n" +
+ " },\n" +
+ " \"nullable\":false\n" +
+ " }\n" +
+ " },\n" +
+ " \"indices\":{\n" +
+ " \"pk\":{\n" +
+ " \"name\":\"pk\",\n" +
+ " \"type\":\"primary\",\n" +
+ " \"uniq\":true,\n" +
+ " \"columns\":{\n" +
+ " \"id\":{\n" +
+ " \"name\":\"id\",\n" +
+ " \"asc\":true\n" +
+ " }\n" +
+ " }\n" +
+ " }\n" +
+ " }\n" +
+ " }\n" +
+ " }\n" +
+ " }\n" +
+ "}", ConfigurationType.DISTRIBUTED);
+ }
+ catch (Exception e) {
+ LOG.error("Failed to bootstrap the test configuration manager.", e);
+
+ fail("Failed to configure manager [err=" + e.getMessage() + ']');
+ }
+ }
+
+ /**
+ * Tests a table which was defined before start through bootstrap configuration.
+ */
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-14578")
+ @Test
+ public void testStaticTableConfigured() {
+ MetaStorageManager mm = mock(MetaStorageManager.class);
+ SchemaManager sm = mock(SchemaManager.class);
+ AffinityManager am = mock(AffinityManager.class);
+ Loza rm = mock(Loza.class);
+ VaultManager vm = mock(VaultManager.class);
+
+ TableManager tableManager = new TableManager(cfrMgr, mm, sm, am, rm, vm);
+
+ assertEquals(1, tableManager.tables().size());
+
+ assertNotNull(tableManager.table(STATIC_TABLE_NAME));
+ }
+
+ /**
+ * Tests create a table through public API.
+ */
+ @Test
+ public void testCreateTable() {
+ MetaStorageManager mm = mock(MetaStorageManager.class);
+ SchemaManager sm = mock(SchemaManager.class);
+ AffinityManager am = mock(AffinityManager.class);
+ Loza rm = mock(Loza.class);
+ VaultManager vm = mock(VaultManager.class);
+
+ ClusterNode node = new ClusterNode(UUID.randomUUID().toString(), NODE_NAME, "127.0.0.1", PORT);
+
+ CompletableFuture<UUID> tblIdFut = new CompletableFuture<>();
+
+ TableManager tableManager = mockManagersAndCreateTable(DYNAMIC_TABLE_NAME, mm, sm, am, rm, vm, node, tblIdFut);
+
+ assertNotNull(tableManager.table(DYNAMIC_TABLE_NAME));
+ }
+
+ /**
+ * Tests drop a table through public API.
+ */
+ @Test
+ public void testDropTable() {
+ MetaStorageManager mm = mock(MetaStorageManager.class);
+ SchemaManager sm = mock(SchemaManager.class);
+ AffinityManager am = mock(AffinityManager.class);
+ Loza rm = mock(Loza.class);
+ VaultManager vm = mock(VaultManager.class);
+
+ ClusterNode node = new ClusterNode(UUID.randomUUID().toString(), NODE_NAME, "127.0.0.1", PORT);
+
+ CompletableFuture<UUID> tblIdFut = new CompletableFuture<>();
+
+ TableManager tableManager = mockManagersAndCreateTable(DYNAMIC_TABLE_FOR_DROP_NAME, mm, sm, am, rm, vm, node, tblIdFut);
+
+ assertNotNull(tableManager.table(DYNAMIC_TABLE_FOR_DROP_NAME));
+
+ when(sm.unregisterSchemas(any())).thenReturn(CompletableFuture.completedFuture(true));
+
+ doAnswer(invokation -> {
+ BiPredicate<SchemaEventParameters, Exception> schemaInitialized = invokation.getArgument(1);
+
+ assertTrue(tblIdFut.isDone());
+
+ SchemaRegistry schemaRegistry = mock(SchemaRegistry.class);
+
+ CompletableFuture.supplyAsync(() -> schemaInitialized.test(
+ new SchemaEventParameters(tblIdFut.join(), schemaRegistry),
+ null));
+
+ return null;
+ }).when(sm).listen(same(SchemaEvent.DROPPED), any());
+
+ when(am.removeAssignment(any())).thenReturn(CompletableFuture.completedFuture(true));
+
+ doAnswer(invokation -> {
+ BiPredicate<AffinityEventParameters, Exception> affinityRemovedDelegate = (BiPredicate)invokation.getArgument(1);
+
+ ArrayList<List<ClusterNode>> assignment = new ArrayList<>(PARTITIONS);
+
+ for (int part = 0; part < PARTITIONS; part++) {
+ assignment.add(new ArrayList<ClusterNode>(Collections.singleton(node)));
+ }
+
+ assertTrue(tblIdFut.isDone());
+
+ CompletableFuture.supplyAsync(() -> affinityRemovedDelegate.test(
+ new AffinityEventParameters(tblIdFut.join(), assignment),
+ null));
+
+ return null;
+ }).when(am).listen(same(AffinityEvent.REMOVED), any());
+
+ tableManager.dropTable(DYNAMIC_TABLE_FOR_DROP_NAME);
+
+ assertNull(tableManager.table(DYNAMIC_TABLE_FOR_DROP_NAME));
+ }
+
+ /**
+ * Instantiates Table manager and creates a table in it.
+ *
+ * @param tableName Table name.
+ * @param mm Metastorage manager mock.
+ * @param sm Schema manager mock.
+ * @param am Affinity manager mock.
+ * @param rm Raft manager mock.
+ * @param vm Vault manager mock.
+ * @param node This cluster node.
+ * @param tblIdFut Future which will determine a table id.
+ * @return Table manager.
+ */
+ private TableManager mockManagersAndCreateTable(
+ String tableName,
+ MetaStorageManager mm,
+ SchemaManager sm,
+ AffinityManager am,
+ Loza rm,
+ VaultManager vm,
+ ClusterNode node,
+ CompletableFuture<UUID> tblIdFut
+ ) {
+ when(mm.invoke((Condition)any(), (Operation)any(), (Operation)any())).thenAnswer(invokation -> {
+ Condition condition = (Condition)invokation.getArgument(0);
+
+ Object internalCondition = ReflectionUtils.tryToReadFieldValue(Condition.class, "cond", condition).get();
+
+ Method getKeyMethod = ReflectionUtils.findMethod(internalCondition.getClass(), "key").get();
+
+ byte[] metastorageKeyBytes = (byte[])ReflectionUtils.invokeMethod(getKeyMethod, internalCondition);
+
+ tblIdFut.complete(UUID.fromString(new String(metastorageKeyBytes, StandardCharsets.UTF_8).substring(INTERNAL_PREFIX.length())));
+
+ return CompletableFuture.completedFuture(true);
+ });
+
+ when(sm.initSchemaForTable(any(), eq(tableName))).thenReturn(CompletableFuture.completedFuture(true));
+
+ doAnswer(invokation -> {
+ BiPredicate<SchemaEventParameters, Exception> schemaInitialized = invokation.getArgument(1);
+
+ assertTrue(tblIdFut.isDone());
+
+ SchemaRegistry schemaRegistry = mock(SchemaRegistry.class);
+
+ CompletableFuture.supplyAsync(() -> schemaInitialized.test(
+ new SchemaEventParameters(tblIdFut.join(), schemaRegistry),
+ null));
+
+ return null;
+ }).when(sm).listen(same(SchemaEvent.INITIALIZED), any());
+
+ when(am.calculateAssignments(any())).thenReturn(CompletableFuture.completedFuture(true));
+
+ doAnswer(invokation -> {
+ BiPredicate<AffinityEventParameters, Exception> affinityClaculatedDelegate = (BiPredicate)invokation.getArgument(1);
+
+ ArrayList<List<ClusterNode>> assignment = new ArrayList<>(PARTITIONS);
+
+ for (int part = 0; part < PARTITIONS; part++) {
+ assignment.add(new ArrayList<ClusterNode>(Collections.singleton(node)));
+ }
+
+ assertTrue(tblIdFut.isDone());
+
+ CompletableFuture.supplyAsync(() -> affinityClaculatedDelegate.test(
+ new AffinityEventParameters(tblIdFut.join(), assignment),
+ null));
+
+ return null;
+ }).when(am).listen(same(AffinityEvent.CALCULATED), any());
+
+ TableManager tableManager = new TableManager(cfrMgr, mm, sm, am, rm, vm);
+
+ int tablesBeforeCreation = tableManager.tables().size();
+
+ Table tbl2 = tableManager.createTable(tableName, change ->
+ change.changeName(tableName)
+ .changePartitions(PARTITIONS)
+ .changeReplicas(1)
+ .changeColumns(cols -> cols
+ .create("key", c -> c.changeName("key").changeType(t -> t.changeType("INT64")).changeNullable(false))
+ .create("val", c -> c.changeName("val").changeType(t -> t.changeType("INT64")).changeNullable(true))
+ )
+ .changeIndices(idxs -> idxs
+ .create("PK", idx -> idx
+ .changeName("PK")
+ .changeType("PRIMARY")
+ .changeColumns(c -> c
+ .create("key", t -> t.changeName("key").changeAsc(true)))
+ .changeAffinityColumns(new String[] {"key"}))
+ ));
+
+ assertNotNull(tbl2);
+
+ assertEquals(tablesBeforeCreation + 1, tableManager.tables().size());
+
+ return tableManager;
+ }
+
+ /**
+ * Gets a list of configuration keys to use in the test scenario.
+ *
+ * @return List of root configuration keys.
+ */
+ private static List<RootKey<?, ?>> rootConfigurationKeys() {
+ return Arrays.asList(
+ NodeConfiguration.KEY,
+ ClusterConfiguration.KEY,
+ TablesConfiguration.KEY
+ );
+ }
+}
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/TestConfigurationStorage.java b/modules/table/src/test/java/org/apache/ignite/internal/table/TestConfigurationStorage.java
new file mode 100644
index 0000000..e4db9fe
--- /dev/null
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/TestConfigurationStorage.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.configuration.storage.ConfigurationStorage;
+import org.apache.ignite.configuration.storage.ConfigurationStorageListener;
+import org.apache.ignite.configuration.storage.ConfigurationType;
+import org.apache.ignite.configuration.storage.Data;
+import org.apache.ignite.configuration.storage.StorageException;
+
+/**
+ * Test configurationstorage.
+ */
+public class TestConfigurationStorage implements ConfigurationStorage {
+ /** Listeners. */
+ private final Set<ConfigurationStorageListener> listeners = new HashSet<>();
+
+ /** Configuration type. */
+ private final ConfigurationType type;
+
+ /**
+ * @param type Configuration type.
+ */
+ public TestConfigurationStorage(ConfigurationType type) {
+ this.type = type;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Data readAll() throws StorageException {
+ return new Data(Collections.emptyMap(), 0);
+ }
+
+ /** {@inheritDoc} */
+ @Override public CompletableFuture<Boolean> write(Map<String, Serializable> newValues, long version) {
+ for (ConfigurationStorageListener listener : listeners)
+ listener.onEntriesChanged(new Data(newValues, version + 1));
+
+ return CompletableFuture.completedFuture(true);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void addListener(ConfigurationStorageListener listener) {
+ listeners.add(listener);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void removeListener(ConfigurationStorageListener listener) {
+ listeners.remove(listener);
+ }
+
+ @Override public void notifyApplied(long storageRevision) {
+ }
+
+ /** {@inheritDoc} */
+ @Override public ConfigurationType type() {
+ return type;
+ }
+}