You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sd...@apache.org on 2022/03/09 14:17:15 UTC
[ignite-3] branch ignite-15114 updated: IGNITE-16528 Implement init command handling
This is an automated email from the ASF dual-hosted git repository.
sdanilov pushed a commit to branch ignite-15114
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/ignite-15114 by this push:
new 4d5a520 IGNITE-16528 Implement init command handling
4d5a520 is described below
commit 4d5a520649b5bf9bd9bd24e4779bc1a8370d9e1f
Author: Alexander Polovtcev <al...@gmail.com>
AuthorDate: Wed Mar 9 17:17:07 2022 +0300
IGNITE-16528 Implement init command handling
---
examples/config/ignite-config.json | 5 -
examples/pom.xml | 12 +
.../ignite/example/AbstractExamplesTest.java | 59 +++
.../apache/ignite/example/ExampleTestUtils.java | 2 +-
.../example/sql/jdbc/ItSqlExamplesTest.java} | 54 +--
.../ignite/example/table/ItTableExamplesTest.java} | 54 +--
.../example/tx/ItTransactionsExamplesTest.java | 40 ++
.../example/tx/TransactionsExamplesTest.java | 88 -----
.../src/main/java/org/apache/ignite/Ignition.java | 74 ++--
.../java/org/apache/ignite/IgnitionManager.java | 90 ++---
.../org/apache/ignite/cli/ItConfigCommandTest.java | 83 ++--
.../ignite/client/handler/ItClientHandlerTest.java | 4 +-
.../ignite/client/handler/ClientHandlerModule.java | 16 +-
.../apache/ignite/client/AbstractClientTest.java | 4 +-
modules/cluster-management/pom.xml | 111 ++++++
.../cluster/management/ClusterInitializer.java | 143 +++++++
.../management/ClusterManagementGroupManager.java | 236 +++++++++++
.../cluster/management/CmgRaftGroupListener.java | 62 +++
.../cluster/management/InitException.java} | 23 +-
.../management/messages/CancelInitMessage.java} | 22 +-
.../management/messages/ClusterStateMessage.java} | 19 +-
.../management/messages/CmgInitMessage.java} | 19 +-
.../management/messages/CmgMessageGroup.java | 51 +++
.../management/messages/InitCompleteMessage.java} | 14 +-
.../management/messages/InitErrorMessage.java} | 18 +-
.../cluster/management/rest/InitCommand.java | 51 +++
.../management/rest/InitCommandHandler.java | 98 +++++
.../cluster/management/ClusterInitializerTest.java | 192 +++++++++
.../testframework/BaseIgniteAbstractTest.java | 20 +-
.../metastorage/client/MetaStorageService.java | 1 -
modules/metastorage/pom.xml | 5 +
.../internal/metastorage/MetaStorageManager.java | 432 +++++++--------------
.../MessageDeserializerGenerator.java | 42 +-
.../serialization/MessageSerializerGenerator.java | 28 +-
.../ignite/network/util/ClusterServiceUtils.java | 55 +++
.../apache/ignite/internal/rest/api/Routes.java | 13 +
.../apache/ignite/internal/rest/RestComponent.java | 18 +
modules/runner/pom.xml | 1 -
.../ItDistributedConfigurationPropertiesTest.java | 59 +--
.../ItDistributedConfigurationStorageTest.java | 54 +--
.../runner/app/AbstractSchemaChangeTest.java | 16 +-
.../internal/runner/app/ItBaselineChangesTest.java | 44 +--
.../internal/runner/app/ItClusterInitTest.java | 102 +++++
.../internal/runner/app/ItDataSchemaSyncTest.java | 44 +--
.../runner/app/ItDynamicTableCreationTest.java | 97 ++---
.../runner/app/ItIgniteNodeRestartTest.java | 3 -
.../ignite/internal/runner/app/ItIgnitionTest.java | 100 +----
.../internal/runner/app/ItNoThreadsLeftTest.java | 74 ++--
.../runner/app/ItSchemaChangeKvViewTest.java | 12 +-
.../runner/app/ItSchemaChangeTableViewTest.java | 14 +-
.../internal/runner/app/ItTableCreationTest.java | 3 -
.../internal/runner/app/ItTablesApiTest.java | 148 +++----
.../runner/app/PlatformTestNodeRunner.java | 23 +-
.../app/client/ItAbstractThinClientTest.java | 29 +-
.../runner/app/jdbc/AbstractJdbcSelfTest.java | 16 +-
.../runner/app/jdbc/ItJdbcBatchSelfTest.java | 2 +-
.../sql/engine/AbstractBasicIntegrationTest.java | 22 +-
.../integrationTest/resources/ignite-config.json | 5 -
.../org/apache/ignite/internal/app/IgniteImpl.java | 135 ++++---
.../CoreDistributedConfigurationModule.java | 2 -
.../CoreLocalConfigurationModule.java | 2 -
.../CoreDistributedConfigurationModuleTest.java | 6 -
.../CoreLocalConfigurationModuleTest.java | 6 -
parent/pom.xml | 41 +-
pom.xml | 1 +
65 files changed, 2005 insertions(+), 1314 deletions(-)
diff --git a/examples/config/ignite-config.json b/examples/config/ignite-config.json
index 6060004..8698d5a 100644
--- a/examples/config/ignite-config.json
+++ b/examples/config/ignite-config.json
@@ -1,9 +1,4 @@
{
- "node": {
- "metastorageNodes": [
- "my-first-node"
- ]
- },
"network": {
"port": 3344,
"portRange": 10,
diff --git a/examples/pom.xml b/examples/pom.xml
index 5ed1dba..46dfb6d 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -34,6 +34,18 @@
<properties>
<root.directory>${pom.basedir}/..</root.directory>
+
+ <argLine>
+ --add-opens java.base/java.lang=ALL-UNNAMED
+ --add-opens java.base/java.lang.invoke=ALL-UNNAMED
+ --add-opens java.base/java.lang.reflect=ALL-UNNAMED
+ --add-opens java.base/java.io=ALL-UNNAMED
+ --add-opens java.base/java.nio=ALL-UNNAMED
+ --add-opens java.base/java.util=ALL-UNNAMED
+ --add-opens java.base/jdk.internal.misc=ALL-UNNAMED
+ -Dio.netty.tryReflectionSetAccessible=true
+ -Djava.util.logging.config.file=../config/java.util.logging.properties
+ </argLine>
</properties>
<dependencies>
diff --git a/examples/src/integrationTest/java/org/apache/ignite/example/AbstractExamplesTest.java b/examples/src/integrationTest/java/org/apache/ignite/example/AbstractExamplesTest.java
new file mode 100644
index 0000000..3765410
--- /dev/null
+++ b/examples/src/integrationTest/java/org/apache/ignite/example/AbstractExamplesTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.example;
+
+import java.nio.file.Path;
+import java.util.List;
+import org.apache.ignite.IgnitionManager;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+
+/**
+ * Base class for creating tests for examples.
+ */
+public abstract class AbstractExamplesTest extends IgniteAbstractTest {
+ private static final String TEST_NODE_NAME = "ignite-node";
+
+ /** Empty argument to invoke an example. */
+ protected static final String[] EMPTY_ARGS = new String[0];
+
+ /**
+ * Starts a node.
+ */
+ @BeforeEach
+ public void startNode() throws Exception {
+ IgniteImpl ignite = (IgniteImpl) IgnitionManager.start(
+ TEST_NODE_NAME,
+ Path.of("config", "ignite-config.json"),
+ workDir,
+ null
+ );
+
+ ignite.init(List.of(ignite.name()));
+ }
+
+ /**
+ * Stops the node.
+ */
+ @AfterEach
+ public void stopNode() {
+ IgnitionManager.stop(TEST_NODE_NAME);
+ }
+}
diff --git a/examples/src/test/java/org/apache/ignite/example/ExampleTestUtils.java b/examples/src/integrationTest/java/org/apache/ignite/example/ExampleTestUtils.java
similarity index 97%
rename from examples/src/test/java/org/apache/ignite/example/ExampleTestUtils.java
rename to examples/src/integrationTest/java/org/apache/ignite/example/ExampleTestUtils.java
index ec08af6..b8b4bb2 100644
--- a/examples/src/test/java/org/apache/ignite/example/ExampleTestUtils.java
+++ b/examples/src/integrationTest/java/org/apache/ignite/example/ExampleTestUtils.java
@@ -1,6 +1,6 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
diff --git a/examples/src/test/java/org/apache/ignite/example/sql/jdbc/SqlExamplesTest.java b/examples/src/integrationTest/java/org/apache/ignite/example/sql/jdbc/ItSqlExamplesTest.java
similarity index 55%
rename from examples/src/test/java/org/apache/ignite/example/sql/jdbc/SqlExamplesTest.java
rename to examples/src/integrationTest/java/org/apache/ignite/example/sql/jdbc/ItSqlExamplesTest.java
index f6d9440..54cdecb 100644
--- a/examples/src/test/java/org/apache/ignite/example/sql/jdbc/SqlExamplesTest.java
+++ b/examples/src/integrationTest/java/org/apache/ignite/example/sql/jdbc/ItSqlExamplesTest.java
@@ -1,6 +1,6 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
@@ -17,27 +17,14 @@
package org.apache.ignite.example.sql.jdbc;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import org.apache.ignite.IgnitionManager;
+import org.apache.ignite.example.AbstractExamplesTest;
import org.apache.ignite.example.ExampleTestUtils;
-import org.apache.ignite.internal.testframework.WorkDirectory;
-import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
-import org.apache.ignite.internal.util.IgniteUtils;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
/**
* These tests check that all SQL JDBC examples pass correctly.
*/
-@ExtendWith(WorkDirectoryExtension.class)
-public class SqlExamplesTest {
- /** Empty argument to invoke an example. */
- protected static final String[] EMPTY_ARGS = new String[0];
-
+public class ItSqlExamplesTest extends AbstractExamplesTest {
/**
* Runs SqlJdbcExample and checks its output.
*
@@ -62,39 +49,4 @@ public class SqlExamplesTest {
+ " Richard, Miles, St. Petersburg\n"
);
}
-
- /**
- * Start node.
- *
- * @param workDir Work directory for the started node. Must not be {@code null}.
- */
- @BeforeEach
- public void startNode(@WorkDirectory Path workDir) throws IOException {
- IgnitionManager.start(
- "my-first-node",
- Files.readString(Path.of("config", "ignite-config.json")),
- workDir
- );
- }
-
- /**
- * Stop node.
- */
- @AfterEach
- public void stopNode() {
- IgnitionManager.stop("my-first-node");
- }
-
- /**
- * Removes a work directory created by {@link SqlExamplesTest}.
- */
- @BeforeEach
- @AfterEach
- public void removeWorkDir() {
- Path workDir = Path.of("work");
-
- if (Files.exists(workDir)) {
- IgniteUtils.deleteIfExists(workDir);
- }
- }
}
diff --git a/examples/src/test/java/org/apache/ignite/example/table/TableExamplesTest.java b/examples/src/integrationTest/java/org/apache/ignite/example/table/ItTableExamplesTest.java
similarity index 65%
rename from examples/src/test/java/org/apache/ignite/example/table/TableExamplesTest.java
rename to examples/src/integrationTest/java/org/apache/ignite/example/table/ItTableExamplesTest.java
index 5c179b5..5292b25 100644
--- a/examples/src/test/java/org/apache/ignite/example/table/TableExamplesTest.java
+++ b/examples/src/integrationTest/java/org/apache/ignite/example/table/ItTableExamplesTest.java
@@ -1,6 +1,6 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
@@ -17,27 +17,14 @@
package org.apache.ignite.example.table;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import org.apache.ignite.IgnitionManager;
+import org.apache.ignite.example.AbstractExamplesTest;
import org.apache.ignite.example.ExampleTestUtils;
-import org.apache.ignite.internal.testframework.WorkDirectory;
-import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
-import org.apache.ignite.internal.util.IgniteUtils;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
/**
* These tests check that all table examples pass correctly.
*/
-@ExtendWith(WorkDirectoryExtension.class)
-public class TableExamplesTest {
- /** Empty argument to invoke an example. */
- protected static final String[] EMPTY_ARGS = new String[0];
-
+public class ItTableExamplesTest extends AbstractExamplesTest {
/**
* Runs RecordViewExample.
*
@@ -93,39 +80,4 @@ public class TableExamplesTest {
+ " Owner: Val Kulichenko\n"
+ " Balance: $100.0\n");
}
-
- /**
- * Start node.
- *
- * @param workDir Work directory for the started node. Must not be {@code null}.
- */
- @BeforeEach
- public void startNode(@WorkDirectory Path workDir) throws IOException {
- IgnitionManager.start(
- "my-first-node",
- Files.readString(Path.of("config", "ignite-config.json")),
- workDir
- );
- }
-
- /**
- * Stop node.
- */
- @AfterEach
- public void stopNode() {
- IgnitionManager.stop("my-first-node");
- }
-
- /**
- * Removes a previously created work directory.
- */
- @BeforeEach
- @AfterEach
- public void removeWorkDir() {
- Path workDir = Path.of("work");
-
- if (Files.exists(workDir)) {
- IgniteUtils.deleteIfExists(workDir);
- }
- }
}
diff --git a/examples/src/integrationTest/java/org/apache/ignite/example/tx/ItTransactionsExamplesTest.java b/examples/src/integrationTest/java/org/apache/ignite/example/tx/ItTransactionsExamplesTest.java
new file mode 100644
index 0000000..c6b3f2b
--- /dev/null
+++ b/examples/src/integrationTest/java/org/apache/ignite/example/tx/ItTransactionsExamplesTest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.example.tx;
+
+import org.apache.ignite.example.AbstractExamplesTest;
+import org.apache.ignite.example.ExampleTestUtils;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests for transactional examples.
+ */
+public class ItTransactionsExamplesTest extends AbstractExamplesTest {
+ /**
+ * Runs TransactionsExample and checks its output.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testTransactionsExample() throws Exception {
+ ExampleTestUtils.assertConsoleOutputContains(TransactionsExample::main, EMPTY_ARGS,
+ "Initial balance: 1000.0",
+ "Balance after the sync transaction: 1200.0",
+ "Balance after the async transaction: 1500.0");
+ }
+}
diff --git a/examples/src/test/java/org/apache/ignite/example/tx/TransactionsExamplesTest.java b/examples/src/test/java/org/apache/ignite/example/tx/TransactionsExamplesTest.java
deleted file mode 100644
index 8a59175..0000000
--- a/examples/src/test/java/org/apache/ignite/example/tx/TransactionsExamplesTest.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.example.tx;
-
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import org.apache.ignite.IgnitionManager;
-import org.apache.ignite.example.ExampleTestUtils;
-import org.apache.ignite.internal.testframework.WorkDirectory;
-import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
-import org.apache.ignite.internal.util.IgniteUtils;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-
-/**
- * Tests for transactional examples.
- */
-@ExtendWith(WorkDirectoryExtension.class)
-public class TransactionsExamplesTest {
- /** Empty argument to invoke an example. */
- protected static final String[] EMPTY_ARGS = new String[0];
-
- /**
- * Runs TransactionsExample and checks its output.
- *
- * @throws Exception If failed.
- */
- @Test
- public void testTransactionsExample() throws Exception {
- ExampleTestUtils.assertConsoleOutputContains(TransactionsExample::main, EMPTY_ARGS,
- "Initial balance: 1000.0",
- "Balance after the sync transaction: 1200.0",
- "Balance after the async transaction: 1500.0");
- }
-
- /**
- * Start node.
- *
- * @param workDir Work directory for the started node. Must not be {@code null}.
- */
- @BeforeEach
- public void startNode(@WorkDirectory Path workDir) throws IOException {
- IgnitionManager.start(
- "my-first-node",
- Files.readString(Path.of("config", "ignite-config.json")),
- workDir
- );
- }
-
- /**
- * Stop node.
- */
- @AfterEach
- public void stopNode() {
- IgnitionManager.stop("my-first-node");
- }
-
- /**
- * Removes a previously created work directory.
- */
- @BeforeEach
- @AfterEach
- public void removeWorkDir() {
- Path workDir = Path.of("work");
-
- if (Files.exists(workDir)) {
- IgniteUtils.deleteIfExists(workDir);
- }
- }
-}
diff --git a/modules/api/src/main/java/org/apache/ignite/Ignition.java b/modules/api/src/main/java/org/apache/ignite/Ignition.java
index 4824fb6..0585353 100644
--- a/modules/api/src/main/java/org/apache/ignite/Ignition.java
+++ b/modules/api/src/main/java/org/apache/ignite/Ignition.java
@@ -20,7 +20,6 @@ package org.apache.ignite;
import java.io.InputStream;
import java.net.URL;
import java.nio.file.Path;
-import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
/**
@@ -31,70 +30,69 @@ public interface Ignition {
/**
* Starts an Ignite node with an optional bootstrap configuration from a HOCON file.
*
- * @param name Name of the node. Must not be {@code null}.
+ * @param name Name of the node. Must not be {@code null}.
* @param configPath Path to the node configuration in the HOCON format. Can be {@code null}.
- * @param workDir Work directory for the started node. Must not be {@code null}.
+ * @param workDir Work directory for the started node. Must not be {@code null}.
* @return Started Ignite node.
*/
- public Ignite start(@NotNull String name, @Nullable Path configPath, @NotNull Path workDir);
+ public Ignite start(String name, @Nullable Path configPath, Path workDir);
/**
- * Starts an Ignite node with an optional bootstrap configuration from a HOCON file, with an optional
- * class loader for further usage by {@link java.util.ServiceLoader}.
+ * Starts an Ignite node with an optional bootstrap configuration from a HOCON file, with an optional class loader for further usage by
+ * {@link java.util.ServiceLoader}.
*
- * @param name Name of the node. Must not be {@code null}.
- * @param configPath Path to the node configuration in the HOCON format. Can be {@code null}.
- * @param workDir Work directory for the started node. Must not be {@code null}.
- * @param serviceLoaderClassLoader The class loader to be used to load provider-configuration files and provider
- * classes, or {@code null} if the system
- * class loader (or, failing that, the bootstrap class loader) is to be used
+ * @param name Name of the node. Must not be {@code null}.
+ * @param configPath Path to the node configuration in the HOCON format. Can be {@code null}.
+ * @param workDir Work directory for the started node. Must not be {@code null}.
+ * @param serviceLoaderClassLoader The class loader to be used to load provider-configuration files and provider classes, or {@code
+ * null} if the system class loader (or, failing that, the bootstrap class loader) is to be used
* @return Started Ignite node.
*/
- public Ignite start(@NotNull String name, @Nullable Path configPath, @NotNull Path workDir,
- @Nullable ClassLoader serviceLoaderClassLoader);
+ public Ignite start(String name, @Nullable Path configPath, Path workDir, @Nullable ClassLoader serviceLoaderClassLoader);
/**
* Starts an Ignite node with an optional bootstrap configuration from a URL linking to HOCON configs.
*
- * @param name Name of the node. Must not be {@code null}.
- * @param cfgUrl URL linking to the node configuration in the HOCON format. Can be {@code null}.
- * @param workDir Work directory for the started node. Must not be {@code null}.
+ * @param name Name of the node. Must not be {@code null}.
+ * @param cfgUrl URL linking to the node configuration in the HOCON format. Can be {@code null}.
+ * @param workDir Work directory for the started node. Must not be {@code null}.
* @return Started Ignite node.
*/
- public Ignite start(@NotNull String name, @Nullable URL cfgUrl, @NotNull Path workDir);
+ public Ignite start(String name, @Nullable URL cfgUrl, Path workDir);
/**
* Starts an Ignite node with an optional bootstrap configuration from an input stream with HOCON configs.
*
- * @param name Name of the node. Must not be {@code null}.
- * @param config Optional node configuration based on {@link org.apache.ignite.configuration.schemas.runner.NodeConfigurationSchema}
- * and {@link org.apache.ignite.configuration.schemas.network.NetworkConfigurationSchema}. Following rules are used for
- * applying the configuration properties:
- * <ol>
- * <li>Specified property overrides existing one or just applies itself if it wasn't
- * previously specified.</li>
- * <li>All non-specified properties either use previous value or use default one from
- * corresponding configuration schema.</li>
- * </ol>
- * So that, in case of initial node start (first start ever) specified configuration, supplemented
- * with defaults, is used. If no configuration was provided defaults are used for all
- * configuration properties. In case of node restart, specified properties override existing
- * ones, non specified properties that also weren't specified previously use default values.
- * Please pay attention that previously specified properties are searched in the
- * {@code workDir} specified by the user.
+ * @param name Name of the node. Must not be {@code null}.
+ * @param config Optional node configuration based on
+ * {@link org.apache.ignite.configuration.schemas.network.NetworkConfigurationSchema}.
+ * Following rules are used for applying the configuration properties:
+ * <ol>
+ * <li>Specified property overrides existing one or just applies itself if it wasn't
+ * previously specified.</li>
+ * <li>All non-specified properties either use previous value or use default one from
+ * corresponding configuration schema.</li>
+ * </ol>
+ * So that, in case of initial node start (first start ever) specified configuration, supplemented
+ * with defaults, is used. If no configuration was provided defaults are used for all
+ * configuration properties. In case of node restart, specified properties override existing
+ * ones, non specified properties that also weren't specified previously use default values.
+ * Please pay attention that previously specified properties are searched in the
+ * {@code workDir} specified by the user.
+ *
* @param workDir Work directory for the started node. Must not be {@code null}.
* @return Started Ignite node.
*/
- public Ignite start(@NotNull String name, @Nullable InputStream config, @NotNull Path workDir);
+ public Ignite start(String name, @Nullable InputStream config, Path workDir);
/**
* Starts an Ignite node with the default configuration.
*
- * @param name Name of the node. Must not be {@code null}.
+ * @param name Name of the node. Must not be {@code null}.
* @param workDir Work directory for the started node. Must not be {@code null}.
* @return Started Ignite node.
*/
- public Ignite start(@NotNull String name, @NotNull Path workDir);
+ public Ignite start(String name, Path workDir);
/**
* Stops the node with given {@code name}. It's possible to stop both already started node or node that is currently starting. Has no
@@ -103,5 +101,5 @@ public interface Ignition {
* @param name Node name to stop.
* @throws IllegalArgumentException if null is specified instead of node name.
*/
- public void stop(@NotNull String name);
+ public void stop(String name);
}
diff --git a/modules/api/src/main/java/org/apache/ignite/IgnitionManager.java b/modules/api/src/main/java/org/apache/ignite/IgnitionManager.java
index 84074f6..1896e5e 100644
--- a/modules/api/src/main/java/org/apache/ignite/IgnitionManager.java
+++ b/modules/api/src/main/java/org/apache/ignite/IgnitionManager.java
@@ -24,7 +24,6 @@ import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.ServiceLoader;
import org.apache.ignite.lang.IgniteException;
-import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
/**
@@ -36,40 +35,32 @@ public class IgnitionManager {
private static Ignition ignition;
/**
- * Starts an Ignite node with an optional bootstrap configuration from a HOCON file.
+ * Starts an Ignite node with an optional bootstrap configuration from an input stream with HOCON configs.
+ *
+ * @param nodeName Name of the node. Must not be {@code null}.
+ * @param configStr Optional node configuration based on
+ * {@link org.apache.ignite.configuration.schemas.network.NetworkConfigurationSchema}.
+ * Following rules are used for applying the configuration properties:
+ * <ol>
+ * <li>Specified property overrides existing one or just applies itself if it wasn't
+ * previously specified.</li>
+ * <li>All non-specified properties either use previous value or use default one from
+ * corresponding configuration schema.</li>
+ * </ol>
+ * So that, in case of initial node start (first start ever) specified configuration, supplemented
+ * with defaults, is used. If no configuration was provided defaults are used for all
+ * configuration properties. In case of node restart, specified properties override existing
+ * ones, non specified properties that also weren't specified previously use default values.
+ * Please pay attention that previously specified properties are searched in the
+ * {@code workDir} specified by the user.
*
- * @param nodeName Name of the node. Must not be {@code null}.
- * @param configStr Optional node configuration based on {@link org.apache.ignite.configuration.schemas.runner.NodeConfigurationSchema}
- * and {@link org.apache.ignite.configuration.schemas.network.NetworkConfigurationSchema}. Following rules are used for
- * applying the configuration properties:
- * <ol>
- * <li>Specified property overrides existing one or just applies itself if it wasn't
- * previously specified.</li>
- * <li>All non-specified properties either use previous value or use default one from
- * corresponding configuration schema.</li>
- * </ol>
- * So that, in case of initial node start (first start ever) specified configuration, supplemented
- * with defaults, is used. If no configuration was provided defaults are used for all
- * configuration properties. In case of node restart, specified properties override existing
- * ones, non specified properties that also weren't specified previously use default values.
- * Please pay attention that previously specified properties are searched in the
- * {@code workDir} specified by the user.
- * @param workDir Work directory for the started node. Must not be {@code null}.
+ * @param workDir Work directory for the started node. Must not be {@code null}.
* @return Started Ignite node.
* @throws IgniteException If error occurs while reading node configuration.
*/
// TODO IGNITE-14580 Add exception handling logic to IgnitionProcessor.
- public static Ignite start(
- @NotNull String nodeName,
- @Nullable String configStr,
- @NotNull Path workDir
- ) {
- synchronized (IgnitionManager.class) {
- if (ignition == null) {
- ServiceLoader<Ignition> ldr = ServiceLoader.load(Ignition.class);
- ignition = ldr.iterator().next();
- }
- }
+ public static Ignite start(String nodeName, @Nullable String configStr, Path workDir) {
+ loadIgnitionService(Thread.currentThread().getContextClassLoader());
if (configStr == null) {
return ignition.start(nodeName, workDir);
@@ -93,18 +84,8 @@ public class IgnitionManager {
* @return Started Ignite node.
*/
// TODO IGNITE-14580 Add exception handling logic to IgnitionProcessor.
- public static Ignite start(
- @NotNull String nodeName,
- @Nullable Path cfgPath,
- @NotNull Path workDir,
- @Nullable ClassLoader clsLdr
- ) {
- synchronized (IgnitionManager.class) {
- if (ignition == null) {
- ServiceLoader<Ignition> ldr = ServiceLoader.load(Ignition.class, clsLdr);
- ignition = ldr.iterator().next();
- }
- }
+ public static Ignite start(String nodeName, @Nullable Path cfgPath, Path workDir, @Nullable ClassLoader clsLdr) {
+ loadIgnitionService(clsLdr);
return ignition.start(nodeName, cfgPath, workDir, clsLdr);
}
@@ -116,13 +97,8 @@ public class IgnitionManager {
* @param name Node name to stop.
* @throws IllegalArgumentException if null is specified instead of node name.
*/
- public static void stop(@NotNull String name) {
- synchronized (IgnitionManager.class) {
- if (ignition == null) {
- ServiceLoader<Ignition> ldr = ServiceLoader.load(Ignition.class);
- ignition = ldr.iterator().next();
- }
- }
+ public static void stop(String name) {
+ loadIgnitionService(Thread.currentThread().getContextClassLoader());
ignition.stop(name);
}
@@ -136,14 +112,16 @@ public class IgnitionManager {
* class loader (or, failing that, the bootstrap class loader) is to be used
* @throws IllegalArgumentException if null is specified instead of node name.
*/
- public static void stop(@NotNull String name, @Nullable ClassLoader clsLdr) {
- synchronized (IgnitionManager.class) {
- if (ignition == null) {
- ServiceLoader<Ignition> ldr = ServiceLoader.load(Ignition.class, clsLdr);
- ignition = ldr.iterator().next();
- }
- }
+ public static void stop(String name, @Nullable ClassLoader clsLdr) {
+ loadIgnitionService(clsLdr);
ignition.stop(name);
}
+
+ private static synchronized void loadIgnitionService(@Nullable ClassLoader clsLdr) {
+ if (ignition == null) {
+ ServiceLoader<Ignition> ldr = ServiceLoader.load(Ignition.class, clsLdr);
+ ignition = ldr.iterator().next();
+ }
+ }
}
diff --git a/modules/cli/src/integrationTest/java/org/apache/ignite/cli/ItConfigCommandTest.java b/modules/cli/src/integrationTest/java/org/apache/ignite/cli/ItConfigCommandTest.java
index 0edb906..d594e05 100644
--- a/modules/cli/src/integrationTest/java/org/apache/ignite/cli/ItConfigCommandTest.java
+++ b/modules/cli/src/integrationTest/java/org/apache/ignite/cli/ItConfigCommandTest.java
@@ -32,74 +32,50 @@ import com.jayway.jsonpath.JsonPath;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.env.Environment;
import java.io.ByteArrayOutputStream;
-import java.io.IOException;
import java.io.PrintWriter;
-import java.net.ServerSocket;
import java.nio.file.Path;
import net.minidev.json.JSONObject;
import net.minidev.json.JSONValue;
-import org.apache.ignite.Ignite;
import org.apache.ignite.IgnitionManager;
import org.apache.ignite.cli.spec.IgniteCliSpec;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
-import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.api.extension.ExtendWith;
import picocli.CommandLine;
/**
* Integration test for {@code ignite config} commands.
*/
+@ExtendWith(WorkDirectoryExtension.class)
public class ItConfigCommandTest extends AbstractCliTest {
/** DI context. */
private ApplicationContext ctx;
/** stderr. */
- private ByteArrayOutputStream err;
+ private final ByteArrayOutputStream err = new ByteArrayOutputStream();
/** stdout. */
- private ByteArrayOutputStream out;
-
- /** Port for REST communication. */
- private int restPort;
-
- /** Port for thin client communication. */
- private int clientPort;
-
- /** Network port. */
- private int networkPort;
+ private final ByteArrayOutputStream out = new ByteArrayOutputStream();
/** Node. */
- private Ignite node;
+ private IgniteImpl node;
@BeforeEach
- void setup(@TempDir Path workDir, TestInfo testInfo) throws IOException {
- // TODO: IGNITE-15131 Must be replaced by receiving the actual port configs from the started node.
- // This approach still can produce the port, which will be unavailable at the moment of node start.
- restPort = getAvailablePort();
- clientPort = getAvailablePort();
- networkPort = getAvailablePort();
-
- String configStr = String.join("\n",
- "network.port=" + networkPort,
- "rest.port=" + restPort,
- "rest.portRange=0",
- "clientConnector.port=" + clientPort,
- "clientConnector.portRange=0"
- );
-
- this.node = IgnitionManager.start(testNodeName(testInfo, networkPort), configStr, workDir);
+ void setup(@WorkDirectory Path workDir, TestInfo testInfo) {
+ node = (IgniteImpl) IgnitionManager.start(testNodeName(testInfo, 0), null, workDir);
ctx = ApplicationContext.run(Environment.TEST);
-
- err = new ByteArrayOutputStream();
- out = new ByteArrayOutputStream();
}
@AfterEach
void tearDown(TestInfo testInfo) {
- IgnitionManager.stop(testNodeName(testInfo, networkPort));
+ node.stop();
+
ctx.stop();
}
@@ -109,9 +85,9 @@ public class ItConfigCommandTest extends AbstractCliTest {
"config",
"set",
"--node-endpoint",
- "localhost:" + restPort,
+ "localhost:" + node.restAddress().port(),
"--type", "node", //TODO: Fix in https://issues.apache.org/jira/browse/IGNITE-15306
- "node.metastorageNodes=[\"localhost1\"]"
+ "network.shutdownQuietPeriod=1"
);
String nl = System.lineSeparator();
@@ -129,7 +105,7 @@ public class ItConfigCommandTest extends AbstractCliTest {
"config",
"get",
"--node-endpoint",
- "localhost:" + restPort,
+ "localhost:" + node.restAddress().port(),
"--type", "node" //TODO: Fix in https://issues.apache.org/jira/browse/IGNITE-15306
);
@@ -137,7 +113,7 @@ public class ItConfigCommandTest extends AbstractCliTest {
DocumentContext document = JsonPath.parse(removeTrailingQuotes(unescapeQuotes(out.toString(UTF_8))));
- assertEquals("localhost1", document.read("$.node.metastorageNodes[0]"));
+ assertEquals(1, document.read("$.network.shutdownQuietPeriod", Integer.class));
}
@Test
@@ -146,16 +122,16 @@ public class ItConfigCommandTest extends AbstractCliTest {
"config",
"set",
"--node-endpoint",
- "localhost:" + restPort,
+ "localhost:" + node.restAddress().port(),
"--type", "node", //TODO: Fix in https://issues.apache.org/jira/browse/IGNITE-15306
- "node.metastorgeNodes=[\"localhost1\"]"
+ "network.foo=\"bar\""
);
assertEquals(1, exitCode);
assertThat(
err.toString(UTF_8),
both(startsWith("org.apache.ignite.cli.IgniteCliException: Failed to set configuration"))
- .and(containsString("'node' configuration doesn't have the 'metastorgeNodes' sub-configuration"))
+ .and(containsString("'network' configuration doesn't have the 'foo' sub-configuration"))
);
resetStreams();
@@ -164,16 +140,16 @@ public class ItConfigCommandTest extends AbstractCliTest {
"config",
"set",
"--node-endpoint",
- "localhost:" + restPort,
+ "localhost:" + node.restAddress().port(),
"--type", "node", //TODO: Fix in https://issues.apache.org/jira/browse/IGNITE-15306
- "node.metastorageNodes=abc"
+ "network.shutdownQuietPeriod=abc"
);
assertEquals(1, exitCode);
assertThat(
err.toString(UTF_8),
both(startsWith("org.apache.ignite.cli.IgniteCliException: Failed to set configuration"))
- .and(containsString("'String[]' is expected as a type for the 'node.metastorageNodes' configuration value"))
+ .and(containsString("'long' is expected as a type for the 'network.shutdownQuietPeriod' configuration value"))
);
}
@@ -183,7 +159,7 @@ public class ItConfigCommandTest extends AbstractCliTest {
"config",
"get",
"--node-endpoint",
- "localhost:" + restPort,
+ "localhost:" + node.restAddress().port(),
"--selector",
"network",
"--type", "node" //TODO: Fix in https://issues.apache.org/jira/browse/IGNITE-15306
@@ -199,19 +175,6 @@ public class ItConfigCommandTest extends AbstractCliTest {
}
/**
- * Returns any available prt.
- *
- * @return Any available port.
- * @throws IOException if can't allocate port to open socket.
- */
- // TODO: Must be removed after IGNITE-15131.
- private static int getAvailablePort() throws IOException {
- ServerSocket s = new ServerSocket(0);
- s.close();
- return s.getLocalPort();
- }
-
- /**
* Creates a new command line interpreter.
*
* @param applicationCtx DI context.
diff --git a/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/ItClientHandlerTest.java b/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/ItClientHandlerTest.java
index 93fe8db..38eef11 100644
--- a/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/ItClientHandlerTest.java
+++ b/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/ItClientHandlerTest.java
@@ -25,11 +25,9 @@ import static org.mockito.Mockito.mock;
import java.io.IOException;
import java.io.OutputStream;
-import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
import org.apache.ignite.configuration.schemas.clientconnector.ClientConnectorConfiguration;
import org.apache.ignite.configuration.schemas.network.NetworkConfiguration;
import org.apache.ignite.internal.configuration.ConfigurationManager;
@@ -62,7 +60,7 @@ public class ItClientHandlerTest {
@BeforeEach
public void setUp(TestInfo testInfo) {
serverModule = startServer(testInfo);
- serverPort = ((InetSocketAddress) Objects.requireNonNull(serverModule.localAddress())).getPort();
+ serverPort = serverModule.localAddress().getPort();
}
@AfterEach
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
index 41c2c9e..f2893a3 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
@@ -23,18 +23,18 @@ import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import java.net.BindException;
-import java.net.SocketAddress;
+import java.net.InetSocketAddress;
import org.apache.ignite.configuration.schemas.clientconnector.ClientConnectorConfiguration;
import org.apache.ignite.internal.client.proto.ClientMessageDecoder;
import org.apache.ignite.internal.configuration.ConfigurationRegistry;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.sql.engine.QueryProcessor;
import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.network.NettyBootstrapFactory;
import org.apache.ignite.table.manager.IgniteTables;
import org.apache.ignite.tx.IgniteTransactions;
-import org.jetbrains.annotations.Nullable;
/**
* Client handler module maintains TCP endpoint for thin client connections.
@@ -115,11 +115,15 @@ public class ClientHandlerModule implements IgniteComponent {
/**
* Returns the local address where this handler is bound to.
*
- * @return the local address of this module, or {@code null} if this module is not started.
+ * @return the local address of this module.
+ * @throws IgniteInternalException if the module is not started.
*/
- @Nullable
- public SocketAddress localAddress() {
- return channel == null ? null : channel.localAddress();
+ public InetSocketAddress localAddress() {
+ if (channel == null) {
+ throw new IgniteInternalException("ClientHandlerModule has not been started");
+ }
+
+ return (InetSocketAddress) channel.localAddress();
}
/**
diff --git a/modules/client/src/test/java/org/apache/ignite/client/AbstractClientTest.java b/modules/client/src/test/java/org/apache/ignite/client/AbstractClientTest.java
index 673516c..1f8fa05 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/AbstractClientTest.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/AbstractClientTest.java
@@ -21,8 +21,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import io.netty.util.ResourceLeakDetector;
-import java.net.InetSocketAddress;
-import java.util.Objects;
import org.apache.ignite.Ignite;
import org.apache.ignite.client.fakes.FakeIgnite;
import org.apache.ignite.client.handler.ClientHandlerModule;
@@ -139,6 +137,6 @@ public abstract class AbstractClientTest {
}
public static int getPort(ClientHandlerModule hnd) {
- return ((InetSocketAddress) Objects.requireNonNull(hnd.localAddress())).getPort();
+ return hnd.localAddress().getPort();
}
}
diff --git a/modules/cluster-management/pom.xml b/modules/cluster-management/pom.xml
new file mode 100644
index 0000000..b38061f
--- /dev/null
+++ b/modules/cluster-management/pom.xml
@@ -0,0 +1,111 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-parent</artifactId>
+ <version>1</version>
+ <relativePath>../../parent/pom.xml</relativePath>
+ </parent>
+
+ <artifactId>ignite-cluster-management</artifactId>
+ <version>3.0.0-SNAPSHOT</version>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-core</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-network-api</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-raft</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-rest</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </dependency>
+
+ <!-- Test dependencies -->
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-core</artifactId>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-engine</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-junit-jupiter</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-network-annotation-processor</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+ <configuration>
+ <annotationProcessorPaths>
+ <path>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-network-annotation-processor</artifactId>
+ <version>${project.version}</version>
+ </path>
+ </annotationProcessorPaths>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterInitializer.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterInitializer.java
new file mode 100644
index 0000000..4c03d0d
--- /dev/null
+++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterInitializer.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cluster.management;
+
+import static org.apache.ignite.network.util.ClusterServiceUtils.resolveNodes;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import org.apache.ignite.internal.cluster.management.messages.CancelInitMessage;
+import org.apache.ignite.internal.cluster.management.messages.CmgInitMessage;
+import org.apache.ignite.internal.cluster.management.messages.CmgMessagesFactory;
+import org.apache.ignite.internal.cluster.management.messages.InitCompleteMessage;
+import org.apache.ignite.internal.cluster.management.messages.InitErrorMessage;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.NetworkMessage;
+
+/**
+ * Class for performing cluster initialization.
+ */
+public class ClusterInitializer {
+ private static final IgniteLogger log = IgniteLogger.forClass(ClusterInitializer.class);
+
+ private final ClusterService clusterService;
+
+ private final CmgMessagesFactory msgFactory = new CmgMessagesFactory();
+
+ /** Constructor. */
+ ClusterInitializer(ClusterService clusterService) {
+ this.clusterService = clusterService;
+ }
+
+ /**
+ * Initializes the cluster that this node is present in.
+ *
+ * @param metaStorageNodeNames names of nodes that will host the Meta Storage. Cannot be empty.
+ * @param cmgNodeNames names of nodes that will host the Cluster Management Group. Can be empty, in which case {@code
+ * metaStorageNodeNames} will be used instead.
+ * @return future that resolves into leader node IDs if completed successfully.
+ */
+ public CompletableFuture<Void> initCluster(Collection<String> metaStorageNodeNames, Collection<String> cmgNodeNames) {
+ try {
+ if (metaStorageNodeNames.isEmpty()) {
+ throw new IllegalArgumentException("List of metastorage nodes must not be empty");
+ }
+
+ cmgNodeNames = cmgNodeNames.isEmpty() ? metaStorageNodeNames : cmgNodeNames;
+
+ // check that provided Meta Storage nodes are present in the topology
+ resolveNodes(clusterService, metaStorageNodeNames);
+
+ List<ClusterNode> cmgNodes = resolveNodes(clusterService, cmgNodeNames);
+
+ CmgInitMessage initMessage = msgFactory.cmgInitMessage()
+ .metaStorageNodes(metaStorageNodeNames)
+ .cmgNodes(cmgNodeNames)
+ .build();
+
+ return invokeMessage(cmgNodes, initMessage)
+ .thenApply(CompletableFuture::completedFuture)
+ .exceptionally(e -> cancelInit(cmgNodes, e))
+ .thenCompose(Function.identity());
+ } catch (Exception e) {
+ return CompletableFuture.failedFuture(e);
+ }
+ }
+
+ private CompletableFuture<Void> cancelInit(Collection<ClusterNode> nodes, Throwable e) {
+ log.error("Initialization failed, rolling back", e);
+
+ CancelInitMessage cancelMessage = msgFactory.cancelInitMessage()
+ .reason(e.getMessage())
+ .build();
+
+ return sendMessage(nodes, cancelMessage)
+ .exceptionally(nestedEx -> {
+ log.error("Error when canceling init", nestedEx);
+
+ e.addSuppressed(nestedEx);
+
+ return null;
+ })
+ .thenCompose(v -> CompletableFuture.failedFuture(e));
+ }
+
+ /**
+ * Sends a message to all provided nodes.
+ *
+ * @param nodes nodes to send message to.
+ * @param message message to send.
+ * @return future that either resolves to a leader node ID or fails if any of the nodes return an error response.
+ */
+ private CompletableFuture<Void> invokeMessage(Collection<ClusterNode> nodes, NetworkMessage message) {
+ return allOf(nodes, node ->
+ clusterService.messagingService()
+ .invoke(node, message, 10000)
+ .thenAccept(response -> {
+ if (response instanceof InitErrorMessage) {
+ throw new InitException(String.format(
+ "Got error response from node \"%s\": %s", node.name(), ((InitErrorMessage) response).cause()
+ ));
+ }
+
+ if (!(response instanceof InitCompleteMessage)) {
+ throw new InitException(String.format(
+ "Unexpected response from node \"%s\": %s", node.name(), response.getClass()
+ ));
+ }
+ })
+ );
+ }
+
+ private CompletableFuture<Void> sendMessage(Collection<ClusterNode> nodes, NetworkMessage message) {
+ return allOf(nodes, node -> clusterService.messagingService().send(node, message));
+ }
+
+ private static CompletableFuture<Void> allOf(
+ Collection<ClusterNode> nodes,
+ Function<ClusterNode, CompletableFuture<?>> futureProducer
+ ) {
+ CompletableFuture<?>[] futures = nodes.stream().map(futureProducer).toArray(CompletableFuture[]::new);
+
+ return CompletableFuture.allOf(futures);
+ }
+}
diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
new file mode 100644
index 0000000..c22788c
--- /dev/null
+++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cluster.management;
+
+import static io.netty.handler.codec.http.HttpHeaderValues.APPLICATION_JSON;
+import static org.apache.ignite.network.util.ClusterServiceUtils.resolveNodes;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.internal.cluster.management.messages.CancelInitMessage;
+import org.apache.ignite.internal.cluster.management.messages.ClusterStateMessage;
+import org.apache.ignite.internal.cluster.management.messages.CmgInitMessage;
+import org.apache.ignite.internal.cluster.management.messages.CmgMessageGroup;
+import org.apache.ignite.internal.cluster.management.messages.CmgMessagesFactory;
+import org.apache.ignite.internal.cluster.management.rest.InitCommandHandler;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.raft.Loza;
+import org.apache.ignite.internal.rest.RestComponent;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.lang.NodeStoppingException;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.MessagingService;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.raft.client.Peer;
+import org.apache.ignite.raft.client.service.RaftGroupService;
+
+/**
+ * Ignite component responsible for cluster initialization and managing the Cluster Management Raft Group.
+ */
+public class ClusterManagementGroupManager implements IgniteComponent {
+ private static final IgniteLogger log = IgniteLogger.forClass(ClusterManagementGroupManager.class);
+
+ /** CMG Raft group name. */
+ private static final String CMG_RAFT_GROUP_NAME = "cmg_raft_group";
+
+ /** Init REST endpoint path. */
+ private static final String REST_ENDPOINT = "/management/v1/cluster/init";
+
+ /** Busy lock to stop synchronously. */
+ private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+ /** Prevents double stopping the component. */
+ private final AtomicBoolean stopGuard = new AtomicBoolean();
+
+ private final ClusterService clusterService;
+
+ private final Loza raftManager;
+
+ private final RestComponent restModule;
+
+ /** Handles cluster initialization flow. */
+ private final ClusterInitializer clusterInitializer;
+
+ private final CmgMessagesFactory msgFactory = new CmgMessagesFactory();
+
+ private final CompletableFuture<Collection<String>> metastorageNodes = new CompletableFuture<>();
+
+ /** Constructor. */
+ public ClusterManagementGroupManager(ClusterService clusterService, Loza raftManager, RestComponent restModule) {
+ this.clusterService = clusterService;
+ this.raftManager = raftManager;
+ this.restModule = restModule;
+ this.clusterInitializer = new ClusterInitializer(clusterService);
+
+ MessagingService messagingService = clusterService.messagingService();
+
+ messagingService.addMessageHandler(CmgMessageGroup.class, (msg, addr, correlationId) -> {
+ if (!busyLock.enterBusy()) {
+ if (correlationId != null) {
+ messagingService.respond(addr, errorResponse(msgFactory, new NodeStoppingException()), correlationId);
+ }
+
+ return;
+ }
+
+ try {
+ if (msg instanceof CancelInitMessage) {
+ handleCancelInit((CancelInitMessage) msg);
+ } else if (msg instanceof CmgInitMessage) {
+ assert correlationId != null;
+
+ handleInit((CmgInitMessage) msg, addr, correlationId);
+ } else if (msg instanceof ClusterStateMessage) {
+ handleClusterState((ClusterStateMessage) msg);
+ }
+ } catch (Exception e) {
+ log.error("CMG message handling failed", e);
+
+ if (correlationId != null) {
+ messagingService.respond(addr, errorResponse(msgFactory, e), correlationId);
+ }
+ } finally {
+ busyLock.leaveBusy();
+ }
+ });
+ }
+
+ /**
+ * Initializes the cluster that this node is present in.
+ *
+ * @param metaStorageNodeNames names of nodes that will host the Meta Storage.
+ * @param cmgNodeNames names of nodes that will host the Cluster Management Group.
+ */
+ public void initCluster(Collection<String> metaStorageNodeNames, Collection<String> cmgNodeNames) throws NodeStoppingException {
+ if (!busyLock.enterBusy()) {
+ throw new NodeStoppingException();
+ }
+
+ try {
+ clusterInitializer.initCluster(metaStorageNodeNames, cmgNodeNames).get();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+
+ throw new IgniteInternalException("Interrupted while initializing the cluster", e);
+ } catch (ExecutionException e) {
+ throw new IgniteInternalException("Unable to initialize the cluster", e.getCause());
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }
+
+ private void handleInit(CmgInitMessage msg, NetworkAddress addr, long correlationId) throws NodeStoppingException {
+ List<ClusterNode> nodes = resolveNodes(clusterService, msg.cmgNodes());
+
+ raftManager.prepareRaftGroup(CMG_RAFT_GROUP_NAME, nodes, CmgRaftGroupListener::new)
+ .whenComplete((service, e) -> {
+ MessagingService messagingService = clusterService.messagingService();
+
+ if (e == null) {
+ ClusterNode leader = getLeader(service);
+
+ ClusterNode thisNode = clusterService.topologyService().localMember();
+
+ messagingService.respond(addr, successResponse(msgFactory), correlationId);
+
+ if (leader.equals(thisNode)) {
+ broadcastClusterState(msg.metaStorageNodes());
+ }
+ } else {
+ messagingService.respond(addr, errorResponse(msgFactory, e), correlationId);
+ }
+ });
+ }
+
+ private void handleCancelInit(CancelInitMessage msg) throws NodeStoppingException {
+ log.info("CMG initialization cancelled, reason: " + msg.reason());
+
+ raftManager.stopRaftGroup(CMG_RAFT_GROUP_NAME);
+
+ // TODO: drop the Raft storage as well, https://issues.apache.org/jira/browse/IGNITE-16471
+ }
+
+ private void handleClusterState(ClusterStateMessage msg) {
+ metastorageNodes.complete(msg.metastorageNodes());
+ }
+
+ private static NetworkMessage successResponse(CmgMessagesFactory msgFactory) {
+ log.info("CMG started successfully");
+
+ return msgFactory.initCompleteMessage().build();
+ }
+
+ private void broadcastClusterState(Collection<String> metaStorageNodes) {
+ NetworkMessage clusterStateMsg = msgFactory.clusterStateMessage()
+ .metastorageNodes(metaStorageNodes)
+ .build();
+
+ clusterService.topologyService()
+ .allMembers()
+ .forEach(node -> clusterService.messagingService().send(node, clusterStateMsg));
+ }
+
+ private static NetworkMessage errorResponse(CmgMessagesFactory msgFactory, Throwable e) {
+ log.error("Exception when starting the CMG", e);
+
+ return msgFactory.initErrorMessage()
+ .cause(e.getMessage())
+ .build();
+ }
+
+ private ClusterNode getLeader(RaftGroupService raftService) {
+ Peer leader = raftService.leader();
+
+ assert leader != null;
+
+ ClusterNode leaderNode = clusterService.topologyService().getByAddress(leader.address());
+
+ assert leaderNode != null;
+
+ return leaderNode;
+ }
+
+ @Override
+ public void start() {
+ restModule.registerHandlers(routes ->
+ routes.post(REST_ENDPOINT, APPLICATION_JSON.toString(), new InitCommandHandler(clusterInitializer))
+ );
+ }
+
+ @Override
+ public void stop() throws Exception {
+ if (!stopGuard.compareAndSet(false, true)) {
+ return;
+ }
+
+ busyLock.block();
+
+ raftManager.stopRaftGroup(CMG_RAFT_GROUP_NAME);
+ }
+
+ public CompletableFuture<Collection<String>> metaStorageNodes() {
+ return metastorageNodes;
+ }
+}
diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/CmgRaftGroupListener.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/CmgRaftGroupListener.java
new file mode 100644
index 0000000..1a5f6b2
--- /dev/null
+++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/CmgRaftGroupListener.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cluster.management;
+
+import java.nio.file.Path;
+import java.util.Iterator;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+import org.apache.ignite.raft.client.Command;
+import org.apache.ignite.raft.client.ReadCommand;
+import org.apache.ignite.raft.client.WriteCommand;
+import org.apache.ignite.raft.client.service.CommandClosure;
+import org.apache.ignite.raft.client.service.RaftGroupListener;
+import org.jetbrains.annotations.Nullable;
+
+// TODO: implement listener, https://issues.apache.org/jira/browse/IGNITE-16471
+class CmgRaftGroupListener implements RaftGroupListener {
+ @Override
+ public void onRead(Iterator<CommandClosure<ReadCommand>> iterator) {
+
+ }
+
+ @Override
+ public void onWrite(Iterator<CommandClosure<WriteCommand>> iterator) {
+
+ }
+
+ @Override
+ public void onSnapshotSave(Path path, Consumer<Throwable> doneClo) {
+
+ }
+
+ @Override
+ public boolean onSnapshotLoad(Path path) {
+ return false;
+ }
+
+ @Override
+ public void onShutdown() {
+
+ }
+
+ @Override
+ public @Nullable CompletableFuture<Void> onBeforeApply(Command command) {
+ return null;
+ }
+}
diff --git a/modules/api/src/main/java/org/apache/ignite/configuration/schemas/runner/NodeConfigurationSchema.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/InitException.java
similarity index 53%
rename from modules/api/src/main/java/org/apache/ignite/configuration/schemas/runner/NodeConfigurationSchema.java
rename to modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/InitException.java
index dad6f45..acec838 100644
--- a/modules/api/src/main/java/org/apache/ignite/configuration/schemas/runner/NodeConfigurationSchema.java
+++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/InitException.java
@@ -1,6 +1,6 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
@@ -15,18 +15,19 @@
* limitations under the License.
*/
-package org.apache.ignite.configuration.schemas.runner;
+package org.apache.ignite.internal.cluster.management;
-import org.apache.ignite.configuration.annotation.ConfigurationRoot;
-import org.apache.ignite.configuration.annotation.ConfigurationType;
-import org.apache.ignite.configuration.annotation.Value;
+import org.apache.ignite.lang.IgniteInternalException;
/**
- * Local node configuration schema.
+ * Exception thrown when cluster initialization fails for some reason.
*/
-@ConfigurationRoot(rootName = "node", type = ConfigurationType.LOCAL)
-public class NodeConfigurationSchema {
- /** It is a copy of appropriate property from the cluster configuration. */
- @Value(hasDefault = true)
- public final String[] metastorageNodes = new String[0];
+public class InitException extends IgniteInternalException {
+ public InitException(String message) {
+ super(message);
+ }
+
+ public InitException(String message, Throwable cause) {
+ super(message, cause);
+ }
}
diff --git a/modules/api/src/main/java/org/apache/ignite/configuration/schemas/runner/ClusterConfigurationSchema.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/messages/CancelInitMessage.java
similarity index 51%
rename from modules/api/src/main/java/org/apache/ignite/configuration/schemas/runner/ClusterConfigurationSchema.java
rename to modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/messages/CancelInitMessage.java
index b5fdbc3..79d4824 100644
--- a/modules/api/src/main/java/org/apache/ignite/configuration/schemas/runner/ClusterConfigurationSchema.java
+++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/messages/CancelInitMessage.java
@@ -1,6 +1,6 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
@@ -15,18 +15,18 @@
* limitations under the License.
*/
-package org.apache.ignite.configuration.schemas.runner;
+package org.apache.ignite.internal.cluster.management.messages;
-import org.apache.ignite.configuration.annotation.ConfigurationRoot;
-import org.apache.ignite.configuration.annotation.ConfigurationType;
-import org.apache.ignite.configuration.annotation.Value;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.annotations.Transferable;
/**
- * Configuration schema for cluster endpoint subtree.
+ * Message signaling that the init process has failed and needs to be aborted.
*/
-@ConfigurationRoot(rootName = "cluster", type = ConfigurationType.DISTRIBUTED)
-public class ClusterConfigurationSchema {
- /** List of unique names of those cluster nodes that will host distributed metastorage instances. */
- @Value(hasDefault = true)
- public String[] metastorageNodes = new String[0];
+@Transferable(CmgMessageGroup.CANCEL_INIT)
+public interface CancelInitMessage extends NetworkMessage {
+ /**
+ * Textual representation of the cause of init failure.
+ */
+ String reason();
}
diff --git a/modules/api/src/main/java/org/apache/ignite/configuration/schemas/runner/package-info.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/messages/ClusterStateMessage.java
similarity index 57%
copy from modules/api/src/main/java/org/apache/ignite/configuration/schemas/runner/package-info.java
copy to modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/messages/ClusterStateMessage.java
index 2fdef3d..376e91d 100644
--- a/modules/api/src/main/java/org/apache/ignite/configuration/schemas/runner/package-info.java
+++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/messages/ClusterStateMessage.java
@@ -1,6 +1,6 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
@@ -15,8 +15,19 @@
* limitations under the License.
*/
+package org.apache.ignite.internal.cluster.management.messages;
+
+import java.util.Collection;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.annotations.Transferable;
+
/**
- * Configuration schemas for Cluster node.
+ * Message for initializing the Meta Storage.
*/
-
-package org.apache.ignite.configuration.schemas.runner;
+@Transferable(CmgMessageGroup.CLUSTER_STATE)
+public interface ClusterStateMessage extends NetworkMessage {
+ /**
+ * Consistent IDs of nodes that host the Meta Storage.
+ */
+ Collection<String> metastorageNodes();
+}
diff --git a/modules/api/src/main/java/org/apache/ignite/configuration/schemas/runner/package-info.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/messages/CmgInitMessage.java
similarity index 56%
copy from modules/api/src/main/java/org/apache/ignite/configuration/schemas/runner/package-info.java
copy to modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/messages/CmgInitMessage.java
index 2fdef3d..14356b6 100644
--- a/modules/api/src/main/java/org/apache/ignite/configuration/schemas/runner/package-info.java
+++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/messages/CmgInitMessage.java
@@ -1,6 +1,6 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
@@ -15,8 +15,21 @@
* limitations under the License.
*/
+package org.apache.ignite.internal.cluster.management.messages;
+
+import java.util.Collection;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.annotations.Transferable;
+
/**
- * Configuration schemas for Cluster node.
+ * Message for initializing the Cluster Management Group.
*/
+@Transferable(CmgMessageGroup.CMG_INIT)
+public interface CmgInitMessage extends NetworkMessage {
+ /**
+ * Consistent IDs of nodes that host the CMG.
+ */
+ Collection<String> cmgNodes();
-package org.apache.ignite.configuration.schemas.runner;
+ Collection<String> metaStorageNodes();
+}
diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/messages/CmgMessageGroup.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/messages/CmgMessageGroup.java
new file mode 100644
index 0000000..b90423e
--- /dev/null
+++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/messages/CmgMessageGroup.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cluster.management.messages;
+
+import org.apache.ignite.network.annotations.MessageGroup;
+
+/**
+ * Message Group for cluster initialization and CMG management.
+ */
+@MessageGroup(groupType = 6, groupName = "CmgMessages")
+public class CmgMessageGroup {
+ /**
+ * Message type for {@link CmgInitMessage}.
+ */
+ public static final short CMG_INIT = 1;
+
+ /**
+ * Message type for {@link ClusterStateMessage}.
+ */
+ public static final short CLUSTER_STATE = 2;
+
+ /**
+ * Message type for {@link InitCompleteMessage}.
+ */
+ public static final short INIT_COMPLETE = 3;
+
+ /**
+ * Message type for {@link InitErrorMessage}.
+ */
+ public static final short INIT_ERROR = 4;
+
+ /**
+ * Message type for {@link CancelInitMessage}.
+ */
+ public static final short CANCEL_INIT = 5;
+}
diff --git a/modules/api/src/main/java/org/apache/ignite/configuration/schemas/runner/package-info.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/messages/InitCompleteMessage.java
similarity index 64%
copy from modules/api/src/main/java/org/apache/ignite/configuration/schemas/runner/package-info.java
copy to modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/messages/InitCompleteMessage.java
index 2fdef3d..d188cea 100644
--- a/modules/api/src/main/java/org/apache/ignite/configuration/schemas/runner/package-info.java
+++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/messages/InitCompleteMessage.java
@@ -1,6 +1,6 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
@@ -15,8 +15,14 @@
* limitations under the License.
*/
+package org.apache.ignite.internal.cluster.management.messages;
+
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.annotations.Transferable;
+
/**
- * Configuration schemas for Cluster node.
+ * Successful response for initializing a Raft group.
*/
-
-package org.apache.ignite.configuration.schemas.runner;
+@Transferable(CmgMessageGroup.INIT_COMPLETE)
+public interface InitCompleteMessage extends NetworkMessage {
+}
diff --git a/modules/api/src/main/java/org/apache/ignite/configuration/schemas/runner/package-info.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/messages/InitErrorMessage.java
similarity index 58%
rename from modules/api/src/main/java/org/apache/ignite/configuration/schemas/runner/package-info.java
rename to modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/messages/InitErrorMessage.java
index 2fdef3d..169338c 100644
--- a/modules/api/src/main/java/org/apache/ignite/configuration/schemas/runner/package-info.java
+++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/messages/InitErrorMessage.java
@@ -1,6 +1,6 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
@@ -15,8 +15,18 @@
* limitations under the License.
*/
+package org.apache.ignite.internal.cluster.management.messages;
+
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.annotations.Transferable;
+
/**
- * Configuration schemas for Cluster node.
+ * Message that represents an error condition that has occurred during cluster initialization.
*/
-
-package org.apache.ignite.configuration.schemas.runner;
+@Transferable(CmgMessageGroup.INIT_ERROR)
+public interface InitErrorMessage extends NetworkMessage {
+ /**
+ * Text representation of the occurred error.
+ */
+ String cause();
+}
diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/rest/InitCommand.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/rest/InitCommand.java
new file mode 100644
index 0000000..c841096
--- /dev/null
+++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/rest/InitCommand.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cluster.management.rest;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * REST command for initializing a cluster.
+ */
+public class InitCommand {
+ private final Collection<String> metaStorageNodes;
+
+ private final Collection<String> cmgNodes;
+
+ @JsonCreator
+ public InitCommand(
+ @JsonProperty("metaStorageNodes") Collection<String> metaStorageNodes,
+ @JsonProperty("cmgNodes") Collection<String> cmgNodes
+ ) {
+ this.metaStorageNodes = List.copyOf(metaStorageNodes);
+ this.cmgNodes = List.copyOf(cmgNodes);
+ }
+
+ @JsonProperty
+ public Collection<String> metaStorageNodes() {
+ return metaStorageNodes;
+ }
+
+ @JsonProperty
+ public Collection<String> cmgNodes() {
+ return cmgNodes;
+ }
+}
diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/rest/InitCommandHandler.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/rest/InitCommandHandler.java
new file mode 100644
index 0000000..2150e27
--- /dev/null
+++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/rest/InitCommandHandler.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cluster.management.rest;
+
+import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import org.apache.ignite.internal.cluster.management.ClusterInitializer;
+import org.apache.ignite.internal.rest.api.ErrorResult;
+import org.apache.ignite.internal.rest.api.RequestHandler;
+import org.apache.ignite.internal.rest.api.RestApiHttpRequest;
+import org.apache.ignite.internal.rest.api.RestApiHttpResponse;
+import org.apache.ignite.lang.IgniteLogger;
+
+/**
+ * REST handler for the {@link InitCommand}.
+ */
+public class InitCommandHandler implements RequestHandler {
+ private static final IgniteLogger log = IgniteLogger.forClass(InitCommandHandler.class);
+
+ private final ObjectMapper objectMapper = new ObjectMapper();
+
+ private final ClusterInitializer clusterInitializer;
+
+ public InitCommandHandler(ClusterInitializer clusterInitializer) {
+ this.clusterInitializer = clusterInitializer;
+ }
+
+ @Override
+ public CompletableFuture<RestApiHttpResponse> handle(RestApiHttpRequest request, RestApiHttpResponse response) {
+ try {
+ InitCommand command = readContent(request);
+
+ if (log.isInfoEnabled()) {
+ log.info(
+ "Received init command:\n\tMeta Storage nodes: {}\n\tCMG nodes: {}",
+ command.metaStorageNodes(),
+ command.cmgNodes()
+ );
+ }
+
+ return clusterInitializer.initCluster(command.metaStorageNodes(), command.cmgNodes())
+ .thenApply(v -> successResponse(response))
+ .exceptionally(e -> errorResponse(response, e));
+ } catch (Exception e) {
+ return CompletableFuture.completedFuture(errorResponse(response, e));
+ }
+ }
+
+ private InitCommand readContent(RestApiHttpRequest restApiHttpRequest) throws IOException {
+ ByteBuf content = restApiHttpRequest.request().content();
+
+ try (InputStream is = new ByteBufInputStream(content)) {
+ return objectMapper.readValue(is, InitCommand.class);
+ }
+ }
+
+ private static RestApiHttpResponse successResponse(RestApiHttpResponse response) {
+ log.info("Init command executed successfully");
+
+ return response;
+ }
+
+ private static RestApiHttpResponse errorResponse(RestApiHttpResponse response, Throwable e) {
+ if (e instanceof CompletionException) {
+ e = e.getCause();
+ }
+
+ log.error("Init command failure", e);
+
+ response.status(INTERNAL_SERVER_ERROR);
+ response.json(Map.of("error", new ErrorResult("APPLICATION_EXCEPTION", e.getMessage())));
+
+ return response;
+ }
+}
diff --git a/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/ClusterInitializerTest.java b/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/ClusterInitializerTest.java
new file mode 100644
index 0000000..c3319a9
--- /dev/null
+++ b/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/ClusterInitializerTest.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cluster.management;
+
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.isA;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.internal.cluster.management.messages.CancelInitMessage;
+import org.apache.ignite.internal.cluster.management.messages.CmgInitMessage;
+import org.apache.ignite.internal.cluster.management.messages.CmgMessagesFactory;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.MessagingService;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.TopologyService;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
+
+/**
+ * Tests for {@link ClusterInitializer}.
+ */
+@ExtendWith(MockitoExtension.class)
+@MockitoSettings(strictness = Strictness.LENIENT)
+public class ClusterInitializerTest {
+ @Mock
+ private MessagingService messagingService;
+
+ @Mock
+ private TopologyService topologyService;
+
+ private ClusterInitializer clusterInitializer;
+
+ private final CmgMessagesFactory msgFactory = new CmgMessagesFactory();
+
+ @BeforeEach
+ void setUp(@Mock ClusterService clusterService) {
+ when(clusterService.messagingService()).thenReturn(messagingService);
+ when(clusterService.topologyService()).thenReturn(topologyService);
+
+ clusterInitializer = new ClusterInitializer(clusterService);
+ }
+
+ /**
+ * Tests the happy-case scenario of cluster initialization.
+ */
+ @Test
+ void testNormalInit() {
+ ClusterNode metastorageNode = new ClusterNode("metastore", "metastore", new NetworkAddress("foo", 123));
+ ClusterNode cmgNode = new ClusterNode("cmg", "cmg", new NetworkAddress("bar", 456));
+
+ when(topologyService.getByConsistentId(metastorageNode.name())).thenReturn(metastorageNode);
+ when(topologyService.getByConsistentId(cmgNode.name())).thenReturn(cmgNode);
+ when(topologyService.allMembers()).thenReturn(List.of(metastorageNode, cmgNode));
+
+ when(messagingService.invoke(any(ClusterNode.class), any(CmgInitMessage.class), anyLong()))
+ .thenReturn(initCompleteMessage());
+
+ // check that leaders are different in case different node IDs are provided
+ CompletableFuture<Void> initFuture = clusterInitializer.initCluster(List.of(metastorageNode.name()), List.of(cmgNode.name()));
+
+ verify(messagingService).invoke(eq(cmgNode), any(CmgInitMessage.class), anyLong());
+ verify(messagingService, never()).invoke(eq(metastorageNode), any(CmgInitMessage.class), anyLong());
+
+ assertThat(initFuture, willBe(nullValue(Void.class)));
+ }
+
+ /**
+ * Tests the happy-case scenario of cluster initialization when only Meta Storage are provided.
+ */
+ @Test
+ void testNormalInitSingleNodeList() {
+ ClusterNode metastorageNode = new ClusterNode("metastore", "metastore", new NetworkAddress("foo", 123));
+ ClusterNode cmgNode = new ClusterNode("cmg", "cmg", new NetworkAddress("bar", 456));
+
+ when(topologyService.getByConsistentId(metastorageNode.name())).thenReturn(metastorageNode);
+ when(topologyService.getByConsistentId(cmgNode.name())).thenReturn(cmgNode);
+ when(topologyService.allMembers()).thenReturn(List.of(metastorageNode, cmgNode));
+
+ when(messagingService.invoke(any(ClusterNode.class), any(CmgInitMessage.class), anyLong()))
+ .thenReturn(initCompleteMessage());
+
+ CompletableFuture<Void> initFuture = clusterInitializer.initCluster(List.of(metastorageNode.name()), List.of());
+
+ verify(messagingService).invoke(eq(metastorageNode), any(CmgInitMessage.class), anyLong());
+ verify(messagingService, never()).invoke(eq(cmgNode), any(CmgInitMessage.class), anyLong());
+
+ assertThat(initFuture, willBe(nullValue(Void.class)));
+ }
+
+ /**
+ * Tests a situation when one of the nodes fail during initialization.
+ */
+ @Test
+ void testInitCancel() {
+ ClusterNode metastorageNode = new ClusterNode("metastore", "metastore", new NetworkAddress("foo", 123));
+ ClusterNode cmgNode = new ClusterNode("cmg", "cmg", new NetworkAddress("bar", 456));
+
+ when(topologyService.getByConsistentId(metastorageNode.name())).thenReturn(metastorageNode);
+ when(topologyService.getByConsistentId(cmgNode.name())).thenReturn(cmgNode);
+ when(topologyService.allMembers()).thenReturn(List.of(metastorageNode, cmgNode));
+
+ when(messagingService.invoke(eq(cmgNode), any(CmgInitMessage.class), anyLong()))
+ .thenAnswer(invocation -> {
+ NetworkMessage response = msgFactory.initErrorMessage().cause("foobar").build();
+
+ return CompletableFuture.completedFuture(response);
+ });
+
+ when(messagingService.send(any(ClusterNode.class), any(CancelInitMessage.class)))
+ .thenReturn(CompletableFuture.completedFuture(null));
+
+ CompletableFuture<Void> initFuture = clusterInitializer.initCluster(List.of(metastorageNode.name()), List.of(cmgNode.name()));
+
+ InitException e = assertFutureThrows(InitException.class, initFuture);
+
+ assertThat(e.getMessage(), containsString(String.format("Got error response from node \"%s\": foobar", cmgNode.name())));
+
+ verify(messagingService).send(eq(cmgNode), any(CancelInitMessage.class));
+ verify(messagingService, never()).send(eq(metastorageNode), any(CancelInitMessage.class));
+ }
+
+ private CompletableFuture<NetworkMessage> initCompleteMessage() {
+ NetworkMessage msg = msgFactory.initCompleteMessage().build();
+
+ return CompletableFuture.completedFuture(msg);
+ }
+
+ /**
+ * Tests that providing no nodes for the initialization throws an error.
+ */
+ @Test
+ void testEmptyInit() {
+ CompletableFuture<Void> initFuture = clusterInitializer.initCluster(List.of(), List.of());
+
+ assertFutureThrows(IllegalArgumentException.class, initFuture);
+ }
+
+ /**
+ * Tests that if some nodes are not present in the topology, an error is thrown.
+ */
+ @Test
+ void testUnresolvableNode() {
+ CompletableFuture<Void> initFuture = clusterInitializer.initCluster(List.of("foo"), List.of("bar"));
+
+ IllegalArgumentException e = assertFutureThrows(IllegalArgumentException.class, initFuture);
+
+ assertThat(e.getMessage(), containsString("Node \"foo\" is not present in the physical topology"));
+ }
+
+ private static <T extends Throwable> T assertFutureThrows(Class<T> expected, CompletableFuture<?> future) {
+ ExecutionException e = assertThrows(ExecutionException.class, () -> future.get(1, TimeUnit.SECONDS));
+
+ assertThat(e.getCause(), isA(expected));
+
+ return expected.cast(e.getCause());
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/testframework/BaseIgniteAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/testframework/BaseIgniteAbstractTest.java
index ddb2fed..66991ee 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/testframework/BaseIgniteAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/testframework/BaseIgniteAbstractTest.java
@@ -37,7 +37,7 @@ public abstract class BaseIgniteAbstractTest {
/** Logger. */
protected static IgniteLogger log;
- /** Tets start milliseconds. */
+ /** Test start time in milliseconds. */
private long testStartMs;
/* Init test env. */
@@ -49,14 +49,13 @@ public abstract class BaseIgniteAbstractTest {
/**
* Should be invoked before a test will start.
*
- * @param testInfo Test information oject.
+ * @param testInfo Test information object.
* @param workDir Work directory.
- * @throws Exception If failed.
*/
- protected void setupBase(TestInfo testInfo, Path workDir) throws Exception {
+ protected void setupBase(TestInfo testInfo, Path workDir) {
log.info(">>> Starting test: {}#{}, displayName: {}, workDir: {}",
- testInfo.getTestClass().map(Class::getSimpleName).orElseGet(() -> "<null>"),
- testInfo.getTestMethod().map(Method::getName).orElseGet(() -> "<null>"),
+ testInfo.getTestClass().map(Class::getSimpleName).orElse("<null>"),
+ testInfo.getTestMethod().map(Method::getName).orElse("<null>"),
testInfo.getDisplayName(),
workDir.toAbsolutePath());
@@ -66,13 +65,12 @@ public abstract class BaseIgniteAbstractTest {
/**
* Should be invoked after the test has finished.
*
- * @param testInfo Test information oject.
- * @throws Exception If failed.
+ * @param testInfo Test information object.
*/
- protected void tearDownBase(TestInfo testInfo) throws Exception {
+ protected void tearDownBase(TestInfo testInfo) {
log.info(">>> Stopping test: {}#{}, displayName: {}, cost: {}ms.",
- testInfo.getTestClass().map(Class::getSimpleName).orElseGet(() -> "<null>"),
- testInfo.getTestMethod().map(Method::getName).orElseGet(() -> "<null>"),
+ testInfo.getTestClass().map(Class::getSimpleName).orElse("<null>"),
+ testInfo.getTestMethod().map(Method::getName).orElse("<null>"),
testInfo.getDisplayName(), monotonicMs() - testStartMs);
}
diff --git a/modules/metastorage-client/src/main/java/org/apache/ignite/internal/metastorage/client/MetaStorageService.java b/modules/metastorage-client/src/main/java/org/apache/ignite/internal/metastorage/client/MetaStorageService.java
index 5b9af5c..f211a75 100644
--- a/modules/metastorage-client/src/main/java/org/apache/ignite/internal/metastorage/client/MetaStorageService.java
+++ b/modules/metastorage-client/src/main/java/org/apache/ignite/internal/metastorage/client/MetaStorageService.java
@@ -349,4 +349,3 @@ public interface MetaStorageService {
@NotNull
CompletableFuture<Void> closeCursors(@NotNull String nodeId);
}
-
diff --git a/modules/metastorage/pom.xml b/modules/metastorage/pom.xml
index aa3b64e..bfa1a35 100644
--- a/modules/metastorage/pom.xml
+++ b/modules/metastorage/pom.xml
@@ -40,6 +40,11 @@
<dependency>
<groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-cluster-management</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
<artifactId>ignite-network-api</artifactId>
</dependency>
diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
index aad0360..7e18783 100644
--- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
+++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
@@ -19,22 +19,18 @@ package org.apache.ignite.internal.metastorage;
import static org.apache.ignite.internal.util.ByteUtils.bytesToLong;
import static org.apache.ignite.internal.util.ByteUtils.longToBytes;
+import static org.apache.ignite.network.util.ClusterServiceUtils.resolveNodes;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
-import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.Predicate;
-import java.util.stream.Collectors;
-import org.apache.ignite.configuration.schemas.runner.NodeConfiguration;
-import org.apache.ignite.internal.configuration.ConfigurationManager;
+import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.metastorage.client.CompactedException;
import org.apache.ignite.internal.metastorage.client.Condition;
@@ -58,9 +54,7 @@ import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.vault.VaultManager;
import org.apache.ignite.lang.ByteArray;
import org.apache.ignite.lang.IgniteBiTuple;
-import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.lang.IgniteInternalException;
-import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.lang.NodeStoppingException;
import org.apache.ignite.network.ClusterNode;
@@ -80,12 +74,7 @@ import org.jetbrains.annotations.Nullable;
* <li>Providing corresponding meta storage service proxy interface</li>
* </ul>
*/
-// TODO: IGNITE-14586 Remove @SuppressWarnings when implementation provided.
-@SuppressWarnings("unused")
public class MetaStorageManager implements IgniteComponent {
- /** Logger. */
- private static final IgniteLogger LOG = IgniteLogger.forClass(MetaStorageManager.class);
-
/** Meta storage raft group name. */
private static final String METASTORAGE_RAFT_GROUP_NAME = "metastorage_raft_group";
@@ -95,23 +84,16 @@ public class MetaStorageManager implements IgniteComponent {
*/
public static final ByteArray APPLIED_REV = ByteArray.fromString("applied_revision");
+ private final ClusterService clusterService;
+
/** Vault manager in order to commit processed watches with corresponding applied revision. */
private final VaultManager vaultMgr;
- /** Configuration manager that handles local configuration. */
- private final ConfigurationManager locCfgMgr;
-
- /** Cluster network service that is used in order to handle cluster init message. */
- private final ClusterService clusterNetSvc;
-
/** Raft manager that is used for metastorage raft group handling. */
private final Loza raftMgr;
/** Meta storage service. */
- private volatile CompletableFuture<MetaStorageService> metaStorageSvcFut;
-
- /** Raft group service. */
- private volatile CompletableFuture<RaftGroupService> raftGroupServiceFut;
+ private final CompletableFuture<MetaStorageService> metaStorageSvcFut;
/**
* Aggregator of multiple watches to deploy them as one batch.
@@ -122,19 +104,11 @@ public class MetaStorageManager implements IgniteComponent {
/**
* Future which will be completed with {@link IgniteUuid}, when aggregated watch will be successfully deployed. Can be resolved to
- * {@link Optional#empty()} if no watch deployed at the moment.
- */
- private CompletableFuture<Optional<IgniteUuid>> deployFut = new CompletableFuture<>();
-
- /**
- * If true - all new watches will be deployed immediately.
+ * {@code null} if no watch deployed at the moment.
*
- * <p>If false - all new watches will be aggregated to one batch for further deploy by {@link MetaStorageManager#deployWatches()}
+ * <p>Multi-threaded access is guarded by {@code this}.
*/
- private boolean deployed;
-
- /** Flag indicates if meta storage nodes were set on start. */
- private boolean metaStorageNodesOnStart;
+ private CompletableFuture<IgniteUuid> deployFut = new CompletableFuture<>();
/** Actual storage for the Metastorage. */
private final KeyValueStorage storage;
@@ -142,154 +116,134 @@ public class MetaStorageManager implements IgniteComponent {
/** Busy lock to stop synchronously. */
private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+ /**
+ * If true - all new watches will be deployed immediately.
+ *
+ * <p>If false - all new watches will be aggregated to one batch for further deploy by {@link MetaStorageManager#deployWatches()}.
+ *
+ * <p>Multi-threaded access is guarded by {@code this}.
+ */
+ private boolean areWatchesDeployed = false;
+
+ /**
+ * Flag that indicates that the component has been initialized.
+ */
+ private volatile boolean isInitialized = false;
+
/** Prevents double stopping the component. */
- AtomicBoolean stopGuard = new AtomicBoolean();
+ private final AtomicBoolean isStopped = new AtomicBoolean();
/**
* The constructor.
*
- * @param vaultMgr Vault manager.
- * @param locCfgMgr Local configuration manager.
- * @param clusterNetSvc Cluster network service.
- * @param raftMgr Raft manager.
- * @param storage Storage. This component owns this resource and will manage its lifecycle.
+ * @param vaultMgr Vault manager.
+ * @param clusterService Cluster network service.
+ * @param raftMgr Raft manager.
+ * @param storage Storage. This component owns this resource and will manage its lifecycle.
*/
public MetaStorageManager(
VaultManager vaultMgr,
- ConfigurationManager locCfgMgr,
- ClusterService clusterNetSvc,
+ ClusterService clusterService,
+ ClusterManagementGroupManager cmgManager,
Loza raftMgr,
KeyValueStorage storage
) {
this.vaultMgr = vaultMgr;
- this.locCfgMgr = locCfgMgr;
- this.clusterNetSvc = clusterNetSvc;
+ this.clusterService = clusterService;
this.raftMgr = raftMgr;
this.storage = storage;
- }
- /** {@inheritDoc} */
- @Override
- public void start() {
- String[] metastorageNodes = this.locCfgMgr.configurationRegistry().getConfiguration(NodeConfiguration.KEY)
- .metastorageNodes().value();
+ this.metaStorageSvcFut = cmgManager.metaStorageNodes()
+ // use default executor to avoid blocking CMG manager threads
+ .thenComposeAsync(metaStorageNodes -> {
+ if (!busyLock.enterBusy()) {
+ return CompletableFuture.failedFuture(new NodeStoppingException());
+ }
- Predicate<ClusterNode> metaStorageNodesContainsLocPred =
- clusterNode -> Arrays.asList(metastorageNodes).contains(clusterNode.name());
+ try {
+ isInitialized = true;
- if (metastorageNodes.length > 0) {
- metaStorageNodesOnStart = true;
+ return initializeMetaStorage(metaStorageNodes);
+ } finally {
+ busyLock.leaveBusy();
+ }
+ });
+ }
- List<ClusterNode> metaStorageMembers = clusterNetSvc.topologyService().allMembers().stream()
- .filter(metaStorageNodesContainsLocPred)
- .collect(Collectors.toList());
+ private CompletableFuture<MetaStorageService> initializeMetaStorage(Collection<String> metaStorageNodes) {
+ List<ClusterNode> metastorageNodes = resolveNodes(clusterService, metaStorageNodes);
- // TODO: This is temporary solution for providing human-readable error when you try to start single-node cluster
- // without hosting metastorage, this will be rewritten in init phase https://issues.apache.org/jira/browse/IGNITE-15114
- if (metaStorageMembers.isEmpty()) {
- throw new IgniteException(
- "Cannot start meta storage manager because there is no node in the cluster that hosts meta storage.");
- }
+ ClusterNode thisNode = clusterService.topologyService().localMember();
- // TODO: This is temporary solution. We need to prohibit starting several metastorage nodes
- // as far as we do not have mechanism of changing raft peers when new metastorage node is joining to cluster.
- // This will be rewritten in init phase https://issues.apache.org/jira/browse/IGNITE-15114
- if (metastorageNodes.length > 1) {
- throw new IgniteException(
- "Cannot start meta storage manager because it is not allowed to start several metastorage nodes.");
- }
+ if (metastorageNodes.contains(thisNode)) {
+ clusterService.topologyService().addEventHandler(new TopologyEventHandler() {
+ @Override
+ public void onAppeared(ClusterNode member) {
+ // No-op.
+ }
- storage.start();
+ @Override
+ public void onDisappeared(ClusterNode member) {
+ metaStorageSvcFut.thenAccept(svc -> svc.closeCursors(member.id()));
+ }
+ });
+ }
- try {
- raftGroupServiceFut = raftMgr.prepareRaftGroup(
- METASTORAGE_RAFT_GROUP_NAME,
- metaStorageMembers,
- () -> new MetaStorageListener(storage)
- );
- } catch (NodeStoppingException e) {
- throw new AssertionError("Loza was stopped before Meta Storage manager", e);
- }
+ storage.start();
- this.metaStorageSvcFut = raftGroupServiceFut.thenApply(service ->
- new MetaStorageServiceImpl(service, clusterNetSvc.topologyService().localMember().id())
+ try {
+ CompletableFuture<RaftGroupService> raftServiceFuture = raftMgr.prepareRaftGroup(
+ METASTORAGE_RAFT_GROUP_NAME,
+ metastorageNodes,
+ () -> new MetaStorageListener(storage)
);
- if (hasMetastorageLocally(locCfgMgr)) {
- clusterNetSvc.topologyService().addEventHandler(new TopologyEventHandler() {
- @Override
- public void onAppeared(ClusterNode member) {
- // No-op.
- }
-
- @Override
- public void onDisappeared(ClusterNode member) {
- metaStorageSvcFut.thenCompose(svc -> svc.closeCursors(member.id()));
- }
- });
- }
- } else {
- this.metaStorageSvcFut = new CompletableFuture<>();
+ return raftServiceFuture.thenApply(service -> new MetaStorageServiceImpl(service, thisNode.id()));
+ } catch (NodeStoppingException e) {
+ return CompletableFuture.failedFuture(e);
}
+ }
- // TODO: IGNITE-15114 Cluster initialization flow. Here we should complete metaStorageServiceFuture.
- // clusterNetSvc.messagingService().addMessageHandler((message, senderAddr, correlationId) -> {});
+ /** {@inheritDoc} */
+ @Override
+ public void start() {
+ // NO-OP
}
/** {@inheritDoc} */
@Override
- public void stop() {
- if (!stopGuard.compareAndSet(false, true)) {
+ public void stop() throws Exception {
+ if (!isStopped.compareAndSet(false, true)) {
return;
}
busyLock.block();
- Optional<IgniteUuid> watchId;
-
- try {
- // If deployed future is not done, that means that stop was called in the middle of
- // IgniteImpl.start, before deployWatches, or before init phase.
- // It is correct to check completeness of the future because the method calls are guarded by busy lock.
- // TODO: add busy lock for init method https://issues.apache.org/jira/browse/IGNITE-15114
- if (deployFut.isDone()) {
- watchId = deployFut.get();
-
- try {
- if (watchId.isPresent()) {
- metaStorageSvcFut.get().stopWatch(watchId.get());
- }
- } catch (InterruptedException | ExecutionException e) {
- LOG.error("Failed to get meta storage service.");
-
- throw new IgniteInternalException(e);
- }
- }
- } catch (InterruptedException | ExecutionException e) {
- LOG.error("Failed to get watch.");
-
- throw new IgniteInternalException(e);
+ if (!isInitialized) {
+ // Stop command was called before the init command was received
+ return;
}
- try {
- if (raftGroupServiceFut != null) {
- raftGroupServiceFut.get().shutdown();
-
- raftMgr.stopRaftGroup(METASTORAGE_RAFT_GROUP_NAME);
- }
- } catch (InterruptedException | ExecutionException e) {
- LOG.error("Failed to get meta storage raft group service.");
-
- throw new IgniteInternalException(e);
- } catch (NodeStoppingException e) {
- throw new AssertionError("Loza was stopped before Meta Storage manager", e);
+ synchronized (this) {
+ IgniteUtils.closeAll(
+ this::stopDeployedWatches,
+ () -> raftMgr.stopRaftGroup(METASTORAGE_RAFT_GROUP_NAME),
+ storage
+ );
}
+ }
- try {
- storage.close();
- } catch (Exception e) {
- throw new IgniteInternalException("Exception when stopping the storage", e);
+ private void stopDeployedWatches() throws Exception {
+ if (!areWatchesDeployed) {
+ return;
}
+
+ deployFut
+ .thenCompose(watchId -> watchId == null
+ ? CompletableFuture.completedFuture(null)
+ : metaStorageSvcFut.thenAccept(service -> service.stopWatch(watchId))
+ )
+ .get();
}
/**
@@ -301,25 +255,17 @@ public class MetaStorageManager implements IgniteComponent {
}
try {
- var watch = watchAggregator.watch(
- appliedRevision() + 1,
- this::storeEntries
- );
-
- if (watch.isEmpty()) {
- deployFut.complete(Optional.empty());
- } else {
- CompletableFuture<Void> fut =
- dispatchAppropriateMetaStorageWatch(watch.get()).thenAccept(id -> deployFut.complete(Optional.of(id)));
+ CompletableFuture<IgniteUuid> deployFut = this.deployFut;
- if (metaStorageNodesOnStart) {
- fut.join();
+ updateAggregatedWatch().whenComplete((id, ex) -> {
+ if (ex == null) {
+ deployFut.complete(id);
} else {
- // TODO: need to wait for this future in init phase https://issues.apache.org/jira/browse/IGNITE-15114
+ deployFut.completeExceptionally(ex);
}
- }
+ });
- deployed = true;
+ areWatchesDeployed = true;
} finally {
busyLock.leaveBusy();
}
@@ -328,20 +274,19 @@ public class MetaStorageManager implements IgniteComponent {
/**
* Register watch listener by key.
*
- * @param key The target key.
+ * @param key The target key.
* @param lsnr Listener which will be notified for each update.
* @return Subscription identifier. Could be used in {@link #unregisterWatch} method in order to cancel subscription
*/
- public synchronized CompletableFuture<Long> registerWatch(
- @Nullable ByteArray key,
- @NotNull WatchListener lsnr
- ) {
+ public synchronized CompletableFuture<Long> registerWatch(ByteArray key, WatchListener lsnr) {
if (!busyLock.enterBusy()) {
return CompletableFuture.failedFuture(new NodeStoppingException());
}
try {
- return waitForReDeploy(watchAggregator.add(key, lsnr));
+ long watchId = watchAggregator.add(key, lsnr);
+
+ return updateWatches().thenApply(uuid -> watchId);
} finally {
busyLock.leaveBusy();
}
@@ -354,16 +299,15 @@ public class MetaStorageManager implements IgniteComponent {
* @param lsnr Listener which will be notified for each update.
* @return Subscription identifier. Could be used in {@link #unregisterWatch} method in order to cancel subscription
*/
- public synchronized CompletableFuture<Long> registerWatch(
- @NotNull Collection<ByteArray> keys,
- @NotNull WatchListener lsnr
- ) {
+ public synchronized CompletableFuture<Long> registerWatch(Collection<ByteArray> keys, WatchListener lsnr) {
if (!busyLock.enterBusy()) {
return CompletableFuture.failedFuture(new NodeStoppingException());
}
try {
- return waitForReDeploy(watchAggregator.add(keys, lsnr));
+ long watchId = watchAggregator.add(keys, lsnr);
+
+ return updateWatches().thenApply(uuid -> watchId);
} finally {
busyLock.leaveBusy();
}
@@ -373,7 +317,7 @@ public class MetaStorageManager implements IgniteComponent {
* Register watch listener by range of keys.
*
* @param from Start key of range.
- * @param to End key of range (exclusively).
+ * @param to End key of range (exclusively).
* @param lsnr Listener which will be notified for each update.
* @return future with id of registered watch.
*/
@@ -387,7 +331,9 @@ public class MetaStorageManager implements IgniteComponent {
}
try {
- return waitForReDeploy(watchAggregator.add(from, to, lsnr));
+ long watchId = watchAggregator.add(from, to, lsnr);
+
+ return updateWatches().thenApply(uuid -> watchId);
} finally {
busyLock.leaveBusy();
}
@@ -396,20 +342,19 @@ public class MetaStorageManager implements IgniteComponent {
/**
* Register watch listener by key prefix.
*
- * @param key Prefix to listen.
+ * @param key Prefix to listen.
* @param lsnr Listener which will be notified for each update.
* @return Subscription identifier. Could be used in {@link #unregisterWatch} method in order to cancel subscription
*/
- public synchronized CompletableFuture<Long> registerWatchByPrefix(
- @Nullable ByteArray key,
- @NotNull WatchListener lsnr
- ) {
+ public synchronized CompletableFuture<Long> registerWatchByPrefix(ByteArray key, WatchListener lsnr) {
if (!busyLock.enterBusy()) {
return CompletableFuture.failedFuture(new NodeStoppingException());
}
try {
- return waitForReDeploy(watchAggregator.addPrefix(key, lsnr));
+ long watchId = watchAggregator.addPrefix(key, lsnr);
+
+ return updateWatches().thenApply(uuid -> watchId);
} finally {
busyLock.leaveBusy();
}
@@ -428,13 +373,8 @@ public class MetaStorageManager implements IgniteComponent {
try {
watchAggregator.cancel(id);
- if (deployed) {
- return updateWatches().thenAccept(v -> {
- });
- } else {
- return deployFut.thenAccept(uuid -> {
- });
- }
+
+ return updateWatches().thenApply(uuid -> null);
} finally {
busyLock.leaveBusy();
}
@@ -689,7 +629,7 @@ public class MetaStorageManager implements IgniteComponent {
/**
* Invoke, which supports nested conditional statements.
*
- * @see MetaStorageService#invoke(org.apache.ignite.internal.metastorage.client.If)
+ * @see MetaStorageService#invoke(If)
*/
public @NotNull CompletableFuture<StatementResult> invoke(@NotNull If iif) {
if (!busyLock.enterBusy()) {
@@ -704,8 +644,8 @@ public class MetaStorageManager implements IgniteComponent {
}
/**
- * Retrieves entries for the given key range in lexicographic order.
- * Entries will be filtered out by upper bound of given revision number.
+ * Retrieves entries for the given key range in lexicographic order. Entries will be filtered out by upper bound of given revision
+ * number.
*
* @see MetaStorageService#range(ByteArray, ByteArray, long)
*/
@@ -716,9 +656,7 @@ public class MetaStorageManager implements IgniteComponent {
}
try {
- return new CursorWrapper<>(
- metaStorageSvcFut.thenApply(svc -> svc.range(keyFrom, keyTo, revUpperBound))
- );
+ return new CursorWrapper<>(metaStorageSvcFut.thenApply(svc -> svc.range(keyFrom, keyTo, revUpperBound)));
} finally {
busyLock.leaveBusy();
}
@@ -735,9 +673,7 @@ public class MetaStorageManager implements IgniteComponent {
}
try {
- return new CursorWrapper<>(
- metaStorageSvcFut.thenApply(svc -> svc.range(keyFrom, keyTo))
- );
+ return new CursorWrapper<>(metaStorageSvcFut.thenApply(svc -> svc.range(keyFrom, keyTo)));
} finally {
busyLock.leaveBusy();
}
@@ -748,10 +684,10 @@ public class MetaStorageManager implements IgniteComponent {
* upper bound. Applied revision is a revision of the last successful vault update.
*
* @param keyFrom Start key of range (inclusive). Couldn't be {@code null}.
- * @param keyTo End key of range (exclusive). Could be {@code null}.
+ * @param keyTo End key of range (exclusive). Could be {@code null}.
* @return Cursor built upon entries corresponding to the given range and applied revision.
* @throws OperationTimeoutException If the operation is timed out.
- * @throws CompactedException If the desired revisions are removed from the storage due to a compaction.
+ * @throws CompactedException If the desired revisions are removed from the storage due to a compaction.
* @see ByteArray
* @see Entry
*/
@@ -762,9 +698,7 @@ public class MetaStorageManager implements IgniteComponent {
}
try {
- return new CursorWrapper<>(
- metaStorageSvcFut.thenApply(svc -> svc.range(keyFrom, keyTo, appliedRevision()))
- );
+ return new CursorWrapper<>(metaStorageSvcFut.thenApply(svc -> svc.range(keyFrom, keyTo, appliedRevision())));
} finally {
busyLock.leaveBusy();
}
@@ -779,7 +713,7 @@ public class MetaStorageManager implements IgniteComponent {
* @param keyPrefix Prefix of the key to retrieve the entries. Couldn't be {@code null}.
* @return Cursor built upon entries corresponding to the given range and applied revision.
* @throws OperationTimeoutException If the operation is timed out.
- * @throws CompactedException If the desired revisions are removed from the storage due to a compaction.
+ * @throws CompactedException If the desired revisions are removed from the storage due to a compaction.
* @see ByteArray
* @see Entry
*/
@@ -806,7 +740,7 @@ public class MetaStorageManager implements IgniteComponent {
* @param keyPrefix Prefix of the key to retrieve the entries. Couldn't be {@code null}.
* @return Cursor built upon entries corresponding to the given range and revision.
* @throws OperationTimeoutException If the operation is timed out.
- * @throws CompactedException If the desired revisions are removed from the storage due to a compaction.
+ * @throws CompactedException If the desired revisions are removed from the storage due to a compaction.
* @see ByteArray
* @see Entry
*/
@@ -820,11 +754,11 @@ public class MetaStorageManager implements IgniteComponent {
*
* <p>Prefix query is a synonym of the range query {@code range(prefixKey, nextKey(prefixKey))}.
*
- * @param keyPrefix Prefix of the key to retrieve the entries. Couldn't be {@code null}.
+ * @param keyPrefix Prefix of the key to retrieve the entries. Couldn't be {@code null}.
* @param revUpperBound The upper bound for entry revision. {@code -1} means latest revision.
* @return Cursor built upon entries corresponding to the given range and revision.
* @throws OperationTimeoutException If the operation is timed out.
- * @throws CompactedException If the desired revisions are removed from the storage due to a compaction.
+ * @throws CompactedException If the desired revisions are removed from the storage due to a compaction.
* @see ByteArray
* @see Entry
*/
@@ -872,30 +806,35 @@ public class MetaStorageManager implements IgniteComponent {
/**
* Stop current batch of consolidated watches and register new one from current {@link WatchAggregator}.
*
+ * <p>This method MUST always be called under a {@code synchronized} block.
+ *
* @return Ignite UUID of new consolidated watch.
*/
- private CompletableFuture<Optional<IgniteUuid>> updateWatches() {
- long revision = appliedRevision() + 1;
+ private CompletableFuture<IgniteUuid> updateWatches() {
+ if (!areWatchesDeployed) {
+ return deployFut;
+ }
deployFut = deployFut
- .thenCompose(idOpt ->
- idOpt
- .map(id -> metaStorageSvcFut.thenCompose(svc -> svc.stopWatch(id)))
- .orElseGet(() -> CompletableFuture.completedFuture(null))
+ .thenCompose(id -> id == null
+ ? CompletableFuture.completedFuture(null)
+ : metaStorageSvcFut.thenCompose(svc -> svc.stopWatch(id))
)
- .thenCompose(r ->
- watchAggregator.watch(revision, this::storeEntries)
- .map(watch -> dispatchAppropriateMetaStorageWatch(watch).thenApply(Optional::of))
- .orElseGet(() -> CompletableFuture.completedFuture(Optional.empty()))
- );
+ .thenCompose(r -> updateAggregatedWatch());
return deployFut;
}
+ private CompletableFuture<IgniteUuid> updateAggregatedWatch() {
+ return watchAggregator.watch(appliedRevision() + 1, this::storeEntries)
+ .map(this::dispatchAppropriateMetaStorageWatch)
+ .orElseGet(() -> CompletableFuture.completedFuture(null));
+ }
+
/**
* Store entries with appropriate associated revision.
*
- * @param entries to store.
+ * @param entries to store.
* @param revision associated revision.
*/
private void storeEntries(Collection<IgniteBiTuple<ByteArray, byte[]>> entries, long revision) {
@@ -919,56 +858,6 @@ public class MetaStorageManager implements IgniteComponent {
vaultMgr.putAll(batch).join();
}
- /**
- * Returns future, which will be completed after redeploy finished.
- *
- * @param id Id of watch to redeploy.
- */
- private CompletableFuture<Long> waitForReDeploy(long id) {
- if (deployed) {
- return updateWatches().thenApply(uid -> id);
- } else {
- return deployFut.thenApply(uid -> id);
- }
- }
-
- /**
- * Checks whether the given node hosts meta storage.
- *
- * @param nodeName Node unique name.
- * @param metastorageMembers Meta storage members names.
- * @return {@code true} if the node has meta storage, {@code false} otherwise.
- */
- public static boolean hasMetastorage(String nodeName, String[] metastorageMembers) {
- boolean isNodeHasMetasorage = false;
-
- for (String name : metastorageMembers) {
- if (name.equals(nodeName)) {
- isNodeHasMetasorage = true;
-
- break;
- }
- }
-
- return isNodeHasMetasorage;
- }
-
- /**
- * Checks whether the local node hosts meta storage.
- *
- * @param configurationMgr Configuration manager.
- * @return {@code true} if the node has meta storage, {@code false} otherwise.
- */
- public boolean hasMetastorageLocally(ConfigurationManager configurationMgr) {
- String[] metastorageMembers = configurationMgr
- .configurationRegistry()
- .getConfiguration(NodeConfiguration.KEY)
- .metastorageNodes()
- .value();
-
- return hasMetastorage(vaultMgr.name().join(), metastorageMembers);
- }
-
// TODO: IGNITE-14691 Temporally solution that should be removed after implementing reactive watches.
/** Cursor wrapper. */
@@ -1104,23 +993,4 @@ public class MetaStorageManager implements IgniteComponent {
throw new UnsupportedOperationException("Unsupported type of criterion");
}
}
-
- /**
- * Return metastorage nodes.
- *
- * <p>This code will be deleted after node init phase is developed. https://issues.apache.org/jira/browse/IGNITE-15114
- */
- private List<ClusterNode> metastorageNodes() {
- String[] metastorageNodes = this.locCfgMgr.configurationRegistry().getConfiguration(NodeConfiguration.KEY)
- .metastorageNodes().value();
-
- Predicate<ClusterNode> metaStorageNodesContainsLocPred =
- clusterNode -> Arrays.asList(metastorageNodes).contains(clusterNode.name());
-
- List<ClusterNode> metaStorageMembers = clusterNetSvc.topologyService().allMembers().stream()
- .filter(metaStorageNodesContainsLocPred)
- .collect(Collectors.toList());
-
- return metaStorageMembers;
- }
}
diff --git a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/MessageDeserializerGenerator.java b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/MessageDeserializerGenerator.java
index 8ea3d12..bb850e0 100644
--- a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/MessageDeserializerGenerator.java
+++ b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/MessageDeserializerGenerator.java
@@ -121,28 +121,30 @@ public class MessageDeserializerGenerator {
.endControlFlow()
.addCode("\n");
- method.beginControlFlow("switch (reader.state())");
-
- for (int i = 0; i < getters.size(); ++i) {
- method
- .beginControlFlow("case $L:", i)
- .addStatement(readMessageCodeBlock(getters.get(i), msgField))
- .addCode("\n")
- .addCode(CodeBlock.builder()
- .beginControlFlow("if (!reader.isLastRead())")
- .addStatement("return false")
- .endControlFlow()
- .build()
- )
- .addCode("\n")
- .addStatement("reader.incrementState()")
- .endControlFlow()
- .addComment("Falls through");
+ if (!getters.isEmpty()) {
+ method.beginControlFlow("switch (reader.state())");
+
+ for (int i = 0; i < getters.size(); ++i) {
+ method
+ .beginControlFlow("case $L:", i)
+ .addStatement(readMessageCodeBlock(getters.get(i), msgField))
+ .addCode("\n")
+ .addCode(CodeBlock.builder()
+ .beginControlFlow("if (!reader.isLastRead())")
+ .addStatement("return false")
+ .endControlFlow()
+ .build()
+ )
+ .addCode("\n")
+ .addStatement("reader.incrementState()")
+ .endControlFlow()
+ .addComment("Falls through");
+ }
+
+ method.endControlFlow().addCode("\n");
}
- method.endControlFlow();
-
- method.addCode("\n").addStatement("return reader.afterMessageRead($T.class)", message.className());
+ method.addStatement("return reader.afterMessageRead($T.class)", message.className());
return method.build();
}
diff --git a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/MessageSerializerGenerator.java b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/MessageSerializerGenerator.java
index 1926e8e..781353e 100644
--- a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/MessageSerializerGenerator.java
+++ b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/MessageSerializerGenerator.java
@@ -98,21 +98,23 @@ public class MessageSerializerGenerator {
.endControlFlow()
.addCode("\n");
- method.beginControlFlow("switch (writer.state())");
-
- for (int i = 0; i < getters.size(); ++i) {
- method
- .beginControlFlow("case $L:", i)
- .addCode(writeMessageCodeBlock(getters.get(i)))
- .addCode("\n")
- .addStatement("writer.incrementState()")
- .endControlFlow()
- .addComment("Falls through");
+ if (!getters.isEmpty()) {
+ method.beginControlFlow("switch (writer.state())");
+
+ for (int i = 0; i < getters.size(); ++i) {
+ method
+ .beginControlFlow("case $L:", i)
+ .addCode(writeMessageCodeBlock(getters.get(i)))
+ .addCode("\n")
+ .addStatement("writer.incrementState()")
+ .endControlFlow()
+ .addComment("Falls through");
+ }
+
+ method.endControlFlow().addCode("\n");
}
- method.endControlFlow();
-
- method.addCode("\n").addStatement("return true");
+ method.addStatement("return true");
return method.build();
}
diff --git a/modules/network-api/src/main/java/org/apache/ignite/network/util/ClusterServiceUtils.java b/modules/network-api/src/main/java/org/apache/ignite/network/util/ClusterServiceUtils.java
new file mode 100644
index 0000000..da1500a
--- /dev/null
+++ b/modules/network-api/src/main/java/org/apache/ignite/network/util/ClusterServiceUtils.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.util;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.ClusterService;
+
+/**
+ * Various utilities related to {@link ClusterService}.
+ */
+public class ClusterServiceUtils {
+ private ClusterServiceUtils() {
+ }
+
+ /**
+ * Resolves given node names (a.k.a. consistent IDs) into {@link ClusterNode}s.
+ *
+ * @param consistentIds consistent IDs.
+ * @return list of resolved {@code ClusterNode}s.
+ * @throws IllegalArgumentException if any of the given nodes are not present in the physical topology.
+ */
+ public static List<ClusterNode> resolveNodes(ClusterService clusterService, Collection<String> consistentIds) {
+ return consistentIds.stream()
+ .map(consistentId -> {
+ ClusterNode node = clusterService.topologyService().getByConsistentId(consistentId);
+
+ if (node == null) {
+ throw new IllegalArgumentException(String.format(
+ "Node \"%s\" is not present in the physical topology", consistentId
+ ));
+ }
+
+ return node;
+ })
+ .collect(Collectors.toList());
+ }
+}
diff --git a/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/Routes.java b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/Routes.java
index 16a1b9b..2dc53f3 100644
--- a/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/Routes.java
+++ b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/Routes.java
@@ -80,4 +80,17 @@ public interface Routes {
addRoute(new Route(route, HttpMethod.PATCH, acceptType, hnd));
return this;
}
+
+ /**
+ * Adds a POST handler.
+ *
+ * @param route Route.
+ * @param acceptType Accept type.
+ * @param hnd Actual handler of the request.
+ * @return Router
+ */
+ default Routes post(String route, String acceptType, RequestHandler hnd) {
+ addRoute(new Route(route, HttpMethod.POST, acceptType, hnd));
+ return this;
+ }
}
diff --git a/modules/rest/src/main/java/org/apache/ignite/internal/rest/RestComponent.java b/modules/rest/src/main/java/org/apache/ignite/internal/rest/RestComponent.java
index 247061c..f660e03 100644
--- a/modules/rest/src/main/java/org/apache/ignite/internal/rest/RestComponent.java
+++ b/modules/rest/src/main/java/org/apache/ignite/internal/rest/RestComponent.java
@@ -23,6 +23,7 @@ import io.netty.channel.ChannelFuture;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import java.net.BindException;
+import java.net.InetSocketAddress;
import java.util.function.Consumer;
import org.apache.ignite.configuration.schemas.rest.RestConfiguration;
import org.apache.ignite.configuration.schemas.rest.RestView;
@@ -35,6 +36,7 @@ import org.apache.ignite.internal.rest.netty.RestApiInitializer;
import org.apache.ignite.internal.rest.routes.Router;
import org.apache.ignite.internal.rest.routes.SimpleRouter;
import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.network.NettyBootstrapFactory;
@@ -146,4 +148,20 @@ public class RestComponent implements RestHandlersRegister, IgniteComponent {
channel = null;
}
}
+
+ /**
+ * Returns the local address that the REST endpoint is bound to.
+ *
+ * @return local REST address.
+ * @throws IgniteInternalException if the component has not been started yet.
+ */
+ public InetSocketAddress localAddress() {
+ Channel channel = this.channel;
+
+ if (channel == null) {
+ throw new IgniteInternalException("RestComponent has not been started");
+ }
+
+ return (InetSocketAddress) channel.localAddress();
+ }
}
diff --git a/modules/runner/pom.xml b/modules/runner/pom.xml
index a5aa970..48899a8 100644
--- a/modules/runner/pom.xml
+++ b/modules/runner/pom.xml
@@ -41,7 +41,6 @@
<dependency>
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-rest</artifactId>
- <scope>compile</scope>
</dependency>
<dependency>
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java
index de684e6..efb0f7c 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java
@@ -17,10 +17,8 @@
package org.apache.ignite.internal.configuration;
-import static java.util.stream.Collectors.joining;
import static java.util.stream.Collectors.toUnmodifiableList;
import static org.apache.ignite.internal.configuration.util.ConfigurationUtil.directProxy;
-import static org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -28,6 +26,7 @@ import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
import java.nio.file.Path;
import java.util.List;
@@ -39,14 +38,14 @@ import org.apache.ignite.configuration.ConfigurationValue;
import org.apache.ignite.configuration.annotation.ConfigurationRoot;
import org.apache.ignite.configuration.annotation.ConfigurationType;
import org.apache.ignite.configuration.annotation.Value;
-import org.apache.ignite.configuration.schemas.runner.NodeConfiguration;
+import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
import org.apache.ignite.internal.configuration.storage.ConfigurationStorageListener;
import org.apache.ignite.internal.configuration.storage.DistributedConfigurationStorage;
-import org.apache.ignite.internal.configuration.storage.LocalConfigurationStorage;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
import org.apache.ignite.internal.raft.Loza;
+import org.apache.ignite.internal.rest.RestComponent;
import org.apache.ignite.internal.testframework.WorkDirectory;
import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
import org.apache.ignite.internal.util.IgniteUtils;
@@ -80,15 +79,13 @@ public class ItDistributedConfigurationPropertiesTest {
* An emulation of an Ignite node, that only contains components necessary for tests.
*/
private static class Node {
- private final List<String> metaStorageNodes;
-
private final VaultManager vaultManager;
private final ClusterService clusterService;
private final Loza raftManager;
- private final ConfigurationManager cfgManager;
+ private final ClusterManagementGroupManager cmgManager;
private final MetaStorageManager metaStorageManager;
@@ -104,11 +101,8 @@ public class ItDistributedConfigurationPropertiesTest {
TestInfo testInfo,
Path workDir,
NetworkAddress addr,
- List<NetworkAddress> memberAddrs,
- List<String> metaStorageNodes
+ List<NetworkAddress> memberAddrs
) {
- this.metaStorageNodes = metaStorageNodes;
-
vaultManager = new VaultManager(new InMemoryVaultService());
clusterService = ClusterServiceTestUtils.clusterService(
@@ -120,18 +114,12 @@ public class ItDistributedConfigurationPropertiesTest {
raftManager = new Loza(clusterService, workDir);
- cfgManager = new ConfigurationManager(
- List.of(NodeConfiguration.KEY),
- Map.of(),
- new LocalConfigurationStorage(vaultManager),
- List.of(),
- List.of()
- );
+ cmgManager = new ClusterManagementGroupManager(clusterService, raftManager, mock(RestComponent.class));
metaStorageManager = new MetaStorageManager(
vaultManager,
- cfgManager,
clusterService,
+ cmgManager,
raftManager,
new SimpleInMemoryKeyValueStorage()
);
@@ -166,26 +154,13 @@ public class ItDistributedConfigurationPropertiesTest {
void start() throws Exception {
vaultManager.start();
- cfgManager.start();
-
- // metastorage configuration
- String metaStorageCfg = metaStorageNodes.stream()
- .map(Object::toString)
- .collect(joining("\", \"", "\"", "\""));
-
- var config = String.format("{ node: { metastorageNodes : [ %s ] } }", metaStorageCfg);
-
- cfgManager.bootstrap(config);
-
- Stream.of(clusterService, raftManager, metaStorageManager)
+ Stream.of(clusterService, raftManager, cmgManager, metaStorageManager)
.forEach(IgniteComponent::start);
// deploy watches to propagate data from the metastore into the vault
metaStorageManager.deployWatches();
distributedCfgManager.start();
-
- distributedCfgManager.configurationRegistry().initializeDefaults();
}
/**
@@ -193,7 +168,7 @@ public class ItDistributedConfigurationPropertiesTest {
*/
void stop() throws Exception {
var components = List.of(
- distributedCfgManager, metaStorageManager, raftManager, clusterService, cfgManager, vaultManager
+ distributedCfgManager, cmgManager, metaStorageManager, raftManager, clusterService, vaultManager
);
for (IgniteComponent igniteComponent : components) {
@@ -211,6 +186,10 @@ public class ItDistributedConfigurationPropertiesTest {
void stopReceivingUpdates() {
receivesUpdates = false;
}
+
+ String name() {
+ return clusterService.topologyService().localMember().name();
+ }
}
private Node firstNode;
@@ -224,28 +203,28 @@ public class ItDistributedConfigurationPropertiesTest {
void setUp(@WorkDirectory Path workDir, TestInfo testInfo) throws Exception {
var firstNodeAddr = new NetworkAddress("localhost", 10000);
- String firstNodeName = testNodeName(testInfo, firstNodeAddr.port());
-
var secondNodeAddr = new NetworkAddress("localhost", 10001);
+ List<NetworkAddress> allNodes = List.of(firstNodeAddr, secondNodeAddr);
+
firstNode = new Node(
testInfo,
workDir.resolve("firstNode"),
firstNodeAddr,
- List.of(firstNodeAddr, secondNodeAddr),
- List.of(firstNodeName)
+ allNodes
);
secondNode = new Node(
testInfo,
workDir.resolve("secondNode"),
secondNodeAddr,
- List.of(firstNodeAddr, secondNodeAddr),
- List.of(firstNodeName)
+ allNodes
);
firstNode.start();
secondNode.start();
+
+ firstNode.cmgManager.initCluster(List.of(firstNode.name()), List.of());
}
/**
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
index 58b4ea4..c6da615 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
@@ -19,11 +19,11 @@ package org.apache.ignite.internal.configuration.storage;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.apache.ignite.internal.metastorage.MetaStorageManager.APPLIED_REV;
-import static org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
+import static org.mockito.Mockito.mock;
import java.io.Serializable;
import java.nio.file.Path;
@@ -31,19 +31,14 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Stream;
-import org.apache.ignite.configuration.RootKey;
-import org.apache.ignite.configuration.schemas.runner.NodeConfiguration;
-import org.apache.ignite.internal.configuration.ConfigurationManager;
+import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
import org.apache.ignite.internal.raft.Loza;
-import org.apache.ignite.internal.table.distributed.TableTxManagerImpl;
+import org.apache.ignite.internal.rest.RestComponent;
import org.apache.ignite.internal.testframework.WorkDirectory;
import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
-import org.apache.ignite.internal.tx.LockManager;
-import org.apache.ignite.internal.tx.TxManager;
-import org.apache.ignite.internal.tx.impl.HeapLockManager;
import org.apache.ignite.internal.vault.VaultManager;
import org.apache.ignite.internal.vault.persistence.PersistentVaultService;
import org.apache.ignite.network.ClusterService;
@@ -64,20 +59,14 @@ public class ItDistributedConfigurationStorageTest {
* An emulation of an Ignite node, that only contains components necessary for tests.
*/
private static class Node {
- private final String name;
-
private final VaultManager vaultManager;
private final ClusterService clusterService;
- private final LockManager lockManager;
-
- private final TxManager txManager;
+ private final ClusterManagementGroupManager cmgManager;
private final Loza raftManager;
- private final ConfigurationManager cfgManager;
-
private final MetaStorageManager metaStorageManager;
private final DistributedConfigurationStorage cfgStorage;
@@ -88,8 +77,6 @@ public class ItDistributedConfigurationStorageTest {
Node(TestInfo testInfo, Path workDir) {
var addr = new NetworkAddress("localhost", 10000);
- name = testNodeName(testInfo, addr.port());
-
vaultManager = new VaultManager(new PersistentVaultService(workDir.resolve("vault")));
clusterService = ClusterServiceTestUtils.clusterService(
@@ -99,26 +86,14 @@ public class ItDistributedConfigurationStorageTest {
new TestScaleCubeClusterServiceFactory()
);
- lockManager = new HeapLockManager();
-
raftManager = new Loza(clusterService, workDir);
- txManager = new TableTxManagerImpl(clusterService, lockManager);
-
- List<RootKey<?, ?>> rootKeys = List.of(NodeConfiguration.KEY);
-
- cfgManager = new ConfigurationManager(
- rootKeys,
- Map.of(),
- new LocalConfigurationStorage(vaultManager),
- List.of(),
- List.of()
- );
+ cmgManager = new ClusterManagementGroupManager(clusterService, raftManager, mock(RestComponent.class));
metaStorageManager = new MetaStorageManager(
vaultManager,
- cfgManager,
clusterService,
+ cmgManager,
raftManager,
new SimpleInMemoryKeyValueStorage()
);
@@ -132,14 +107,7 @@ public class ItDistributedConfigurationStorageTest {
void start() throws Exception {
vaultManager.start();
- cfgManager.start();
-
- // metastorage configuration
- var config = String.format("{\"node\": {\"metastorageNodes\": [ \"%s\" ]}}", name);
-
- cfgManager.bootstrap(config);
-
- Stream.of(clusterService, raftManager, txManager, metaStorageManager).forEach(IgniteComponent::start);
+ Stream.of(clusterService, raftManager, cmgManager, metaStorageManager).forEach(IgniteComponent::start);
// this is needed to avoid assertion errors
cfgStorage.registerConfigurationListener(changedEntries -> completedFuture(null));
@@ -153,7 +121,7 @@ public class ItDistributedConfigurationStorageTest {
*/
void stop() throws Exception {
var components =
- List.of(metaStorageManager, raftManager, txManager, clusterService, cfgManager, vaultManager);
+ List.of(metaStorageManager, cmgManager, raftManager, clusterService, vaultManager);
for (IgniteComponent igniteComponent : components) {
igniteComponent.beforeNodeStop();
@@ -163,6 +131,10 @@ public class ItDistributedConfigurationStorageTest {
component.stop();
}
}
+
+ String name() {
+ return clusterService.topologyService().localMember().name();
+ }
}
/**
@@ -180,6 +152,8 @@ public class ItDistributedConfigurationStorageTest {
try {
node.start();
+ node.cmgManager.initCluster(List.of(node.name()), List.of());
+
assertThat(node.cfgStorage.write(data, 0), willBe(equalTo(true)));
waitForCondition(() -> Objects.nonNull(node.vaultManager.get(APPLIED_REV).join().value()), 3000);
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/AbstractSchemaChangeTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/AbstractSchemaChangeTest.java
index e061ad2..3bacf58 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/AbstractSchemaChangeTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/AbstractSchemaChangeTest.java
@@ -33,6 +33,7 @@ import org.apache.ignite.Ignite;
import org.apache.ignite.IgnitionManager;
import org.apache.ignite.configuration.schemas.table.ColumnChange;
import org.apache.ignite.internal.ItUtils;
+import org.apache.ignite.internal.app.IgniteImpl;
import org.apache.ignite.internal.testframework.WorkDirectory;
import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
import org.apache.ignite.internal.util.IgniteObjectName;
@@ -42,7 +43,6 @@ import org.apache.ignite.schema.SchemaBuilders;
import org.apache.ignite.schema.definition.ColumnDefinition;
import org.apache.ignite.schema.definition.ColumnType;
import org.apache.ignite.schema.definition.TableDefinition;
-import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -83,7 +83,6 @@ abstract class AbstractSchemaChangeTest {
nodesBootstrapCfg.put(
node0Name,
"{\n"
- + " node.metastorageNodes: [ \"" + node0Name + "\" ],\n"
+ " network: {\n"
+ " port: " + PORTS[0] + ",\n"
+ " nodeFinder: {\n"
@@ -96,7 +95,6 @@ abstract class AbstractSchemaChangeTest {
nodesBootstrapCfg.put(
node1Name,
"{\n"
- + " node.metastorageNodes: [ \"" + node0Name + "\" ],\n"
+ " network: {\n"
+ " port: " + PORTS[1] + ",\n"
+ " nodeFinder: {\n"
@@ -109,7 +107,6 @@ abstract class AbstractSchemaChangeTest {
nodesBootstrapCfg.put(
node2Name,
"{\n"
- + " node.metastorageNodes: [ \"" + node0Name + "\" ],\n"
+ " network: {\n"
+ " port: " + PORTS[2] + ",\n"
+ " nodeFinder: {\n"
@@ -132,7 +129,7 @@ abstract class AbstractSchemaChangeTest {
* Check unsupported column type change.
*/
@Test
- public void testChangeColumnType() {
+ public void testChangeColumnType() throws Exception {
List<Ignite> grid = startGrid();
createTable(grid);
@@ -163,7 +160,7 @@ abstract class AbstractSchemaChangeTest {
* Check unsupported nullability change.
*/
@Test
- public void testChangeColumnsNullability() {
+ public void testChangeColumnsNullability() throws Exception {
List<Ignite> grid = startGrid();
createTable(grid);
@@ -175,12 +172,15 @@ abstract class AbstractSchemaChangeTest {
/**
* Returns grid nodes.
*/
- @NotNull
- protected List<Ignite> startGrid() {
+ protected List<Ignite> startGrid() throws Exception {
nodesBootstrapCfg.forEach((nodeName, configStr) ->
clusterNodes.add(IgnitionManager.start(nodeName, configStr, workDir.resolve(nodeName)))
);
+ IgniteImpl metastorageNode = (IgniteImpl) clusterNodes.get(0);
+
+ metastorageNode.init(List.of(metastorageNode.name()));
+
return clusterNodes;
}
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItBaselineChangesTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItBaselineChangesTest.java
index f2f5b96..6436f92 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItBaselineChangesTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItBaselineChangesTest.java
@@ -29,10 +29,12 @@ import java.util.Set;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgnitionManager;
import org.apache.ignite.internal.ItUtils;
+import org.apache.ignite.internal.app.IgniteImpl;
import org.apache.ignite.internal.schema.configuration.SchemaConfigurationConverter;
import org.apache.ignite.internal.testframework.WorkDirectory;
import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.lang.NodeStoppingException;
import org.apache.ignite.schema.SchemaBuilders;
import org.apache.ignite.schema.definition.ColumnType;
import org.apache.ignite.schema.definition.TableDefinition;
@@ -41,6 +43,7 @@ import org.apache.ignite.table.Table;
import org.apache.ignite.table.Tuple;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -48,6 +51,7 @@ import org.junit.jupiter.api.extension.ExtendWith;
/**
* Test for baseline changes.
*/
+@Disabled("https://issues.apache.org/jira/browse/IGNITE-16471")
@ExtendWith(WorkDirectoryExtension.class)
public class ItBaselineChangesTest {
/** Start network port for test nodes. */
@@ -66,24 +70,11 @@ public class ItBaselineChangesTest {
*/
@BeforeEach
void setUp(TestInfo testInfo) {
- String node0Name = testNodeName(testInfo, BASE_PORT);
- String node1Name = testNodeName(testInfo, BASE_PORT + 1);
- String node2Name = testNodeName(testInfo, BASE_PORT + 2);
+ for (int i = 0; i < 3; ++i) {
+ String nodeName = testNodeName(testInfo, BASE_PORT + i);
- initClusterNodes.put(
- node0Name,
- buildConfig(node0Name, 0)
- );
-
- initClusterNodes.put(
- node1Name,
- buildConfig(node0Name, 1)
- );
-
- initClusterNodes.put(
- node2Name,
- buildConfig(node0Name, 2)
- );
+ initClusterNodes.put(nodeName, buildConfig(i));
+ }
}
/**
@@ -98,13 +89,17 @@ public class ItBaselineChangesTest {
* Check dynamic table creation.
*/
@Test
- void testBaselineExtending(TestInfo testInfo) {
+ void testBaselineExtending(TestInfo testInfo) throws NodeStoppingException {
initClusterNodes.forEach((nodeName, configStr) ->
clusterNodes.add(IgnitionManager.start(nodeName, configStr, workDir.resolve(nodeName)))
);
assertEquals(3, clusterNodes.size());
+ IgniteImpl metastorageNode = (IgniteImpl) clusterNodes.get(0);
+
+ metastorageNode.init(List.of(metastorageNode.name()));
+
// Create table on node 0.
TableDefinition schTbl1 = SchemaBuilders.tableBuilder("PUBLIC", "tbl1").columns(
SchemaBuilders.column("key", ColumnType.INT64).build(),
@@ -129,13 +124,11 @@ public class ItBaselineChangesTest {
var node4Name = testNodeName(testInfo, nodePort(4));
// Start 2 new nodes after
- var node3 = IgnitionManager.start(
- node3Name, buildConfig(metaStoreNode.name(), 3), workDir.resolve(node3Name));
+ var node3 = IgnitionManager.start(node3Name, buildConfig(3), workDir.resolve(node3Name));
clusterNodes.add(node3);
- var node4 = IgnitionManager.start(
- node4Name, buildConfig(metaStoreNode.name(), 4), workDir.resolve(node4Name));
+ var node4 = IgnitionManager.start(node4Name, buildConfig(4), workDir.resolve(node4Name));
clusterNodes.add(node4);
@@ -147,14 +140,13 @@ public class ItBaselineChangesTest {
Table tbl4 = node4.tables().table(schTbl1.canonicalName());
- final Tuple keyTuple1 = Tuple.create().set("key", 1L);
+ Tuple keyTuple1 = Tuple.create().set("key", 1L);
assertEquals(1, (Long) tbl4.recordView().get(null, keyTuple1).value("key"));
}
- private String buildConfig(String metastoreNodeName, int nodeIdx) {
+ private static String buildConfig(int nodeIdx) {
return "{\n"
- + " node.metastorageNodes: [ \"" + metastoreNodeName + "\" ],\n"
+ " network: {\n"
+ " port: " + nodePort(nodeIdx) + ",\n"
+ " nodeFinder: {\n"
@@ -164,7 +156,7 @@ public class ItBaselineChangesTest {
+ "}";
}
- private int nodePort(int nodeIdx) {
+ private static int nodePort(int nodeIdx) {
return BASE_PORT + nodeIdx;
}
}
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItClusterInitTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItClusterInitTest.java
new file mode 100644
index 0000000..ee2b942
--- /dev/null
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItClusterInitTest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.runner.app;
+
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.ignite.IgnitionManager;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.NodeStoppingException;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+
+/**
+ * Integration tests for initializing a cluster.
+ */
+public class ItClusterInitTest extends IgniteAbstractTest {
+ private final List<IgniteImpl> nodes = new ArrayList<>();
+
+ @AfterEach
+ void tearDown() throws Exception {
+ IgniteUtils.closeAll(nodes);
+ }
+
+ /**
+ * Tests a scenario when a cluster is initialized twice.
+ */
+ @Test
+ void testDoubleInit(TestInfo testInfo) throws NodeStoppingException {
+ createCluster(testInfo, 1);
+
+ IgniteImpl node = nodes.get(0);
+
+ node.init(List.of(node.name()));
+
+ assertThrows(IgniteInternalException.class, () -> node.init(List.of(node.name())));
+ }
+
+ /**
+ * Tests a scenario when some nodes are stopped during initialization.
+ */
+ @Test
+ void testInitStoppingNodes(TestInfo testInfo) {
+ createCluster(testInfo, 2);
+
+ IgniteImpl node1 = nodes.get(0);
+ IgniteImpl node2 = nodes.get(1);
+
+ node2.stop();
+
+ assertThrows(IgniteInternalException.class, () -> node1.init(List.of(node1.name(), node2.name())));
+
+ node1.stop();
+
+ assertThrows(NodeStoppingException.class, () -> node1.init(List.of(node1.name(), node2.name())));
+ }
+
+ private void createCluster(TestInfo testInfo, int numNodes) {
+ int[] ports = IntStream.range(3344, 3344 + numNodes).toArray();
+
+ String nodeFinderConfig = Arrays.stream(ports)
+ .mapToObj(port -> String.format("\"localhost:%d\"", port))
+ .collect(Collectors.joining(", ", "[", "]"));
+
+ Arrays.stream(ports)
+ .mapToObj(port -> {
+ String config = "{"
+ + " network.port: " + port + ","
+ + " network.nodeFinder.netClusterNodes: " + nodeFinderConfig
+ + "}";
+
+ String nodeName = testNodeName(testInfo, port);
+
+ return (IgniteImpl) IgnitionManager.start(nodeName, config, workDir.resolve(nodeName));
+ })
+ .forEach(nodes::add);
+ }
+}
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItDataSchemaSyncTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItDataSchemaSyncTest.java
index 10209e7..21d0b91 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItDataSchemaSyncTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItDataSchemaSyncTest.java
@@ -22,15 +22,14 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
-import com.google.common.collect.Lists;
import java.util.ArrayList;
-import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgnitionManager;
+import org.apache.ignite.internal.ItUtils;
import org.apache.ignite.internal.app.IgniteImpl;
import org.apache.ignite.internal.table.TableImpl;
import org.apache.ignite.internal.test.WatchListenerInhibitor;
@@ -72,45 +71,34 @@ public class ItDataSchemaSyncTest extends IgniteAbstractTest {
/**
* Nodes bootstrap configuration.
*/
- private static final Map<String, String> nodesBootstrapCfg = new LinkedHashMap<>() {
- {
- put("node0", "{\n"
- + " \"node\": {\n"
- + " \"metastorageNodes\":[ \"node0\" ]\n"
- + " },\n"
+ private static final Map<String, String> nodesBootstrapCfg = Map.of(
+ "node0", "{\n"
+ " \"network\": {\n"
+ " \"port\":3344,\n"
+ " \"nodeFinder\": {\n"
+ " \"netClusterNodes\":[ \"localhost:3344\", \"localhost:3345\", \"localhost:3346\" ]\n"
+ " }\n"
+ " }\n"
- + "}");
+ + "}",
- put("node1", "{\n"
- + " \"node\": {\n"
- + " \"metastorageNodes\":[ \"node0\" ]\n"
- + " },\n"
+ "node1", "{\n"
+ " \"network\": {\n"
+ " \"port\":3345,\n"
+ " \"nodeFinder\": {\n"
+ " \"netClusterNodes\":[ \"localhost:3344\", \"localhost:3345\", \"localhost:3346\" ]\n"
+ " }\n"
+ " }\n"
- + "}");
+ + "}",
- put("node2", "{\n"
- + " \"node\": {\n"
- + " \"metastorageNodes\":[ \"node0\" ]\n"
- + " },\n"
+ "node2", "{\n"
+ " \"network\": {\n"
+ " \"port\":3346,\n"
+ " \"nodeFinder\": {\n"
+ " \"netClusterNodes\":[ \"localhost:3344\", \"localhost:3345\", \"localhost:3346\" ]\n"
+ " }\n"
+ " }\n"
- + "}");
- }
- };
+ + "}"
+ );
/**
* Cluster nodes.
@@ -125,6 +113,10 @@ public class ItDataSchemaSyncTest extends IgniteAbstractTest {
nodesBootstrapCfg.forEach((nodeName, configStr) ->
clusterNodes.add(IgnitionManager.start(nodeName, configStr, workDir.resolve(nodeName)))
);
+
+ IgniteImpl metastorageNode = (IgniteImpl) clusterNodes.get(0);
+
+ metastorageNode.init(List.of(metastorageNode.name()));
}
/**
@@ -132,7 +124,7 @@ public class ItDataSchemaSyncTest extends IgniteAbstractTest {
*/
@AfterEach
void afterEach() throws Exception {
- IgniteUtils.closeAll(Lists.reverse(clusterNodes));
+ IgniteUtils.closeAll(ItUtils.reverse(clusterNodes));
}
/**
@@ -142,7 +134,7 @@ public class ItDataSchemaSyncTest extends IgniteAbstractTest {
@Test
public void test() throws Exception {
Ignite ignite0 = clusterNodes.get(0);
- final IgniteImpl ignite1 = (IgniteImpl) clusterNodes.get(1);
+ IgniteImpl ignite1 = (IgniteImpl) clusterNodes.get(1);
createTable(ignite0, SCHEMA, SHORT_TABLE_NAME);
@@ -197,7 +189,7 @@ public class ItDataSchemaSyncTest extends IgniteAbstractTest {
);
}
- final CompletableFuture insertFut = IgniteTestUtils.runAsync(() ->
+ CompletableFuture insertFut = IgniteTestUtils.runAsync(() ->
table1.recordView().insert(
null,
Tuple.create()
@@ -207,11 +199,11 @@ public class ItDataSchemaSyncTest extends IgniteAbstractTest {
.set("valStr2", "str2_" + 0)
));
- final CompletableFuture getFut = IgniteTestUtils.runAsync(() -> {
+ CompletableFuture getFut = IgniteTestUtils.runAsync(() -> {
table1.recordView().get(null, Tuple.create().set("key", 10L));
});
- final CompletableFuture checkDefaultFut = IgniteTestUtils.runAsync(() -> {
+ CompletableFuture checkDefaultFut = IgniteTestUtils.runAsync(() -> {
assertEquals("default",
table1.recordView().get(null, Tuple.create().set("key", 0L))
.value("valStr2"));
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItDynamicTableCreationTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItDynamicTableCreationTest.java
index e8d814e..1ed34d0 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItDynamicTableCreationTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItDynamicTableCreationTest.java
@@ -35,6 +35,7 @@ import org.apache.ignite.IgnitionManager;
import org.apache.ignite.configuration.schemas.table.ColumnChange;
import org.apache.ignite.configuration.validation.ConfigurationValidationException;
import org.apache.ignite.internal.ItUtils;
+import org.apache.ignite.internal.app.IgniteImpl;
import org.apache.ignite.internal.schema.configuration.SchemaConfigurationConverter;
import org.apache.ignite.internal.testframework.WorkDirectory;
import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
@@ -47,9 +48,7 @@ import org.apache.ignite.table.KeyValueView;
import org.apache.ignite.table.RecordView;
import org.apache.ignite.table.Table;
import org.apache.ignite.table.Tuple;
-import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
@@ -77,48 +76,21 @@ class ItDynamicTableCreationTest {
*/
@BeforeEach
void setUp(TestInfo testInfo) {
- String node0Name = testNodeName(testInfo, PORTS[0]);
- String node1Name = testNodeName(testInfo, PORTS[1]);
- String node2Name = testNodeName(testInfo, PORTS[2]);
-
- nodesBootstrapCfg.put(
- node0Name,
- "{\n"
- + " node.metastorageNodes: [ \"" + node0Name + "\" ],\n"
- + " network: {\n"
- + " port: " + PORTS[0] + ",\n"
- + " nodeFinder:{\n"
- + " netClusterNodes: [ \"localhost:3344\", \"localhost:3345\", \"localhost:3346\" ] \n"
- + " }\n"
- + " }\n"
- + "}"
- );
-
- nodesBootstrapCfg.put(
- node1Name,
- "{\n"
- + " node.metastorageNodes: [ \"" + node0Name + "\" ],\n"
- + " network: {\n"
- + " port: " + PORTS[1] + ",\n"
- + " nodeFinder:{\n"
- + " netClusterNodes: [ \"localhost:3344\", \"localhost:3345\", \"localhost:3346\" ]\n"
- + " }\n"
- + " }\n"
- + "}"
- );
-
- nodesBootstrapCfg.put(
- node2Name,
- "{\n"
- + " node.metastorageNodes: [ \"" + node0Name + "\" ],\n"
- + " network: {\n"
- + " port: " + PORTS[2] + ",\n"
- + " nodeFinder:{\n"
- + " netClusterNodes: [ \"localhost:3344\", \"localhost:3345\", \"localhost:3346\" ]\n"
- + " }\n"
- + " }\n"
- + "}"
- );
+ for (int i = 0; i < 3; ++i) {
+ String nodeName = testNodeName(testInfo, PORTS[i]);
+
+ nodesBootstrapCfg.put(
+ nodeName,
+ "{\n"
+ + " network: {\n"
+ + " port: " + PORTS[i] + ",\n"
+ + " nodeFinder:{\n"
+ + " netClusterNodes: [ \"localhost:3344\", \"localhost:3345\", \"localhost:3346\" ] \n"
+ + " }\n"
+ + " }\n"
+ + "}"
+ );
+ }
}
/**
@@ -132,14 +104,17 @@ class ItDynamicTableCreationTest {
/**
* Returns grid nodes.
*/
- @NotNull
- protected List<Ignite> startGrid() {
+ protected List<Ignite> startGrid() throws Exception {
nodesBootstrapCfg.forEach((nodeName, configStr) ->
clusterNodes.add(IgnitionManager.start(nodeName, configStr, workDir.resolve(nodeName)))
);
assertEquals(3, clusterNodes.size());
+ IgniteImpl metastorageNode = (IgniteImpl) clusterNodes.get(0);
+
+ metastorageNode.init(List.of(metastorageNode.name()));
+
return clusterNodes;
}
@@ -147,7 +122,7 @@ class ItDynamicTableCreationTest {
* Check dynamic table creation.
*/
@Test
- void testDynamicSimpleTableCreation() {
+ void testDynamicSimpleTableCreation() throws Exception {
startGrid();
// Create table on node 0.
@@ -175,8 +150,8 @@ class ItDynamicTableCreationTest {
RecordView<Tuple> recView2 = tbl2.recordView();
KeyValueView<Tuple, Tuple> kvView2 = tbl2.keyValueView();
- final Tuple keyTuple1 = Tuple.create().set("key", 1L);
- final Tuple keyTuple2 = Tuple.create().set("key", 2L);
+ Tuple keyTuple1 = Tuple.create().set("key", 1L);
+ Tuple keyTuple2 = Tuple.create().set("key", 2L);
assertThrows(IllegalArgumentException.class, () -> kvView2.get(null, keyTuple1).value("key"));
assertThrows(IllegalArgumentException.class, () -> kvView2.get(null, keyTuple2).value("key"));
@@ -196,7 +171,7 @@ class ItDynamicTableCreationTest {
* Check dynamic table creation.
*/
@Test
- void testDynamicTableCreation() {
+ void testDynamicTableCreation() throws Exception {
startGrid();
// Create table on node 0.
@@ -218,8 +193,8 @@ class ItDynamicTableCreationTest {
.changeReplicas(1)
.changePartitions(10));
- final UUID uuid = UUID.randomUUID();
- final UUID uuid2 = UUID.randomUUID();
+ UUID uuid = UUID.randomUUID();
+ UUID uuid2 = UUID.randomUUID();
// Put data on node 1.
Table tbl1 = clusterNodes.get(1).tables().table(scmTbl1.canonicalName());
@@ -234,11 +209,11 @@ class ItDynamicTableCreationTest {
// Get data on node 2.
Table tbl2 = clusterNodes.get(2).tables().table(scmTbl1.canonicalName());
- final RecordView<Tuple> recView2 = tbl2.recordView();
+ RecordView<Tuple> recView2 = tbl2.recordView();
KeyValueView<Tuple, Tuple> kvView2 = tbl2.keyValueView();
- final Tuple keyTuple1 = Tuple.create().set("key", uuid).set("affKey", 42L);
- final Tuple keyTuple2 = Tuple.create().set("key", uuid2).set("affKey", 4242L);
+ Tuple keyTuple1 = Tuple.create().set("key", uuid).set("affKey", 42L);
+ Tuple keyTuple2 = Tuple.create().set("key", uuid2).set("affKey", 4242L);
// KV view must NOT return key columns in value.
assertThrows(IllegalArgumentException.class, () -> kvView2.get(null, keyTuple1).value("key"));
@@ -273,7 +248,7 @@ class ItDynamicTableCreationTest {
* Check unsupported column type change.
*/
@Test
- public void testChangeColumnType() {
+ public void testChangeColumnType() throws Exception {
List<Ignite> grid = startGrid();
assertTableCreationFailed(grid, c -> c.changeType(t -> t.changeType("UNKNOWN_TYPE")));
@@ -297,11 +272,11 @@ class ItDynamicTableCreationTest {
@Disabled("https://issues.apache.org/jira/browse/IGNITE-15747")
@Test
- void testMissedPk() {
+ void testMissedPk() throws Exception {
List<Ignite> grid = startGrid();
// Missed PK.
- Assertions.assertThrows(ConfigurationValidationException.class, () -> {
+ assertThrows(ConfigurationValidationException.class, () -> {
try {
grid.get(0).tables().createTable(
"PUBLIC.tbl1",
@@ -321,7 +296,7 @@ class ItDynamicTableCreationTest {
});
//Missed affinity cols.
- Assertions.assertThrows(ConfigurationValidationException.class, () -> {
+ assertThrows(ConfigurationValidationException.class, () -> {
try {
grid.get(0).tables().createTable(
"PUBLIC.tbl1",
@@ -342,7 +317,7 @@ class ItDynamicTableCreationTest {
});
//Missed key cols.
- Assertions.assertThrows(ConfigurationValidationException.class, () -> {
+ assertThrows(ConfigurationValidationException.class, () -> {
try {
grid.get(0).tables().createTable(
"PUBLIC.tbl1",
@@ -370,7 +345,7 @@ class ItDynamicTableCreationTest {
* @param colChanger Column configuration changer.
*/
private void assertTableCreationFailed(List<Ignite> grid, Consumer<ColumnChange> colChanger) {
- Assertions.assertThrows(IgniteException.class, () -> {
+ assertThrows(IgniteException.class, () -> {
try {
grid.get(0).tables().createTable(
"PUBLIC.tbl1",
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index a5b34fe..d5a630d 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -153,9 +153,6 @@ public class ItIgniteNodeRestartTest extends IgniteAbstractTest {
String nodeName = testNodeName(testInfo, 3344);
Ignite ignite = IgnitionManager.start(nodeName, "{\n"
- + " \"node\": {\n"
- + " \"metastorageNodes\":[ " + nodeName + " ]\n"
- + " },\n"
+ " \"network\": {\n"
+ " \"port\":3344,\n"
+ " \"nodeFinder\": {\n"
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgnitionTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgnitionTest.java
index f10d512..9c23c76 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgnitionTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgnitionTest.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.runner.app;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
-import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
@@ -80,7 +79,6 @@ class ItIgnitionTest {
nodesBootstrapCfg.put(
node0Name,
"{\n"
- + " node.metastorageNodes: [ \"" + node0Name + "\" ],\n"
+ " network: {\n"
+ " port: " + PORTS[0] + ",\n"
+ " nodeFinder: {\n"
@@ -93,7 +91,6 @@ class ItIgnitionTest {
nodesBootstrapCfg.put(
node1Name,
"{\n"
- + " node.metastorageNodes: [ \"" + node0Name + "\" ],\n"
+ " network: {\n"
+ " port: " + PORTS[1] + ",\n"
+ " nodeFinder: {\n"
@@ -106,7 +103,6 @@ class ItIgnitionTest {
nodesBootstrapCfg.put(
node2Name,
"{\n"
- + " node.metastorageNodes: [ \"" + node0Name + "\" ],\n"
+ " network: {\n"
+ " port: " + PORTS[2] + ",\n"
+ " nodeFinder: {\n"
@@ -150,99 +146,6 @@ class ItIgnitionTest {
}
/**
- * Tests scenario when we try to start cluster with single node, but without any node, that hosts metastorage.
- */
- @Test
- void testErrorWhenStartSingleNodeClusterWithoutMetastorage() {
- try {
- startedNodes.add(IgnitionManager.start("other-name", "{\n"
- + " \"node\": {\n"
- + " \"metastorageNodes\": [\n"
- + " \"node-0\", \"node-1\", \"node-2\"\n"
- + " ]\n"
- + " },\n"
- + " \"network\": {\n"
- + " \"port\": 3344,\n"
- + " \"nodeFinder\": {\n"
- + " \"netClusterNodes\": [ \"localhost:3344\"] \n"
- + " }\n"
- + " }\n"
- + "}", workDir.resolve("other-name")));
- } catch (Throwable th) {
- assertTrue(IgniteTestUtils.hasCause(th,
- IgniteException.class,
- "Cannot start meta storage manager because there is no node in the cluster that hosts meta storage."
- ));
- }
- }
-
- /**
- * Tests scenario when we try to start node that doesn't host metastorage in cluster with node, that hosts metastorage.
- */
- @Test
- void testStartNodeClusterWithoutMetastorage() throws Exception {
- Ignite ig1 = null;
-
- Ignite ig2 = null;
-
- try {
- ig1 = IgnitionManager.start("node-0", "{\n"
- + " \"node\": {\n"
- + " \"metastorageNodes\":[ \"node-0\" ]\n"
- + " },\n"
- + " \"network\": {\n"
- + " \"port\": 3344,\n"
- + " \"nodeFinder\": {\n"
- + " \"netClusterNodes\": [ \"localhost:3345\"]\n"
- + " }\n"
- + " }\n"
- + "}", workDir.resolve("node-0"));
-
- ig2 = IgnitionManager.start("other-name", "{\n"
- + " \"node\": {\n"
- + " \"metastorageNodes\":[ \"node-0\" ]\n"
- + " },\n"
- + " \"network\": {\n"
- + " \"port\": 3345,\n"
- + " \"nodeFinder\": {\n"
- + " \"netClusterNodes\": [ \"localhost:3344\"]\n"
- + " }\n"
- + " }\n"
- + "}", workDir.resolve("other-name"));
-
- assertEquals(ig2.name(), "other-name");
- } finally {
- IgniteUtils.closeAll(ig2, ig1);
- }
- }
-
- /**
- * Tests scenario when we try to start single-node cluster with several metastorage nodes in config.
- * TODO: test should be rewritten after init phase will be developed https://issues.apache.org/jira/browse/IGNITE-15114
- */
- @Test
- void testStartNodeClusterWithTwoMetastorageInConfig() throws Exception {
- try {
- IgnitionManager.start("node-0", "{\n"
- + " \"node\": {\n"
- + " \"metastorageNodes\": [\n"
- + " \"node-0\", \"node-1\", \"node-2\"\n"
- + " ]\n"
- + " },\n"
- + " \"network\": {\n"
- + " \"port\": 3344,\n"
- + " \"nodeFinder\": {\n"
- + " \"netClusterNodes\": [ \"localhost:3345\"]\n"
- + " }\n"
- + " }\n"
- + "}", workDir.resolve("node-0"));
- } catch (IgniteException e) {
- assertEquals(e.getCause().getMessage(), "Cannot start meta storage manager "
- + "because it is not allowed to start several metastorage nodes.");
- }
- }
-
- /**
* Tests scenario when we try to start node with invalid configuration.
*/
@Test
@@ -272,7 +175,6 @@ class ItIgnitionTest {
String nodeName = "node-url-config";
String cfg = "{\n"
- + " node.metastorageNodes: [ \"" + nodeName + "\" ],\n"
+ " network: {\n"
+ " port: " + PORTS[0] + "\n"
+ " }\n"
@@ -293,7 +195,7 @@ class ItIgnitionTest {
*/
private URL buildUrl(String path, String data) throws Exception {
URLStreamHandler handler = new URLStreamHandler() {
- private byte[] content = data.getBytes(StandardCharsets.UTF_8);
+ private final byte[] content = data.getBytes(StandardCharsets.UTF_8);
@Override
protected URLConnection openConnection(URL url) {
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItNoThreadsLeftTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItNoThreadsLeftTest.java
index 5322e84..fd93d28 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItNoThreadsLeftTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItNoThreadsLeftTest.java
@@ -21,19 +21,19 @@ import static java.util.stream.Collectors.joining;
import static org.apache.ignite.internal.schema.configuration.SchemaConfigurationConverter.convert;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+import java.util.List;
import java.util.Set;
-import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgnitionManager;
+import org.apache.ignite.internal.app.IgniteImpl;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
import org.apache.ignite.internal.testframework.IgniteTestUtils;
import org.apache.ignite.schema.SchemaBuilders;
import org.apache.ignite.schema.definition.ColumnType;
import org.apache.ignite.table.Table;
-import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
@@ -42,25 +42,25 @@ import org.junit.jupiter.api.TestInfo;
*/
public class ItNoThreadsLeftTest extends IgniteAbstractTest {
/** Schema name. */
- public static final String SCHEMA = "PUBLIC";
+ private static final String SCHEMA = "PUBLIC";
/** Short table name. */
- public static final String SHORT_TABLE_NAME = "tbl1";
+ private static final String SHORT_TABLE_NAME = "tbl1";
- /** Table name. */
- public static final String TABLE_NAME = SCHEMA + "." + SHORT_TABLE_NAME;
+ private static final List<String> THREAD_NAMES_BLACKLIST = List.of(
+ "nioEventLoopGroup",
+ "globalEventExecutor",
+ "ForkJoinPool",
+ "process reaper",
+ "CompletableFutureDelayScheduler",
+ "parallel"
+ );
/** One node cluster configuration. */
- private static Function<String, String> NODE_CONFIGURATION = metastorageNodeName -> "{\n"
- + " \"node\": {\n"
- + " \"metastorageNodes\":[ " + metastorageNodeName + " ]\n"
- + " },\n"
- + " \"network\": {\n"
- + " \"port\":3344,\n"
- + " \"nodeFinder\": {\n"
- + " \"netClusterNodes\":[ \"localhost:3344\" ]\n"
- + " }\n"
- + " }\n"
+ private static final String NODE_CONFIGURATION =
+ "{\n"
+ + " network.port: 3344,\n"
+ + " network.nodeFinder.netClusterNodes: [ \"localhost:3344\" ]\n"
+ "}";
/**
@@ -75,29 +75,34 @@ public class ItNoThreadsLeftTest extends IgniteAbstractTest {
String nodeName = IgniteTestUtils.testNodeName(testInfo, 0);
- Ignite ignite = IgnitionManager.start(
- nodeName,
- NODE_CONFIGURATION.apply(nodeName),
- workDir.resolve(nodeName));
+ try (IgniteImpl ignite = (IgniteImpl) IgnitionManager.start(nodeName, NODE_CONFIGURATION, workDir.resolve(nodeName))) {
+ ignite.init(List.of(ignite.name()));
- Table tbl = createTable(ignite, SCHEMA, SHORT_TABLE_NAME);
+ Table tbl = createTable(ignite, SCHEMA, SHORT_TABLE_NAME);
- assertNotNull(tbl);
+ assertNotNull(tbl);
+ }
- ignite.close();
+ boolean threadsKilled = waitForCondition(() -> threadsBefore.size() == getCurrentThreads().size(), 3_000);
- assertTrue(waitForCondition(() -> threadsBefore.size() == getCurrentThreads().size(), 3_000),
- getCurrentThreads().stream().filter(thread -> !threadsBefore.contains(thread)).map(Thread::getName).collect(joining(", ")));
+ if (!threadsKilled) {
+ String leakedThreadNames = getCurrentThreads().stream()
+ .filter(thread -> !threadsBefore.contains(thread))
+ .map(Thread::getName)
+ .collect(joining(", "));
+
+ fail(leakedThreadNames);
+ }
}
/**
* Creates a table.
*
- * @param node Cluster node.
- * @param schemaName Schema name.
+ * @param node Cluster node.
+ * @param schemaName Schema name.
* @param shortTableName Table name.
*/
- protected Table createTable(Ignite node, String schemaName, String shortTableName) {
+ private static Table createTable(Ignite node, String schemaName, String shortTableName) {
return node.tables().createTable(
schemaName + "." + shortTableName, tblCh -> convert(SchemaBuilders.tableBuilder(schemaName, shortTableName).columns(
SchemaBuilders.column("key", ColumnType.INT64).build(),
@@ -114,16 +119,9 @@ public class ItNoThreadsLeftTest extends IgniteAbstractTest {
*
* @return Set of threads.
*/
- @NotNull
- private Set<Thread> getCurrentThreads() {
+ private static Set<Thread> getCurrentThreads() {
return Thread.getAllStackTraces().keySet().stream()
- .filter(thread ->
- !thread.getName().startsWith("nioEventLoopGroup")
- && !thread.getName().startsWith("globalEventExecutor")
- && !thread.getName().startsWith("ForkJoinPool")
- && !thread.getName().startsWith("process reaper")
- && !thread.getName().startsWith("CompletableFutureDelayScheduler")
- && !thread.getName().startsWith("parallel"))
+ .filter(thread -> THREAD_NAMES_BLACKLIST.stream().noneMatch(thread.getName()::startsWith))
.collect(Collectors.toSet());
}
}
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItSchemaChangeKvViewTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItSchemaChangeKvViewTest.java
index 8063405..9688b89 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItSchemaChangeKvViewTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItSchemaChangeKvViewTest.java
@@ -41,7 +41,7 @@ class ItSchemaChangeKvViewTest extends AbstractSchemaChangeTest {
* Check add a new column to table schema.
*/
@Test
- public void testDropColumn() {
+ public void testDropColumn() throws Exception {
List<Ignite> grid = startGrid();
createTable(grid);
@@ -85,7 +85,7 @@ class ItSchemaChangeKvViewTest extends AbstractSchemaChangeTest {
* Check drop column from table schema.
*/
@Test
- public void testAddNewColumn() {
+ public void testAddNewColumn() throws Exception {
List<Ignite> grid = startGrid();
createTable(grid);
@@ -128,7 +128,7 @@ class ItSchemaChangeKvViewTest extends AbstractSchemaChangeTest {
* Check rename column from table schema.
*/
@Test
- public void testRenameColumn() {
+ public void testRenameColumn() throws Exception {
List<Ignite> grid = startGrid();
createTable(grid);
@@ -179,7 +179,7 @@ class ItSchemaChangeKvViewTest extends AbstractSchemaChangeTest {
* Check merge table schema changes.
*/
@Test
- public void testMergeChangesAddDropAdd() {
+ public void testMergeChangesAddDropAdd() throws Exception {
List<Ignite> grid = startGrid();
createTable(grid);
@@ -264,7 +264,7 @@ class ItSchemaChangeKvViewTest extends AbstractSchemaChangeTest {
* Check merge table schema changes.
*/
@Test
- public void testMergeChangesColumnDefault() {
+ public void testMergeChangesColumnDefault() throws Exception {
List<Ignite> grid = startGrid();
createTable(grid);
@@ -309,7 +309,7 @@ class ItSchemaChangeKvViewTest extends AbstractSchemaChangeTest {
* Check an operation failed if an unknown column found.
*/
@Test
- public void testInsertRowOfDifferentSchema() {
+ public void testInsertRowOfDifferentSchema() throws Exception {
List<Ignite> grid = startGrid();
createTable(grid);
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItSchemaChangeTableViewTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItSchemaChangeTableViewTest.java
index 4bda027..1f52c13 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItSchemaChangeTableViewTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItSchemaChangeTableViewTest.java
@@ -42,7 +42,7 @@ class ItSchemaChangeTableViewTest extends AbstractSchemaChangeTest {
* Check add a new column to table schema.
*/
@Test
- public void testDropColumn() {
+ public void testDropColumn() throws Exception {
List<Ignite> grid = startGrid();
createTable(grid);
@@ -79,7 +79,7 @@ class ItSchemaChangeTableViewTest extends AbstractSchemaChangeTest {
* Check drop column from table schema.
*/
@Test
- public void testAddNewColumn() {
+ public void testAddNewColumn() throws Exception {
List<Ignite> grid = startGrid();
createTable(grid);
@@ -116,7 +116,7 @@ class ItSchemaChangeTableViewTest extends AbstractSchemaChangeTest {
* Check column renaming.
*/
@Test
- void testRenameColumn() {
+ void testRenameColumn() throws Exception {
List<Ignite> grid = startGrid();
createTable(grid);
@@ -157,7 +157,7 @@ class ItSchemaChangeTableViewTest extends AbstractSchemaChangeTest {
* Rename column then add a new column with same name.
*/
@Test
- void testRenameThenAddColumnWithSameName() {
+ void testRenameThenAddColumnWithSameName() throws Exception {
List<Ignite> grid = startGrid();
createTable(grid);
@@ -198,7 +198,7 @@ class ItSchemaChangeTableViewTest extends AbstractSchemaChangeTest {
* Check merge table schema changes.
*/
@Test
- public void testMergeChangesAddDropAdd() {
+ public void testMergeChangesAddDropAdd() throws Exception {
List<Ignite> grid = startGrid();
createTable(grid);
@@ -272,7 +272,7 @@ class ItSchemaChangeTableViewTest extends AbstractSchemaChangeTest {
* Check merge column default value changes.
*/
@Test
- public void testMergeChangesColumnDefault() {
+ public void testMergeChangesColumnDefault() throws Exception {
List<Ignite> grid = startGrid();
createTable(grid);
@@ -317,7 +317,7 @@ class ItSchemaChangeTableViewTest extends AbstractSchemaChangeTest {
* Check operation failed if unknown column found.
*/
@Test
- public void testStrictSchemaInsertRowOfNewSchema() {
+ public void testStrictSchemaInsertRowOfNewSchema() throws Exception {
List<Ignite> grid = startGrid();
createTable(grid);
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTableCreationTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTableCreationTest.java
index 9cc9733..6ac26a2 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTableCreationTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTableCreationTest.java
@@ -72,7 +72,6 @@ class ItTableCreationTest {
nodesBootstrapCfg.put(
node0Name,
"{\n"
- + " node.metastorageNodes: [ \"" + node0Name + "\", \"" + node1Name + "\" ],\n"
+ " network: {\n"
+ " port: " + PORTS[0] + ",\n"
+ " nodeFinder: {\n"
@@ -160,7 +159,6 @@ class ItTableCreationTest {
nodesBootstrapCfg.put(
node1Name,
"{\n"
- + " node.metastorageNodes: [ \"" + node0Name + "\", \"" + node1Name + "\" ],\n"
+ " network: {\n"
+ " port: " + PORTS[1] + ",\n"
+ " nodeFinder: {\n"
@@ -173,7 +171,6 @@ class ItTableCreationTest {
nodesBootstrapCfg.put(
node2Name,
"{\n"
- + " node.metastorageNodes: [ \"" + node0Name + "\", \"" + node1Name + "\" ],\n"
+ " network: {\n"
+ " port: " + PORTS[2] + ",\n"
+ " nodeFinder: {\n"
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTablesApiTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTablesApiTest.java
index 9d896ec..7678a04 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTablesApiTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTablesApiTest.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.runner.app;
+import static java.util.concurrent.CompletableFuture.runAsync;
+import static java.util.concurrent.CompletableFuture.supplyAsync;
import static org.apache.ignite.internal.schema.configuration.SchemaConfigurationConverter.convert;
import static org.apache.ignite.internal.test.WatchListenerInhibitor.metastorageEventsInhibitor;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -32,12 +34,12 @@ import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgnitionManager;
import org.apache.ignite.internal.ItUtils;
+import org.apache.ignite.internal.app.IgniteImpl;
import org.apache.ignite.internal.table.IgniteTablesInternal;
import org.apache.ignite.internal.table.TableImpl;
import org.apache.ignite.internal.test.WatchListenerInhibitor;
@@ -74,45 +76,22 @@ public class ItTablesApiTest extends IgniteAbstractTest {
public static final String TABLE_NAME = SCHEMA + "." + SHORT_TABLE_NAME;
/** Nodes bootstrap configuration. */
- private final ArrayList<Function<String, String>> nodesBootstrapCfg = new ArrayList<>() {
- {
- add((metastorageNodeName) -> "{\n"
- + " \"node\": {\n"
- + " \"metastorageNodes\":[ " + metastorageNodeName + " ]\n"
- + " },\n"
- + " \"network\": {\n"
- + " \"port\":3344,\n"
- + " \"nodeFinder\": {\n"
- + " \"netClusterNodes\":[ \"localhost:3344\", \"localhost:3345\", \"localhost:3346\" ]\n"
- + " }\n"
- + " }\n"
- + "}");
-
- add((metastorageNodeName) -> "{\n"
- + " \"node\": {\n"
- + " \"metastorageNodes\":[ " + metastorageNodeName + " ]\n"
- + " },\n"
- + " \"network\": {\n"
- + " \"port\":3345,\n"
- + " \"nodeFinder\": {\n"
- + " \"netClusterNodes\":[ \"localhost:3344\", \"localhost:3345\", \"localhost:3346\" ]\n"
- + " }\n"
- + " }\n"
- + "}");
-
- add((metastorageNodeName) -> "{\n"
- + " \"node\": {\n"
- + " \"metastorageNodes\":[ " + metastorageNodeName + " ]\n"
- + " },\n"
- + " \"network\": {\n"
- + " \"port\":3346,\n"
- + " \"nodeFinder\": {\n"
- + " \"netClusterNodes\":[ \"localhost:3344\", \"localhost:3345\", \"localhost:3346\" ]\n"
- + " }\n"
- + " }\n"
- + "}");
- }
- };
+ private final List<String> nodesBootstrapCfg = List.of(
+ "{\n"
+ + " network.port :3344,\n"
+ + " network.nodeFinder.netClusterNodes:[ \"localhost:3344\", \"localhost:3345\", \"localhost:3346\" ]\n"
+ + "}",
+
+ "{\n"
+ + " network.port :3345,\n"
+ + " network.nodeFinder.netClusterNodes:[ \"localhost:3344\", \"localhost:3345\", \"localhost:3346\" ]\n"
+ + "}",
+
+ "{\n"
+ + " network.port :3346,\n"
+ + " network.nodeFinder.netClusterNodes:[ \"localhost:3344\", \"localhost:3345\", \"localhost:3346\" ]\n"
+ + "}"
+ );
/** Cluster nodes. */
private List<Ignite> clusterNodes;
@@ -121,20 +100,23 @@ public class ItTablesApiTest extends IgniteAbstractTest {
* Before each.
*/
@BeforeEach
- void beforeEach(TestInfo testInfo) {
- String metastorageNodeName = IgniteTestUtils.testNodeName(testInfo, 0);
-
- clusterNodes = IntStream.range(0, nodesBootstrapCfg.size()).mapToObj(value -> {
+ void beforeEach(TestInfo testInfo) throws Exception {
+ clusterNodes = IntStream.range(0, nodesBootstrapCfg.size())
+ .mapToObj(value -> {
String nodeName = IgniteTestUtils.testNodeName(testInfo, value);
return IgnitionManager.start(
nodeName,
- nodesBootstrapCfg.get(value).apply(metastorageNodeName),
+ nodesBootstrapCfg.get(value),
// Avoid a long file path name (260 characters) for windows.
workDir.resolve(Integer.toString(value))
);
- }
- ).collect(Collectors.toList());
+ })
+ .collect(Collectors.toList());
+
+ IgniteImpl metastorageNode = (IgniteImpl) clusterNodes.get(0);
+
+ metastorageNode.init(List.of(metastorageNode.name()));
}
/**
@@ -181,9 +163,8 @@ public class ItTablesApiTest extends IgniteAbstractTest {
createTable(ignite0, SCHEMA, SHORT_TABLE_NAME);
- CompletableFuture createTblFut = CompletableFuture.runAsync(() -> createTable(ignite1, SCHEMA, SHORT_TABLE_NAME));
- CompletableFuture createTblIfNotExistsFut = CompletableFuture
- .supplyAsync(() -> createTableIfNotExists(ignite1, SCHEMA, SHORT_TABLE_NAME));
+ CompletableFuture<Void> createTblFut = runAsync(() -> createTable(ignite1, SCHEMA, SHORT_TABLE_NAME));
+ CompletableFuture<Table> createTblIfNotExistsFut = supplyAsync(() -> createTableIfNotExists(ignite1, SCHEMA, SHORT_TABLE_NAME));
for (Ignite ignite : clusterNodes) {
if (ignite != ignite1) {
@@ -256,9 +237,7 @@ public class ItTablesApiTest extends IgniteAbstractTest {
}
/**
- * Trys to create an index which is already created.
- *
- * @throws Exception If failed.
+ * Tries to create an index which is already created.
*/
@Test
public void testAddIndex() {
@@ -297,8 +276,8 @@ public class ItTablesApiTest extends IgniteAbstractTest {
addIndex(ignite0, SCHEMA, SHORT_TABLE_NAME);
- CompletableFuture addIndesFut = CompletableFuture.runAsync(() -> addIndex(ignite1, SCHEMA, SHORT_TABLE_NAME));
- CompletableFuture addIndesIfNotExistsFut = CompletableFuture.runAsync(() -> addIndexIfNotExists(ignite1, SCHEMA, SHORT_TABLE_NAME));
+ CompletableFuture<Void> addIndesFut = runAsync(() -> addIndex(ignite1, SCHEMA, SHORT_TABLE_NAME));
+ CompletableFuture<Void> addIndesIfNotExistsFut = runAsync(() -> addIndexIfNotExists(ignite1, SCHEMA, SHORT_TABLE_NAME));
for (Ignite ignite : clusterNodes) {
if (ignite != ignite1) {
@@ -365,8 +344,8 @@ public class ItTablesApiTest extends IgniteAbstractTest {
addColumn(ignite0, SCHEMA, SHORT_TABLE_NAME);
- CompletableFuture addColFut = CompletableFuture.runAsync(() -> addColumn(ignite1, SCHEMA, SHORT_TABLE_NAME));
- CompletableFuture addColIfNotExistsFut = CompletableFuture.runAsync(() -> addColumnIfNotExists(ignite1, SCHEMA, SHORT_TABLE_NAME));
+ CompletableFuture<Void> addColFut = runAsync(() -> addColumn(ignite1, SCHEMA, SHORT_TABLE_NAME));
+ CompletableFuture<Void> addColIfNotExistsFut = runAsync(() -> addColumnIfNotExists(ignite1, SCHEMA, SHORT_TABLE_NAME));
for (Ignite ignite : clusterNodes) {
if (ignite != ignite1) {
@@ -394,9 +373,8 @@ public class ItTablesApiTest extends IgniteAbstractTest {
}
/**
- * Checks that if a table would be created/dropped in any cluster node, this action reflects on all others.
- * Table management operations should pass in linearize order: if an action completed in one node,
- * the result has to be visible to another one.
+ * Checks that if a table would be created/dropped in any cluster node, this action reflects on all others. Table management operations
+ * should pass in linearize order: if an action completed in one node, the result has to be visible to another one.
*
* @throws Exception If failed.
*/
@@ -414,11 +392,9 @@ public class ItTablesApiTest extends IgniteAbstractTest {
UUID tblId = ((TableImpl) table).tableId();
- CompletableFuture<Table> tableByNameFut = CompletableFuture.supplyAsync(() -> {
- return ignite1.tables().table(TABLE_NAME);
- });
+ CompletableFuture<Table> tableByNameFut = supplyAsync(() -> ignite1.tables().table(TABLE_NAME));
- CompletableFuture<Table> tableByIdFut = CompletableFuture.supplyAsync(() -> {
+ CompletableFuture<Table> tableByIdFut = supplyAsync(() -> {
try {
return ((IgniteTablesInternal) ignite1.tables()).table(tblId);
} catch (NodeStoppingException e) {
@@ -466,8 +442,8 @@ public class ItTablesApiTest extends IgniteAbstractTest {
/**
* Creates table.
*
- * @param node Cluster node.
- * @param schemaName Schema name.
+ * @param node Cluster node.
+ * @param schemaName Schema name.
* @param shortTableName Table name.
*/
protected Table createTable(Ignite node, String schemaName, String shortTableName) {
@@ -486,8 +462,8 @@ public class ItTablesApiTest extends IgniteAbstractTest {
/**
* Adds an index if it does not exist.
*
- * @param node Cluster node.
- * @param schemaName Schema name.
+ * @param node Cluster node.
+ * @param schemaName Schema name.
* @param shortTableName Table name.
*/
protected Table createTableIfNotExists(Ignite node, String schemaName, String shortTableName) {
@@ -495,10 +471,10 @@ public class ItTablesApiTest extends IgniteAbstractTest {
return node.tables().createTable(
schemaName + "." + shortTableName,
tblCh -> convert(SchemaBuilders.tableBuilder(schemaName, shortTableName).columns(Arrays.asList(
- SchemaBuilders.column("key", ColumnType.INT64).build(),
- SchemaBuilders.column("valInt", ColumnType.INT32).asNullable(true).build(),
- SchemaBuilders.column("valStr", ColumnType.string())
- .withDefaultValueExpression("default").build()
+ SchemaBuilders.column("key", ColumnType.INT64).build(),
+ SchemaBuilders.column("valInt", ColumnType.INT32).asNullable(true).build(),
+ SchemaBuilders.column("valStr", ColumnType.string())
+ .withDefaultValueExpression("default").build()
)).withPrimaryKey("key").build(),
tblCh).changeReplicas(2).changePartitions(10)
);
@@ -508,8 +484,7 @@ public class ItTablesApiTest extends IgniteAbstractTest {
}
/**
- * Drops the table which name is specified.
- * If the table does not exist, an exception will be thrown.
+ * Drops the table which name is specified. If the table does not exist, an exception will be thrown.
*
* @param node Cluster node.
* @param schemaName Schema name.
@@ -520,8 +495,7 @@ public class ItTablesApiTest extends IgniteAbstractTest {
}
/**
- * Drops the table which name is specified.
- * If the table did not exist, a dropping would ignore.
+ * Drops the table which name is specified. If the table did not exist, a dropping would ignore.
*
* @param node Cluster node.
* @param schemaName Schema name.
@@ -538,8 +512,8 @@ public class ItTablesApiTest extends IgniteAbstractTest {
/**
* Adds an index.
*
- * @param node Cluster node.
- * @param schemaName Schema name.
+ * @param node Cluster node.
+ * @param schemaName Schema name.
* @param shortTableName Table name.
*/
protected void addColumn(Ignite node, String schemaName, String shortTableName) {
@@ -552,10 +526,10 @@ public class ItTablesApiTest extends IgniteAbstractTest {
/**
* Adds a column according to the column definition.
*
- * @param node Ignite node.
- * @param schemaName Schema name.
+ * @param node Ignite node.
+ * @param schemaName Schema name.
* @param shortTableName Table name.
- * @param colDefinition Column defenition.
+ * @param colDefinition Column defenition.
*/
private void addColumnInternal(Ignite node, String schemaName, String shortTableName, ColumnDefinition colDefinition) {
node.tables().alterTable(
@@ -572,8 +546,8 @@ public class ItTablesApiTest extends IgniteAbstractTest {
/**
* Adds a column if it does not exist.
*
- * @param node Ignite node.
- * @param schemaName Schema name.
+ * @param node Ignite node.
+ * @param schemaName Schema name.
* @param shortTableName Table name.
*/
protected void addColumnIfNotExists(Ignite node, String schemaName, String shortTableName) {
@@ -590,8 +564,8 @@ public class ItTablesApiTest extends IgniteAbstractTest {
/**
* Adds a column.
*
- * @param node Cluster node.
- * @param schemaName Schema name.
+ * @param node Cluster node.
+ * @param schemaName Schema name.
* @param shortTableName Table name.
*/
protected void addIndex(Ignite node, String schemaName, String shortTableName) {
@@ -613,8 +587,8 @@ public class ItTablesApiTest extends IgniteAbstractTest {
/**
* Creates a table if it does not exist.
*
- * @param node Cluster node.
- * @param schemaName Schema name.
+ * @param node Cluster node.
+ * @param schemaName Schema name.
* @param shortTableName Table name.
*/
protected void addIndexIfNotExists(Ignite node, String schemaName, String shortTableName) {
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java
index 96c38c4..414c248 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java
@@ -17,11 +17,9 @@
package org.apache.ignite.internal.runner.app;
-import java.net.InetSocketAddress;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
-import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -49,12 +47,8 @@ public class PlatformTestNodeRunner {
private static final int RUN_TIME_MINUTES = 30;
/** Nodes bootstrap configuration. */
- private static final Map<String, String> nodesBootstrapCfg = new LinkedHashMap<>() {
- {
- put(NODE_NAME, "{\n"
- + " \"node\": {\n"
- + " \"metastorageNodes\":[ \"" + NODE_NAME + "\" ]\n"
- + " },\n"
+ private static final Map<String, String> nodesBootstrapCfg = Map.of(
+ NODE_NAME, "{\n"
+ " \"clientConnector\":{\"port\": 10942,\"portRange\":10},"
+ " \"network\": {\n"
+ " \"port\":3344,\n"
@@ -62,9 +56,8 @@ public class PlatformTestNodeRunner {
+ " \"netClusterNodes\":[ \"localhost:3344\", \"localhost:3345\" ]\n"
+ " }\n"
+ " }\n"
- + "}");
- }
- };
+ + "}"
+ );
/** Base path for all temporary folders. */
private static final Path BASE_PATH = Path.of("target", "work", "PlatformTestNodeRunner");
@@ -95,6 +88,10 @@ public class PlatformTestNodeRunner {
startedNodes.add(IgnitionManager.start(nodeName, configStr, BASE_PATH.resolve(nodeName)))
);
+ IgniteImpl metastorageNode = (IgniteImpl) startedNodes.get(0);
+
+ metastorageNode.init(List.of(metastorageNode.name()));
+
var keyCol = "key";
var valCol = "val";
@@ -103,7 +100,7 @@ public class PlatformTestNodeRunner {
SchemaBuilders.column(valCol, ColumnType.string()).asNullable(true).build()
).withPrimaryKey(keyCol).build();
- startedNodes.get(0).tables().createTable(schTbl.canonicalName(), tblCh ->
+ metastorageNode.tables().createTable(schTbl.canonicalName(), tblCh ->
SchemaConfigurationConverter.convert(schTbl, tblCh)
.changeReplicas(1)
.changePartitions(10)
@@ -127,6 +124,6 @@ public class PlatformTestNodeRunner {
* @return Port number.
*/
private static int getPort(IgniteImpl node) {
- return ((InetSocketAddress) node.clientHandlerModule().localAddress()).getPort();
+ return node.clientAddress().port();
}
}
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItAbstractThinClientTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItAbstractThinClientTest.java
index cf2cc80..a1d4cbe 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItAbstractThinClientTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItAbstractThinClientTest.java
@@ -19,8 +19,6 @@ package org.apache.ignite.internal.runner.app.client;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.LinkedHashMap;
@@ -70,7 +68,7 @@ public abstract class ItAbstractThinClientTest extends IgniteAbstractTest {
* Before each.
*/
@BeforeAll
- void beforeAll(TestInfo testInfo, @WorkDirectory Path workDir) {
+ void beforeAll(TestInfo testInfo, @WorkDirectory Path workDir) throws Exception {
this.workDir = workDir;
String node0Name = testNodeName(testInfo, 3344);
@@ -79,26 +77,16 @@ public abstract class ItAbstractThinClientTest extends IgniteAbstractTest {
nodesBootstrapCfg.put(
node0Name,
"{\n"
- + " node.metastorageNodes: [ \"" + node0Name + "\" ],\n"
- + " network: {\n"
- + " port: " + 3344 + ",\n"
- + " nodeFinder: {\n"
- + " netClusterNodes: [ \"localhost:3344\", \"localhost:3345\" ]\n"
- + " }\n"
- + " }\n"
+ + " network.port: 3344,\n"
+ + " network.nodeFinder.netClusterNodes: [ \"localhost:3344\", \"localhost:3345\" ]\n"
+ "}"
);
nodesBootstrapCfg.put(
node1Name,
"{\n"
- + " node.metastorageNodes: [ \"" + node0Name + "\" ],\n"
- + " network: {\n"
- + " port: " + 3345 + ",\n"
- + " nodeFinder: {\n"
- + " netClusterNodes: [ \"localhost:3344\", \"localhost:3345\" ]\n"
- + " }\n"
- + " }\n"
+ + " network.port: 3345,\n"
+ + " network.nodeFinder.netClusterNodes: [ \"localhost:3344\", \"localhost:3345\" ]\n"
+ "}"
);
@@ -106,6 +94,10 @@ public abstract class ItAbstractThinClientTest extends IgniteAbstractTest {
startedNodes.add(IgnitionManager.start(nodeName, configStr, workDir.resolve(nodeName)))
);
+ IgniteImpl metastorageNode = (IgniteImpl) startedNodes.get(0);
+
+ metastorageNode.init(List.of(metastorageNode.name()));
+
TableDefinition schTbl = SchemaBuilders.tableBuilder(SCHEMA_NAME, TABLE_NAME).columns(
SchemaBuilders.column(COLUMN_KEY, ColumnType.INT32).build(),
SchemaBuilders.column(COLUMN_VAL, ColumnType.string()).asNullable(true).build()
@@ -142,8 +134,7 @@ public abstract class ItAbstractThinClientTest extends IgniteAbstractTest {
List<String> res = new ArrayList<>(startedNodes.size());
for (Ignite ignite : startedNodes) {
- SocketAddress addr = ((IgniteImpl) ignite).clientHandlerModule().localAddress();
- int port = ((InetSocketAddress) addr).getPort();
+ int port = ((IgniteImpl) ignite).clientAddress().port();
res.add("127.0.0.1:" + port);
}
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/jdbc/AbstractJdbcSelfTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/jdbc/AbstractJdbcSelfTest.java
index 9e1ba8c..4117067 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/jdbc/AbstractJdbcSelfTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/jdbc/AbstractJdbcSelfTest.java
@@ -31,6 +31,9 @@ import java.util.List;
import java.util.stream.Stream;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgnitionManager;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.lang.IgniteLogger;
import org.junit.jupiter.api.AfterAll;
@@ -38,12 +41,13 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.function.Executable;
-import org.junit.jupiter.api.io.TempDir;
/**
* Abstract jdbc self test.
*/
+@ExtendWith(WorkDirectoryExtension.class)
public class AbstractJdbcSelfTest {
/** URL. */
protected static final String URL = "jdbc:ignite:thin://127.0.0.1:10800";
@@ -63,15 +67,17 @@ public class AbstractJdbcSelfTest {
/**
* Creates a cluster of three nodes.
*
- * @param temp Temporal directory.
+ * @param temp Temporary directory.
*/
@BeforeAll
- public static void beforeAll(@TempDir Path temp, TestInfo testInfo) throws SQLException {
+ public static void beforeAll(@WorkDirectory Path temp, TestInfo testInfo) throws Exception {
String nodeName = testNodeName(testInfo, 47500);
- String configStr = "node.metastorageNodes: [ \"" + nodeName + "\" ]";
+ clusterNodes.add(IgnitionManager.start(nodeName, null, temp.resolve(nodeName)));
- clusterNodes.add(IgnitionManager.start(nodeName, configStr, temp.resolve(nodeName)));
+ IgniteImpl metastorageNode = (IgniteImpl) clusterNodes.get(0);
+
+ metastorageNode.init(List.of(metastorageNode.name()));
conn = DriverManager.getConnection(URL);
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/jdbc/ItJdbcBatchSelfTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/jdbc/ItJdbcBatchSelfTest.java
index cbd117b..f3ac55e 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/jdbc/ItJdbcBatchSelfTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/jdbc/ItJdbcBatchSelfTest.java
@@ -65,7 +65,7 @@ public class ItJdbcBatchSelfTest extends AbstractJdbcSelfTest {
private PreparedStatement pstmt;
@BeforeAll
- public static void beforeAll(@TempDir Path temp, TestInfo testInfo) throws SQLException {
+ public static void beforeAll(@TempDir Path temp, TestInfo testInfo) throws Exception {
AbstractJdbcSelfTest.beforeAll(temp, testInfo);
try (Statement statement = conn.createStatement()) {
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/AbstractBasicIntegrationTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/AbstractBasicIntegrationTest.java
index 26a3640..22572ba 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/AbstractBasicIntegrationTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/AbstractBasicIntegrationTest.java
@@ -64,9 +64,6 @@ public class AbstractBasicIntegrationTest extends BaseIgniteAbstractTest {
/** Nodes bootstrap configuration pattern. */
private static final String NODE_BOOTSTRAP_CFG = "{\n"
- + " \"node\": {\n"
- + " \"metastorageNodes\":[ {} ]\n"
- + " },\n"
+ " \"network\": {\n"
+ " \"port\":{},\n"
+ " \"nodeFinder\":{\n"
@@ -85,24 +82,23 @@ public class AbstractBasicIntegrationTest extends BaseIgniteAbstractTest {
/**
* Before all.
*
- * @param testInfo Test information oject.
+ * @param testInfo Test information object.
*/
@BeforeAll
- void startNodes(TestInfo testInfo) {
- //TODO: IGNITE-16034 Here we assume that Metastore consists into one node, and it starts at first.
- String metastorageNodes = '\"' + IgniteTestUtils.testNodeName(testInfo, 0) + '\"';
-
+ void startNodes(TestInfo testInfo) throws Exception {
String connectNodeAddr = "\"localhost:" + BASE_PORT + '\"';
for (int i = 0; i < nodes(); i++) {
String curNodeName = IgniteTestUtils.testNodeName(testInfo, i);
- CLUSTER_NODES.add(IgnitionManager.start(curNodeName, IgniteStringFormatter.format(NODE_BOOTSTRAP_CFG,
- metastorageNodes,
- BASE_PORT + i,
- connectNodeAddr
- ), WORK_DIR.resolve(curNodeName)));
+ String config = IgniteStringFormatter.format(NODE_BOOTSTRAP_CFG, BASE_PORT + i, connectNodeAddr);
+
+ CLUSTER_NODES.add(IgnitionManager.start(curNodeName, config, WORK_DIR.resolve(curNodeName)));
}
+
+ IgniteImpl metastorageNode = (IgniteImpl) CLUSTER_NODES.get(0);
+
+ metastorageNode.init(List.of(metastorageNode.name()));
}
/**
diff --git a/modules/runner/src/integrationTest/resources/ignite-config.json b/modules/runner/src/integrationTest/resources/ignite-config.json
index 6a2ca12..8698d5a 100644
--- a/modules/runner/src/integrationTest/resources/ignite-config.json
+++ b/modules/runner/src/integrationTest/resources/ignite-config.json
@@ -1,9 +1,4 @@
{
- "node": {
- "metastorageNodes": [
- "node1"
- ]
- },
"network": {
"port": 3344,
"portRange": 10,
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index dd132e5..4b29ee5 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -22,6 +22,7 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
import java.util.ListIterator;
import java.util.Set;
@@ -35,13 +36,14 @@ import org.apache.ignite.configuration.schemas.network.NetworkConfiguration;
import org.apache.ignite.configuration.schemas.store.DataStorageConfiguration;
import org.apache.ignite.configuration.schemas.table.TablesConfiguration;
import org.apache.ignite.internal.baseline.BaselineManager;
+import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
+import org.apache.ignite.internal.cluster.management.messages.CmgMessagesSerializationRegistryInitializer;
import org.apache.ignite.internal.configuration.ConfigurationManager;
import org.apache.ignite.internal.configuration.ConfigurationModule;
import org.apache.ignite.internal.configuration.ConfigurationModules;
import org.apache.ignite.internal.configuration.ConfigurationRegistry;
import org.apache.ignite.internal.configuration.ServiceLoaderModulesProvider;
import org.apache.ignite.internal.configuration.rest.ConfigurationHttpHandlers;
-import org.apache.ignite.internal.configuration.storage.ConfigurationStorage;
import org.apache.ignite.internal.configuration.storage.DistributedConfigurationStorage;
import org.apache.ignite.internal.configuration.storage.LocalConfigurationStorage;
import org.apache.ignite.internal.manager.IgniteComponent;
@@ -69,10 +71,12 @@ import org.apache.ignite.network.ClusterLocalConfiguration;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.MessageSerializationRegistryImpl;
import org.apache.ignite.network.NettyBootstrapFactory;
+import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.network.scalecube.ScaleCubeClusterServiceFactory;
import org.apache.ignite.raft.jraft.RaftMessagesSerializationRegistryInitializer;
import org.apache.ignite.table.manager.IgniteTables;
import org.apache.ignite.tx.IgniteTransactions;
+import org.intellij.lang.annotations.Language;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -137,6 +141,8 @@ public class IgniteImpl implements Ignite {
/** Rest module. */
private final RestComponent restComponent;
+ private final ClusterManagementGroupManager cmgMgr;
+
/** Client handler module. */
private final ClientHandlerModule clientHandlerModule;
@@ -146,17 +152,12 @@ public class IgniteImpl implements Ignite {
/**
* The Constructor.
*
- * @param name Ignite node name.
- * @param workDir Work directory for the started node. Must not be {@code null}.
- * @param serviceProviderClassLoader The class loader to be used to load provider-configuration files and provider
- * classes, or {@code null} if the system class loader (or, failing that
- * the bootstrap class loader) is to be used
+ * @param name Ignite node name.
+ * @param workDir Work directory for the started node. Must not be {@code null}.
+ * @param serviceProviderClassLoader The class loader to be used to load provider-configuration files and provider classes, or
+ * {@code null} if the system class loader (or, failing that the bootstrap class loader) is to be used.
*/
- IgniteImpl(
- String name,
- Path workDir,
- ClassLoader serviceProviderClassLoader
- ) {
+ IgniteImpl(String name, Path workDir, ClassLoader serviceProviderClassLoader) {
this.name = name;
vaultMgr = createVault(workDir);
@@ -174,6 +175,8 @@ public class IgniteImpl implements Ignite {
NetworkConfiguration networkConfiguration = nodeCfgMgr.configurationRegistry().getConfiguration(NetworkConfiguration.KEY);
MessageSerializationRegistryImpl serializationRegistry = new MessageSerializationRegistryImpl();
+
+ CmgMessagesSerializationRegistryInitializer.registerFactories(serializationRegistry);
RaftMessagesSerializationRegistryInitializer.registerFactories(serializationRegistry);
SqlQueryMessagesSerializationRegistryInitializer.registerFactories(serializationRegistry);
TxMessagesSerializationRegistryInitializer.registerFactories(serializationRegistry);
@@ -194,26 +197,24 @@ public class IgniteImpl implements Ignite {
txManager = new TableTxManagerImpl(clusterSvc, new HeapLockManager());
+ cmgMgr = new ClusterManagementGroupManager(clusterSvc, raftMgr, restComponent);
+
metaStorageMgr = new MetaStorageManager(
vaultMgr,
- nodeCfgMgr,
clusterSvc,
+ cmgMgr,
raftMgr,
new RocksDbKeyValueStorage(workDir.resolve(METASTORAGE_DB_PATH))
);
- ConfigurationStorage cfgStorage = new DistributedConfigurationStorage(metaStorageMgr, vaultMgr);
-
clusterCfgMgr = new ConfigurationManager(
modules.distributed().rootKeys(),
modules.distributed().validators(),
- cfgStorage,
+ new DistributedConfigurationStorage(metaStorageMgr, vaultMgr),
modules.distributed().internalSchemaExtensions(),
modules.distributed().polymorphicSchemaExtensions()
);
- new ConfigurationHttpHandlers(nodeCfgMgr, clusterCfgMgr).registerHandlers(restComponent);
-
baselineMgr = new BaselineManager(
clusterCfgMgr,
metaStorageMgr,
@@ -252,9 +253,11 @@ public class IgniteImpl implements Ignite {
nodeCfgMgr.configurationRegistry(),
nettyBootstrapFactory
);
+
+ new ConfigurationHttpHandlers(nodeCfgMgr, clusterCfgMgr).registerHandlers(restComponent);
}
- private ConfigurationModules loadConfigurationModules(ClassLoader classLoader) {
+ private static ConfigurationModules loadConfigurationModules(ClassLoader classLoader) {
var modulesProvider = new ServiceLoaderModulesProvider();
List<ConfigurationModule> modules = modulesProvider.modules(classLoader);
@@ -280,22 +283,21 @@ public class IgniteImpl implements Ignite {
/**
* Starts ignite node.
*
- * @param cfg Optional node configuration based on {@link org.apache.ignite.configuration.schemas.runner.NodeConfigurationSchema} and
- * {@link org.apache.ignite.configuration.schemas.network.NetworkConfigurationSchema}. Following rules are used for applying
- * the configuration properties:
- * <ol>
- * <li>Specified property overrides existing one or just applies itself if it wasn't
- * previously specified.</li>
- * <li>All non-specified properties either use previous value or use default one from
- * corresponding configuration schema.</li>
- * </ol>
- * So that, in case of initial node start (first start ever) specified configuration, supplemented with defaults, is
- * used. If no configuration was provided defaults are used for all configuration properties. In case of node
- * restart, specified properties override existing ones, non specified properties that also weren't specified
- * previously use default values. Please pay attention that previously specified properties are searched in the
- * {@code workDir} specified by the user.
+ * @param cfg Optional node configuration based on {@link org.apache.ignite.configuration.schemas.network.NetworkConfigurationSchema}.
+ * Following rules are used for applying the configuration properties:
+ * <ol>
+ * <li>Specified property overrides existing one or just applies itself if it wasn't
+ * previously specified.</li>
+ * <li>All non-specified properties either use previous value or use default one from
+ * corresponding configuration schema.</li>
+ * </ol>
+ * So that, in case of initial node start (first start ever) specified configuration, supplemented with defaults, is
+ * used. If no configuration was provided defaults are used for all configuration properties. In case of node
+ * restart, specified properties override existing ones, non specified properties that also weren't specified
+ * previously use default values. Please pay attention that previously specified properties are searched in the
+ * {@code workDir} specified by the user.
*/
- public void start(@Nullable String cfg) {
+ public void start(@Language("HOCON") @Nullable String cfg) {
List<IgniteComponent> startedComponents = new ArrayList<>();
try {
@@ -309,10 +311,7 @@ public class IgniteImpl implements Ignite {
vaultMgr.putName(name).join();
// Node configuration manager startup.
- doStartComponent(
- name,
- startedComponents,
- nodeCfgMgr);
+ doStartComponent(name, startedComponents, nodeCfgMgr);
// Node configuration manager bootstrap.
if (cfg != null) {
@@ -329,14 +328,15 @@ public class IgniteImpl implements Ignite {
List<IgniteComponent> otherComponents = List.of(
nettyBootstrapFactory,
clusterSvc,
+ restComponent,
raftMgr,
txManager,
- metaStorageMgr,
clusterCfgMgr,
+ cmgMgr,
+ metaStorageMgr,
baselineMgr,
distributedTblMgr,
qryEngine,
- restComponent,
clientHandlerModule
);
@@ -368,8 +368,8 @@ public class IgniteImpl implements Ignite {
*/
public void stop() {
if (status.getAndSet(Status.STOPPING) == Status.STARTED) {
- doStopNode(List.of(vaultMgr, nodeCfgMgr, clusterSvc, raftMgr, txManager, metaStorageMgr, clusterCfgMgr, baselineMgr,
- distributedTblMgr, qryEngine, restComponent, clientHandlerModule, nettyBootstrapFactory));
+ doStopNode(List.of(nettyBootstrapFactory, vaultMgr, nodeCfgMgr, clusterSvc, raftMgr, txManager, clusterCfgMgr,
+ cmgMgr, metaStorageMgr, baselineMgr, distributedTblMgr, qryEngine, restComponent, clientHandlerModule));
}
}
@@ -426,27 +426,62 @@ public class IgniteImpl implements Ignite {
}
/**
- * Returns client handler module.
+ * Returns the id of the current node.
*/
- public ClientHandlerModule clientHandlerModule() {
- return clientHandlerModule;
+ // TODO: should be encapsulated in local properties, see https://issues.apache.org/jira/browse/IGNITE-15131
+ public String id() {
+ return clusterSvc.topologyService().localMember().id();
}
/**
- * Returns the id of the current node.
+ * Returns the local address of REST endpoints.
+ *
+ * @throws IgniteInternalException if the REST module is not started.
*/
- public String id() {
- return clusterSvc.topologyService().localMember().id();
+ // TODO: should be encapsulated in local properties, see https://issues.apache.org/jira/browse/IGNITE-15131
+ public NetworkAddress restAddress() {
+ return NetworkAddress.from(restComponent.localAddress());
+ }
+
+ /**
+ * Returns the local address of the Thin Client.
+ *
+ * @throws IgniteInternalException if the Client module is not started.
+ */
+ // TODO: should be encapsulated in local properties, see https://issues.apache.org/jira/browse/IGNITE-15131
+ public NetworkAddress clientAddress() {
+ return NetworkAddress.from(clientHandlerModule.localAddress());
+ }
+
+ /**
+ * Initializes the cluster that this node is present in.
+ *
+ * @param metaStorageNodeNames names of nodes that will host the Meta Storage and the CMG.
+ * @throws NodeStoppingException If node stopping intention was detected.
+ */
+ public void init(Collection<String> metaStorageNodeNames) throws NodeStoppingException {
+ init(metaStorageNodeNames, List.of());
+ }
+
+ /**
+ * Initializes the cluster that this node is present in.
+ *
+ * @param metaStorageNodeNames names of nodes that will host the Meta Storage.
+ * @param cmgNodeNames names of nodes that will host the CMG.
+ * @throws NodeStoppingException If node stopping intention was detected.
+ */
+ public void init(Collection<String> metaStorageNodeNames, Collection<String> cmgNodeNames) throws NodeStoppingException {
+ cmgMgr.initCluster(metaStorageNodeNames, cmgNodeNames);
}
/**
* Checks node status. If it's {@link Status#STOPPING} then prevents further starting and throws NodeStoppingException that will lead to
* stopping already started components later on, otherwise starts component and add it to started components list.
*
- * @param nodeName Node name.
+ * @param nodeName Node name.
* @param startedComponents List of already started components for given node.
- * @param component Ignite component to start.
- * @param <T> Ignite component type.
+ * @param component Ignite component to start.
+ * @param <T> Ignite component type.
* @throws NodeStoppingException If node stopping intention was detected.
*/
private <T extends IgniteComponent> void doStartComponent(
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/configuration/CoreDistributedConfigurationModule.java b/modules/runner/src/main/java/org/apache/ignite/internal/configuration/CoreDistributedConfigurationModule.java
index fb97540..185940f 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/configuration/CoreDistributedConfigurationModule.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/configuration/CoreDistributedConfigurationModule.java
@@ -21,7 +21,6 @@ import java.util.Collection;
import java.util.List;
import org.apache.ignite.configuration.RootKey;
import org.apache.ignite.configuration.annotation.ConfigurationType;
-import org.apache.ignite.configuration.schemas.runner.ClusterConfiguration;
import org.apache.ignite.configuration.schemas.store.DataStorageConfiguration;
import org.apache.ignite.configuration.schemas.store.PageMemoryDataRegionConfigurationSchema;
import org.apache.ignite.configuration.schemas.store.RocksDbDataRegionConfigurationSchema;
@@ -45,7 +44,6 @@ public class CoreDistributedConfigurationModule implements ConfigurationModule {
@Override
public Collection<RootKey<?, ?>> rootKeys() {
return List.of(
- ClusterConfiguration.KEY,
TablesConfiguration.KEY,
DataStorageConfiguration.KEY
);
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/configuration/CoreLocalConfigurationModule.java b/modules/runner/src/main/java/org/apache/ignite/internal/configuration/CoreLocalConfigurationModule.java
index 63a8f7d..73e4a15 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/configuration/CoreLocalConfigurationModule.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/configuration/CoreLocalConfigurationModule.java
@@ -24,7 +24,6 @@ import org.apache.ignite.configuration.annotation.ConfigurationType;
import org.apache.ignite.configuration.schemas.clientconnector.ClientConnectorConfiguration;
import org.apache.ignite.configuration.schemas.network.NetworkConfiguration;
import org.apache.ignite.configuration.schemas.rest.RestConfiguration;
-import org.apache.ignite.configuration.schemas.runner.NodeConfiguration;
/**
* {@link ConfigurationModule} for node-local configuration provided by ignite-api.
@@ -41,7 +40,6 @@ public class CoreLocalConfigurationModule implements ConfigurationModule {
public Collection<RootKey<?, ?>> rootKeys() {
return List.of(
NetworkConfiguration.KEY,
- NodeConfiguration.KEY,
RestConfiguration.KEY,
ClientConnectorConfiguration.KEY
);
diff --git a/modules/runner/src/test/java/org/apache/ignite/internal/configuration/CoreDistributedConfigurationModuleTest.java b/modules/runner/src/test/java/org/apache/ignite/internal/configuration/CoreDistributedConfigurationModuleTest.java
index 0672b12..53bae42 100644
--- a/modules/runner/src/test/java/org/apache/ignite/internal/configuration/CoreDistributedConfigurationModuleTest.java
+++ b/modules/runner/src/test/java/org/apache/ignite/internal/configuration/CoreDistributedConfigurationModuleTest.java
@@ -28,7 +28,6 @@ import static org.hamcrest.Matchers.is;
import java.util.Optional;
import java.util.ServiceLoader;
import java.util.ServiceLoader.Provider;
-import org.apache.ignite.configuration.schemas.runner.ClusterConfiguration;
import org.apache.ignite.configuration.schemas.store.DataStorageConfiguration;
import org.apache.ignite.configuration.schemas.table.HashIndexConfigurationSchema;
import org.apache.ignite.configuration.schemas.table.PartialIndexConfigurationSchema;
@@ -48,11 +47,6 @@ class CoreDistributedConfigurationModuleTest {
}
@Test
- void hasClusterConfigurationRoot() {
- assertThat(module.rootKeys(), hasItem(ClusterConfiguration.KEY));
- }
-
- @Test
void hasTablesConfigurationRoot() {
assertThat(module.rootKeys(), hasItem(TablesConfiguration.KEY));
}
diff --git a/modules/runner/src/test/java/org/apache/ignite/internal/configuration/CoreLocalConfigurationModuleTest.java b/modules/runner/src/test/java/org/apache/ignite/internal/configuration/CoreLocalConfigurationModuleTest.java
index 21f2796..20b3102 100644
--- a/modules/runner/src/test/java/org/apache/ignite/internal/configuration/CoreLocalConfigurationModuleTest.java
+++ b/modules/runner/src/test/java/org/apache/ignite/internal/configuration/CoreLocalConfigurationModuleTest.java
@@ -31,7 +31,6 @@ import java.util.ServiceLoader.Provider;
import org.apache.ignite.configuration.schemas.clientconnector.ClientConnectorConfiguration;
import org.apache.ignite.configuration.schemas.network.NetworkConfiguration;
import org.apache.ignite.configuration.schemas.rest.RestConfiguration;
-import org.apache.ignite.configuration.schemas.runner.NodeConfiguration;
import org.junit.jupiter.api.Test;
/**
@@ -51,11 +50,6 @@ class CoreLocalConfigurationModuleTest {
}
@Test
- void hasNodeConfigurationRoot() {
- assertThat(module.rootKeys(), hasItem(NodeConfiguration.KEY));
- }
-
- @Test
void hasRestConfigurationRoot() {
assertThat(module.rootKeys(), hasItem(RestConfiguration.KEY));
}
diff --git a/parent/pom.xml b/parent/pom.xml
index 42e5faf..c3e6f56 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -112,6 +112,18 @@
<maven.shade.plugin.version>3.2.4</maven.shade.plugin.version>
<maven.source.plugin.version>3.2.1</maven.source.plugin.version>
<maven.surefire.plugin.version>3.0.0-M5</maven.surefire.plugin.version>
+
+ <argLine>
+ --add-opens java.base/java.lang=ALL-UNNAMED
+ --add-opens java.base/java.lang.invoke=ALL-UNNAMED
+ --add-opens java.base/java.lang.reflect=ALL-UNNAMED
+ --add-opens java.base/java.io=ALL-UNNAMED
+ --add-opens java.base/java.nio=ALL-UNNAMED
+ --add-opens java.base/java.util=ALL-UNNAMED
+ --add-opens java.base/jdk.internal.misc=ALL-UNNAMED
+ -Dio.netty.tryReflectionSetAccessible=true
+ -Djava.util.logging.config.file=../../config/java.util.logging.properties
+ </argLine>
</properties>
<distributionManagement>
@@ -191,6 +203,12 @@
<dependency>
<groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-cluster-management</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
<artifactId>ignite-metastorage</artifactId>
<version>${project.version}</version>
</dependency>
@@ -1081,17 +1099,6 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
- <argLine>
- --add-opens java.base/java.lang=ALL-UNNAMED
- --add-opens java.base/java.lang.invoke=ALL-UNNAMED
- --add-opens java.base/java.lang.reflect=ALL-UNNAMED
- --add-opens java.base/java.io=ALL-UNNAMED
- --add-opens java.base/java.nio=ALL-UNNAMED
- --add-opens java.base/java.util=ALL-UNNAMED
- --add-opens java.base/jdk.internal.misc=ALL-UNNAMED
- -Dio.netty.tryReflectionSetAccessible=true
- -Djava.util.logging.config.file=../../config/java.util.logging.properties
- </argLine>
<excludes>
<exclude>%regex[.*(It)[A-Z].*]</exclude>
<!-- Exclude inner classes (preserve default behaviour) -->
@@ -1107,17 +1114,7 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-failsafe-plugin</artifactId>
<configuration>
- <argLine>
- --add-opens java.base/java.lang=ALL-UNNAMED
- --add-opens java.base/java.lang.invoke=ALL-UNNAMED
- --add-opens java.base/java.lang.reflect=ALL-UNNAMED
- --add-opens java.base/java.io=ALL-UNNAMED
- --add-opens java.base/java.nio=ALL-UNNAMED
- --add-opens java.base/java.util=ALL-UNNAMED
- --add-opens java.base/jdk.internal.misc=ALL-UNNAMED
- -Dio.netty.tryReflectionSetAccessible=true
- -Djava.util.logging.config.file=../../config/java.util.logging.properties
- </argLine>
+
<useModulePath>false</useModulePath>
<includes>
<include>%regex[.*(It)[A-Z].*]</include>
diff --git a/pom.xml b/pom.xml
index 07b7974..2e39863 100644
--- a/pom.xml
+++ b/pom.xml
@@ -48,6 +48,7 @@
<module>modules/client</module>
<module>modules/client-common</module>
<module>modules/client-handler</module>
+ <module>modules/cluster-management</module>
<module>modules/configuration</module>
<module>modules/configuration-annotation-processor</module>
<module>modules/configuration-api</module>