You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2021/08/23 07:31:00 UTC

[GitHub] [ignite-3] SammyVimes commented on a change in pull request #293: IGNITE-15278

SammyVimes commented on a change in pull request #293:
URL: https://github.com/apache/ignite-3/pull/293#discussion_r693722615



##########
File path: modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ITMetaStorageServiceTest.java
##########
@@ -181,8 +179,18 @@ public void beforeTest() {
         var nodeFinder = new LocalPortRangeNodeFinder(NODE_PORT_BASE, NODE_PORT_BASE + NODES);
 
         nodeFinder.findNodes().stream()
-            .map(addr -> startClusterNode(addr, nodeFinder))
-            .forEach(cluster::add);
+            .map(addr -> ClusterServiceTestUtils.clusterService(
+                addr.toString(),

Review comment:
       Not sure, but seems like indentation is missing

##########
File path: modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
##########
@@ -17,51 +17,310 @@
 
 package org.apache.ignite.internal.app;
 
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.app.Ignite;
 import org.apache.ignite.app.IgnitionManager;
+import org.apache.ignite.client.handler.ClientHandlerModule;
+import org.apache.ignite.configuration.schemas.clientconnector.ClientConnectorConfiguration;
+import org.apache.ignite.configuration.schemas.network.NetworkConfiguration;
+import org.apache.ignite.configuration.schemas.rest.RestConfiguration;
+import org.apache.ignite.configuration.schemas.runner.ClusterConfiguration;
+import org.apache.ignite.configuration.schemas.runner.NodeConfiguration;
+import org.apache.ignite.configuration.schemas.table.TablesConfiguration;
+import org.apache.ignite.internal.affinity.AffinityManager;
+import org.apache.ignite.internal.baseline.BaselineManager;
 import org.apache.ignite.internal.configuration.ConfigurationManager;
 import org.apache.ignite.internal.configuration.ConfigurationRegistry;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
 import org.apache.ignite.internal.processors.query.calcite.SqlQueryProcessor;
+import org.apache.ignite.internal.raft.Loza;
+import org.apache.ignite.internal.schema.SchemaManager;
+import org.apache.ignite.internal.storage.DistributedConfigurationStorage;
+import org.apache.ignite.internal.storage.LocalConfigurationStorage;
+import org.apache.ignite.internal.table.distributed.TableManager;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.internal.vault.persistence.PersistentVaultService;
+import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.lang.NodeStoppingException;
+import org.apache.ignite.network.ClusterLocalConfiguration;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.MessageSerializationRegistryImpl;
+import org.apache.ignite.network.StaticNodeFinder;
+import org.apache.ignite.network.scalecube.ScaleCubeClusterServiceFactory;
+import org.apache.ignite.rest.RestModule;
 import org.apache.ignite.table.manager.IgniteTables;
 import org.apache.ignite.tx.IgniteTransactions;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Ignite internal implementation.
  */
 public class IgniteImpl implements Ignite {
-    /** Distributed table manager. */
-    private final IgniteTables distributedTblMgr;
+    /** The logger. */
+    private static final IgniteLogger LOG = IgniteLogger.forClass(IgniteImpl.class);
+
+    /**
+     * Path to the persistent storage used by the {@link org.apache.ignite.internal.vault.VaultService} component.
+     */
+    private static final Path VAULT_DB_PATH = Paths.get("vault");
+
+    /**
+     * Path for the partitions persistent storage.
+     */
+    private static final Path PARTITIONS_STORE_PATH = Paths.get("db");
 
     /** Ignite node name. */
     private final String name;
 
-    private final SqlQueryProcessor qryEngine;
+    /** Vault manager. */
+    private final VaultManager vaultMgr;
 
     /** Configuration manager that handles node (local) configuration. */
-    private final ConfigurationManager nodeConfigurationMgr;
+    private final ConfigurationManager nodeCfgMgr;
+
+    /** Cluster service (cluster network manager). */
+    private final ClusterService clusterSvc;
+
+    /** Raft manager. */
+    private final Loza raftMgr;
+
+    /** Meta storage manager. */
+    private final MetaStorageManager metaStorageMgr;
 
     /** Configuration manager that handles cluster (distributed) configuration. */
-    private final ConfigurationManager clusterConfigurationMgr;
+    private final ConfigurationManager clusterCfgMgr;
+
+    /** Baseline manager. */
+    private final BaselineManager baselineMgr;
+
+    /** Affinity manager. */
+    private final AffinityManager affinityMgr;
+
+    /** Schema manager. */
+    private final SchemaManager schemaMgr;
+
+    /** Distributed table manager. */
+    private final TableManager distributedTblMgr;
+
+    /** Query engine. */
+    private final SqlQueryProcessor qryEngine;
+
+    /** Rest module. */
+    private final RestModule restModule;
+
+    /** Client handler module. */
+    private final ClientHandlerModule clientHandlerModule;
+
+    /** Node status. Adds ability to stop currently starting node. */
+    private final AtomicReference<Status> status = new AtomicReference<>(Status.STARTING);
 
     /**
+     * The Constructor.
+     *
      * @param name Ignite node name.
-     * @param tblMgr Table manager.
-     * @param qryEngine Query processor.
-     * @param nodeConfigurationMgr Configuration manager that handles node (local) configuration.
-     * @param clusterConfigurationMgr Configuration manager that handles cluster (distributed) configuration.
+     * @param workDir Work directory for the started node. Must not be {@code null}.
      */
     IgniteImpl(
         String name,
-        IgniteTables tblMgr,
-        SqlQueryProcessor qryEngine,
-        ConfigurationManager nodeConfigurationMgr,
-        ConfigurationManager clusterConfigurationMgr
+        Path workDir
     ) {
         this.name = name;
-        this.distributedTblMgr = tblMgr;
-        this.qryEngine = qryEngine;
-        this.nodeConfigurationMgr = nodeConfigurationMgr;
-        this.clusterConfigurationMgr = clusterConfigurationMgr;
+
+        vaultMgr = createVault(workDir);
+
+        nodeCfgMgr = new ConfigurationManager(
+            Arrays.asList(
+                NetworkConfiguration.KEY,
+                NodeConfiguration.KEY,
+                RestConfiguration.KEY,
+                ClientConnectorConfiguration.KEY
+            ),
+            Map.of(),
+            new LocalConfigurationStorage(vaultMgr)
+        );
+
+        clusterSvc = new ScaleCubeClusterServiceFactory().createClusterService(
+            new ClusterLocalConfiguration(
+                name,
+                new MessageSerializationRegistryImpl()
+            ),
+            nodeCfgMgr,
+            () -> StaticNodeFinder.fromConfiguration(nodeCfgMgr.configurationRegistry().
+                getConfiguration(NetworkConfiguration.KEY).value())
+        );
+
+        raftMgr = new Loza(clusterSvc, workDir);
+
+        metaStorageMgr = new MetaStorageManager(
+            vaultMgr,
+            nodeCfgMgr,
+            clusterSvc,
+            raftMgr
+        );
+
+        clusterCfgMgr = new ConfigurationManager(
+            Arrays.asList(
+                ClusterConfiguration.KEY,
+                TablesConfiguration.KEY
+            ),
+            Map.of(),
+            new DistributedConfigurationStorage(metaStorageMgr, vaultMgr)
+        );
+
+        baselineMgr = new BaselineManager(
+            clusterCfgMgr,
+            metaStorageMgr,
+            clusterSvc
+        );
+
+        affinityMgr = new AffinityManager(
+            clusterCfgMgr,
+            metaStorageMgr,
+            baselineMgr
+        );
+
+        schemaMgr = new SchemaManager(
+            clusterCfgMgr,
+            metaStorageMgr,
+            vaultMgr
+        );
+
+        distributedTblMgr = new TableManager(
+            nodeCfgMgr,
+            clusterCfgMgr,
+            metaStorageMgr,
+            schemaMgr,
+            affinityMgr,
+            raftMgr,
+            getPartitionsStorePath(workDir)
+        );
+
+        qryEngine = new SqlQueryProcessor(
+            clusterSvc,
+            distributedTblMgr
+        );
+
+        restModule = new RestModule(nodeCfgMgr, clusterCfgMgr);
+
+        clientHandlerModule = new ClientHandlerModule(distributedTblMgr, nodeCfgMgr.configurationRegistry());
+    }
+
+    /**
+     * Starts ignite node.
+     *
+     * @param cfg Optional node configuration based on {@link org.apache.ignite.configuration.schemas.runner.NodeConfigurationSchema}
+     * and {@link org.apache.ignite.configuration.schemas.network.NetworkConfigurationSchema}. Following rules are used
+     * for applying the configuration properties:
+     * <ol>
+     * <li>Specified property overrides existing one or just applies itself if it wasn't
+     * previously specified.</li>
+     * <li>All non-specified properties either use previous value or use default one from
+     * corresponding configuration schema.</li>
+     * </ol>
+     * So that, in case of initial node start (first start ever) specified configuration, supplemented with defaults, is
+     * used. If no configuration was provided defaults are used for all configuration properties. In case of node
+     * restart, specified properties override existing ones, non specified properties that also weren't specified
+     * previously use default values. Please pay attention that previously specified properties are searched in the
+     * {@code workDir} specified by the user.
+     */
+    public void start(@Nullable String cfg) {
+        List<IgniteComponent> startedComponents = new ArrayList<>();
+
+        try {
+            // Vault startup.
+            doStartComponent(
+                name,
+                startedComponents,
+                vaultMgr
+            );
+
+            vaultMgr.putName(name).join();
+
+            // Node configuration manager startup.
+            doStartComponent(
+                name,
+                startedComponents,
+                nodeCfgMgr);
+
+            // Node configuration manager bootstrap.
+            if (cfg != null) {
+                try {
+                    nodeCfgMgr.bootstrap(cfg);
+                }
+                catch (Exception e) {
+                    LOG.warn("Unable to parse user-specific configuration, default configuration will be used: {}",
+                        e.getMessage());
+                }
+            }
+            else
+                nodeCfgMgr.configurationRegistry().initializeDefaults();
+
+            // Start the remaining components.
+            List<IgniteComponent> otherComponents = List.of(
+                clusterSvc,
+                raftMgr,
+                metaStorageMgr,
+                clusterCfgMgr,
+                baselineMgr,
+                affinityMgr,
+                schemaMgr,
+                distributedTblMgr,
+                qryEngine,
+                restModule,
+                clientHandlerModule
+            );
+
+            for (IgniteComponent component : otherComponents)
+                doStartComponent(name, startedComponents, component);
+
+            // Deploy all resisted watches cause all components are ready and have registered their listeners.
+            metaStorageMgr.deployWatches();
+
+            if (!status.compareAndSet(Status.STARTING, Status.STARTED))
+                throw new NodeStoppingException();
+        }
+        catch (Exception e) {
+            String errMsg = "Unable to start node=[" + name + "].";
+
+            LOG.error(errMsg, e);
+
+            doStopNode(startedComponents);
+
+            throw new IgniteException(errMsg, e);
+        }
+    }
+
+    /**
+     * Stops ignite node.
+     */
+    public void stop() {
+        AtomicBoolean explicitStop = new AtomicBoolean();
+
+        status.getAndUpdate(status -> {
+            if (status == Status.STARTED)
+                explicitStop.set(true);

Review comment:
       I think there should be an `else` clause with `explicitStop.set(false)`, because this closure could be called multiple times (if stop is called from different threads) and `doStopNode` will be called multiple times

##########
File path: modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
##########
@@ -17,51 +17,310 @@
 
 package org.apache.ignite.internal.app;
 
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.app.Ignite;
 import org.apache.ignite.app.IgnitionManager;
+import org.apache.ignite.client.handler.ClientHandlerModule;
+import org.apache.ignite.configuration.schemas.clientconnector.ClientConnectorConfiguration;
+import org.apache.ignite.configuration.schemas.network.NetworkConfiguration;
+import org.apache.ignite.configuration.schemas.rest.RestConfiguration;
+import org.apache.ignite.configuration.schemas.runner.ClusterConfiguration;
+import org.apache.ignite.configuration.schemas.runner.NodeConfiguration;
+import org.apache.ignite.configuration.schemas.table.TablesConfiguration;
+import org.apache.ignite.internal.affinity.AffinityManager;
+import org.apache.ignite.internal.baseline.BaselineManager;
 import org.apache.ignite.internal.configuration.ConfigurationManager;
 import org.apache.ignite.internal.configuration.ConfigurationRegistry;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
 import org.apache.ignite.internal.processors.query.calcite.SqlQueryProcessor;
+import org.apache.ignite.internal.raft.Loza;
+import org.apache.ignite.internal.schema.SchemaManager;
+import org.apache.ignite.internal.storage.DistributedConfigurationStorage;
+import org.apache.ignite.internal.storage.LocalConfigurationStorage;
+import org.apache.ignite.internal.table.distributed.TableManager;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.internal.vault.persistence.PersistentVaultService;
+import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.lang.NodeStoppingException;
+import org.apache.ignite.network.ClusterLocalConfiguration;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.MessageSerializationRegistryImpl;
+import org.apache.ignite.network.StaticNodeFinder;
+import org.apache.ignite.network.scalecube.ScaleCubeClusterServiceFactory;
+import org.apache.ignite.rest.RestModule;
 import org.apache.ignite.table.manager.IgniteTables;
 import org.apache.ignite.tx.IgniteTransactions;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Ignite internal implementation.
  */
 public class IgniteImpl implements Ignite {
-    /** Distributed table manager. */
-    private final IgniteTables distributedTblMgr;
+    /** The logger. */
+    private static final IgniteLogger LOG = IgniteLogger.forClass(IgniteImpl.class);
+
+    /**
+     * Path to the persistent storage used by the {@link org.apache.ignite.internal.vault.VaultService} component.
+     */
+    private static final Path VAULT_DB_PATH = Paths.get("vault");
+
+    /**
+     * Path for the partitions persistent storage.
+     */
+    private static final Path PARTITIONS_STORE_PATH = Paths.get("db");
 
     /** Ignite node name. */
     private final String name;
 
-    private final SqlQueryProcessor qryEngine;
+    /** Vault manager. */
+    private final VaultManager vaultMgr;
 
     /** Configuration manager that handles node (local) configuration. */
-    private final ConfigurationManager nodeConfigurationMgr;
+    private final ConfigurationManager nodeCfgMgr;
+
+    /** Cluster service (cluster network manager). */
+    private final ClusterService clusterSvc;
+
+    /** Raft manager. */
+    private final Loza raftMgr;
+
+    /** Meta storage manager. */
+    private final MetaStorageManager metaStorageMgr;
 
     /** Configuration manager that handles cluster (distributed) configuration. */
-    private final ConfigurationManager clusterConfigurationMgr;
+    private final ConfigurationManager clusterCfgMgr;
+
+    /** Baseline manager. */
+    private final BaselineManager baselineMgr;
+
+    /** Affinity manager. */
+    private final AffinityManager affinityMgr;
+
+    /** Schema manager. */
+    private final SchemaManager schemaMgr;
+
+    /** Distributed table manager. */
+    private final TableManager distributedTblMgr;
+
+    /** Query engine. */
+    private final SqlQueryProcessor qryEngine;
+
+    /** Rest module. */
+    private final RestModule restModule;
+
+    /** Client handler module. */
+    private final ClientHandlerModule clientHandlerModule;
+
+    /** Node status. Adds ability to stop currently starting node. */
+    private final AtomicReference<Status> status = new AtomicReference<>(Status.STARTING);
 
     /**
+     * The Constructor.
+     *
      * @param name Ignite node name.
-     * @param tblMgr Table manager.
-     * @param qryEngine Query processor.
-     * @param nodeConfigurationMgr Configuration manager that handles node (local) configuration.
-     * @param clusterConfigurationMgr Configuration manager that handles cluster (distributed) configuration.
+     * @param workDir Work directory for the started node. Must not be {@code null}.
      */
     IgniteImpl(
         String name,
-        IgniteTables tblMgr,
-        SqlQueryProcessor qryEngine,
-        ConfigurationManager nodeConfigurationMgr,
-        ConfigurationManager clusterConfigurationMgr
+        Path workDir
     ) {
         this.name = name;
-        this.distributedTblMgr = tblMgr;
-        this.qryEngine = qryEngine;
-        this.nodeConfigurationMgr = nodeConfigurationMgr;
-        this.clusterConfigurationMgr = clusterConfigurationMgr;
+
+        vaultMgr = createVault(workDir);
+
+        nodeCfgMgr = new ConfigurationManager(
+            Arrays.asList(
+                NetworkConfiguration.KEY,
+                NodeConfiguration.KEY,
+                RestConfiguration.KEY,
+                ClientConnectorConfiguration.KEY
+            ),
+            Map.of(),
+            new LocalConfigurationStorage(vaultMgr)
+        );
+
+        clusterSvc = new ScaleCubeClusterServiceFactory().createClusterService(
+            new ClusterLocalConfiguration(
+                name,
+                new MessageSerializationRegistryImpl()
+            ),
+            nodeCfgMgr,
+            () -> StaticNodeFinder.fromConfiguration(nodeCfgMgr.configurationRegistry().
+                getConfiguration(NetworkConfiguration.KEY).value())
+        );
+
+        raftMgr = new Loza(clusterSvc, workDir);
+
+        metaStorageMgr = new MetaStorageManager(
+            vaultMgr,
+            nodeCfgMgr,
+            clusterSvc,
+            raftMgr
+        );
+
+        clusterCfgMgr = new ConfigurationManager(
+            Arrays.asList(
+                ClusterConfiguration.KEY,
+                TablesConfiguration.KEY
+            ),
+            Map.of(),
+            new DistributedConfigurationStorage(metaStorageMgr, vaultMgr)
+        );
+
+        baselineMgr = new BaselineManager(
+            clusterCfgMgr,
+            metaStorageMgr,
+            clusterSvc
+        );
+
+        affinityMgr = new AffinityManager(
+            clusterCfgMgr,
+            metaStorageMgr,
+            baselineMgr
+        );
+
+        schemaMgr = new SchemaManager(
+            clusterCfgMgr,
+            metaStorageMgr,
+            vaultMgr
+        );
+
+        distributedTblMgr = new TableManager(
+            nodeCfgMgr,
+            clusterCfgMgr,
+            metaStorageMgr,
+            schemaMgr,
+            affinityMgr,
+            raftMgr,
+            getPartitionsStorePath(workDir)
+        );
+
+        qryEngine = new SqlQueryProcessor(
+            clusterSvc,
+            distributedTblMgr
+        );
+
+        restModule = new RestModule(nodeCfgMgr, clusterCfgMgr);
+
+        clientHandlerModule = new ClientHandlerModule(distributedTblMgr, nodeCfgMgr.configurationRegistry());
+    }
+
+    /**
+     * Starts ignite node.
+     *
+     * @param cfg Optional node configuration based on {@link org.apache.ignite.configuration.schemas.runner.NodeConfigurationSchema}
+     * and {@link org.apache.ignite.configuration.schemas.network.NetworkConfigurationSchema}. Following rules are used
+     * for applying the configuration properties:
+     * <ol>
+     * <li>Specified property overrides existing one or just applies itself if it wasn't
+     * previously specified.</li>
+     * <li>All non-specified properties either use previous value or use default one from
+     * corresponding configuration schema.</li>
+     * </ol>
+     * So that, in case of initial node start (first start ever) specified configuration, supplemented with defaults, is
+     * used. If no configuration was provided defaults are used for all configuration properties. In case of node
+     * restart, specified properties override existing ones, non specified properties that also weren't specified
+     * previously use default values. Please pay attention that previously specified properties are searched in the
+     * {@code workDir} specified by the user.
+     */
+    public void start(@Nullable String cfg) {
+        List<IgniteComponent> startedComponents = new ArrayList<>();
+
+        try {
+            // Vault startup.
+            doStartComponent(
+                name,
+                startedComponents,
+                vaultMgr
+            );
+
+            vaultMgr.putName(name).join();
+
+            // Node configuration manager startup.
+            doStartComponent(
+                name,
+                startedComponents,
+                nodeCfgMgr);
+
+            // Node configuration manager bootstrap.
+            if (cfg != null) {
+                try {
+                    nodeCfgMgr.bootstrap(cfg);
+                }
+                catch (Exception e) {
+                    LOG.warn("Unable to parse user-specific configuration, default configuration will be used: {}",
+                        e.getMessage());
+                }
+            }
+            else
+                nodeCfgMgr.configurationRegistry().initializeDefaults();
+
+            // Start the remaining components.
+            List<IgniteComponent> otherComponents = List.of(
+                clusterSvc,
+                raftMgr,
+                metaStorageMgr,
+                clusterCfgMgr,
+                baselineMgr,
+                affinityMgr,
+                schemaMgr,
+                distributedTblMgr,
+                qryEngine,
+                restModule,
+                clientHandlerModule
+            );
+
+            for (IgniteComponent component : otherComponents)
+                doStartComponent(name, startedComponents, component);
+
+            // Deploy all resisted watches cause all components are ready and have registered their listeners.

Review comment:
       ```suggestion
               // Deploy all registered watches because all components are ready and have registered their listeners.
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org