You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2020/06/15 14:20:22 UTC

[GitHub] [ignite] alamar commented on a change in pull request #7922: IGNITE-12111 Cluster ID and tag to identify cluster

alamar commented on a change in pull request #7922:
URL: https://github.com/apache/ignite/pull/7922#discussion_r440205501



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/IgniteFeatures.java
##########
@@ -76,6 +76,9 @@
     /** Replacing TcpDiscoveryNode field with nodeId field in discovery messages. */
     TCP_DISCOVERY_MESSAGE_NODE_COMPACT_REPRESENTATION(14),
 
+    /** Support of cluster ID and tag. */

Review comment:
       Why do we need it? I thought this class was legacy.

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/configuration/distributed/DistributedConfigurationProcessor.java
##########
@@ -61,6 +61,9 @@ public DistributedConfigurationProcessor(GridKernalContext ctx) {
 
     /** {@inheritDoc} */
     @Override public void start() throws IgniteCheckedException {
+        if (ctx.isDaemon())

Review comment:
       Why?

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java
##########
@@ -639,6 +645,64 @@ private boolean changeWalMode(String cacheName, boolean enabled) {
         }
     }
 
+    /** {@inheritDoc} */
+    @Override public UUID id() {
+        return id;
+    }
+
+    /**
+     * Not part of public API.
+     * Enables ClusterProcessor to set ID in the following cases:
+     * <ol>
+     *     <li>For the first time on node startup.</li>
+     *     <li>Set to null on client disconnect.</li>
+     *     <li>Set to some not-null value on client reconnect.</li>
+     * </ol>
+     *
+     * @param id ID to set.
+     */
+    public void setId(UUID id) {
+        this.id = id;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String tag() {
+        return tag;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void tag(String tag) throws IgniteCheckedException {
+        if (tag == null || tag.isEmpty())
+            throw new IgniteCheckedException("Please provide not-null and not empty string for cluster tag");
+
+        if (tag.length() > MAX_TAG_LENGTH)
+            throw new IgniteCheckedException("Maximum tag length is exceeded, max length is " +
+                MAX_TAG_LENGTH +

Review comment:
       I'm not sure that using that many lines is required here. If it is, the coding guidelines (CG) mandate the use of curly braces here, because the `if` body is longer than one line.

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
##########
@@ -145,6 +224,170 @@ public boolean diagnosticEnabled() {
         return getBoolean(IGNITE_DIAGNOSTIC_ENABLED, true);
     }
 
+    /** {@inheritDoc} */
+    @Override public void start() throws IgniteCheckedException {
+        if (ctx.isDaemon())
+            return;
+
+        GridInternalSubscriptionProcessor isp = ctx.internalSubscriptionProcessor();
+
+        isp.registerDistributedMetastorageListener(this);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onReadyForRead(ReadableDistributedMetaStorage metastorage) {
+        ClusterIdAndTag idAndTag = readKey(metastorage, CLUSTER_ID_TAG_KEY, "Reading cluster ID and tag from metastorage failed, " +
+            "default values will be generated");
+
+        if (log.isInfoEnabled())
+            log.info("Cluster ID and tag has been read from metastorage: " + idAndTag);
+
+        if (idAndTag != null) {
+            localClusterId = idAndTag.id();
+            localClusterTag = idAndTag.tag();
+        }
+    }
+
+    /**
+     * @param metastorage Metastorage.
+     * @param key Key.
+     * @param errMsg Err message.
+     */
+    private <T extends Serializable> T readKey(ReadableDistributedMetaStorage metastorage, String key, String errMsg) {
+        try {
+            return metastorage.read(key);
+        }
+        catch (IgniteCheckedException e) {
+            U.warn(log, errMsg, e);
+
+            return null;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onReadyForWrite(DistributedMetaStorage metastorage) {
+        this.metastorage = metastorage;
+
+        metastorage.listen(
+            (k) -> k.equals(CLUSTER_ID_TAG_KEY),
+            (String k, ClusterIdAndTag oldVal, ClusterIdAndTag newVal) -> {
+                if (log.isInfoEnabled())
+                    log.info(
+                        "Cluster tag will be set to new value: " +
+                            newVal != null ? newVal.tag() : "null" +
+                            ", previous value was: " +
+                            oldVal != null ? oldVal.tag() : "null");
+
+                cluster.setTag(newVal != null ? newVal.tag() : null);
+
+                if (compatibilityMode) {

Review comment:
       What's compatibility mode?

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
##########
@@ -145,6 +224,170 @@ public boolean diagnosticEnabled() {
         return getBoolean(IGNITE_DIAGNOSTIC_ENABLED, true);
     }
 
+    /** {@inheritDoc} */
+    @Override public void start() throws IgniteCheckedException {
+        if (ctx.isDaemon())
+            return;
+
+        GridInternalSubscriptionProcessor isp = ctx.internalSubscriptionProcessor();
+
+        isp.registerDistributedMetastorageListener(this);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onReadyForRead(ReadableDistributedMetaStorage metastorage) {
+        ClusterIdAndTag idAndTag = readKey(metastorage, CLUSTER_ID_TAG_KEY, "Reading cluster ID and tag from metastorage failed, " +
+            "default values will be generated");
+
+        if (log.isInfoEnabled())
+            log.info("Cluster ID and tag has been read from metastorage: " + idAndTag);
+
+        if (idAndTag != null) {
+            localClusterId = idAndTag.id();
+            localClusterTag = idAndTag.tag();
+        }
+    }
+
+    /**
+     * @param metastorage Metastorage.
+     * @param key Key.
+     * @param errMsg Err message.
+     */
+    private <T extends Serializable> T readKey(ReadableDistributedMetaStorage metastorage, String key, String errMsg) {
+        try {
+            return metastorage.read(key);
+        }
+        catch (IgniteCheckedException e) {
+            U.warn(log, errMsg, e);
+
+            return null;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onReadyForWrite(DistributedMetaStorage metastorage) {
+        this.metastorage = metastorage;
+
+        metastorage.listen(
+            (k) -> k.equals(CLUSTER_ID_TAG_KEY),
+            (String k, ClusterIdAndTag oldVal, ClusterIdAndTag newVal) -> {
+                if (log.isInfoEnabled())
+                    log.info(
+                        "Cluster tag will be set to new value: " +
+                            newVal != null ? newVal.tag() : "null" +

Review comment:
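       It would be nice to put the ternaries inside parentheses, since their precedence is not obvious: `+` binds tighter than `!=` and `?:`, so as written the whole concatenated string (not `newVal`) is compared to null.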

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
##########
@@ -145,6 +224,170 @@ public boolean diagnosticEnabled() {
         return getBoolean(IGNITE_DIAGNOSTIC_ENABLED, true);
     }
 
+    /** {@inheritDoc} */
+    @Override public void start() throws IgniteCheckedException {
+        if (ctx.isDaemon())
+            return;
+
+        GridInternalSubscriptionProcessor isp = ctx.internalSubscriptionProcessor();
+
+        isp.registerDistributedMetastorageListener(this);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onReadyForRead(ReadableDistributedMetaStorage metastorage) {
+        ClusterIdAndTag idAndTag = readKey(metastorage, CLUSTER_ID_TAG_KEY, "Reading cluster ID and tag from metastorage failed, " +

Review comment:
       This line is too long considering the 120-character limit.

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
##########
@@ -145,6 +224,170 @@ public boolean diagnosticEnabled() {
         return getBoolean(IGNITE_DIAGNOSTIC_ENABLED, true);
     }
 
+    /** {@inheritDoc} */
+    @Override public void start() throws IgniteCheckedException {
+        if (ctx.isDaemon())
+            return;
+
+        GridInternalSubscriptionProcessor isp = ctx.internalSubscriptionProcessor();
+
+        isp.registerDistributedMetastorageListener(this);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onReadyForRead(ReadableDistributedMetaStorage metastorage) {
+        ClusterIdAndTag idAndTag = readKey(metastorage, CLUSTER_ID_TAG_KEY, "Reading cluster ID and tag from metastorage failed, " +
+            "default values will be generated");
+
+        if (log.isInfoEnabled())
+            log.info("Cluster ID and tag has been read from metastorage: " + idAndTag);
+
+        if (idAndTag != null) {
+            localClusterId = idAndTag.id();
+            localClusterTag = idAndTag.tag();
+        }
+    }
+
+    /**
+     * @param metastorage Metastorage.
+     * @param key Key.
+     * @param errMsg Err message.
+     */
+    private <T extends Serializable> T readKey(ReadableDistributedMetaStorage metastorage, String key, String errMsg) {
+        try {
+            return metastorage.read(key);
+        }
+        catch (IgniteCheckedException e) {
+            U.warn(log, errMsg, e);
+
+            return null;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onReadyForWrite(DistributedMetaStorage metastorage) {
+        this.metastorage = metastorage;
+
+        metastorage.listen(
+            (k) -> k.equals(CLUSTER_ID_TAG_KEY),
+            (String k, ClusterIdAndTag oldVal, ClusterIdAndTag newVal) -> {
+                if (log.isInfoEnabled())
+                    log.info(
+                        "Cluster tag will be set to new value: " +
+                            newVal != null ? newVal.tag() : "null" +
+                            ", previous value was: " +
+                            oldVal != null ? oldVal.tag() : "null");
+
+                cluster.setTag(newVal != null ? newVal.tag() : null);
+
+                if (compatibilityMode) {
+                    // In compatibility mode ID and tag
+                    // will be stored to metastorage on coordinator instead of receiving them on join.
+                    assert oldVal == null;
+
+                    if (log.isInfoEnabled())
+                        log.info("Cluster ID will be initialized to the value: " + newVal.id());
+
+                    cluster.setId(newVal.id());
+
+                    compatibilityMode = false;
+                }
+            }
+        );
+
+        //TODO GG-21718 - implement optimization so only coordinator makes a write to metastorage.

Review comment:
       What's GG-21718?

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
##########
@@ -125,6 +137,73 @@
     /** */
     private boolean sndMetrics;
 
+    /** Cluster ID is stored in local variable before activation when it goes to distributed metastorage. */
+    private volatile UUID localClusterId;
+
+    /** Cluster tag is stored in local variable before activation when it goes to distributed metastorage. */
+    private volatile String localClusterTag;
+
+    /** */
+    private volatile DistributedMetaStorage metastorage;
+
+    /** Flag is used to detect and manage case when new node (this one) joins old cluster. */
+    private volatile boolean compatibilityMode;
+
+    /**
+     * Listener for LEFT and FAILED events intended to catch the moment when all nodes in topology support ID and tag.
+     */
+    private final DiscoveryEventListener discoLsnr = new DiscoveryEventListener() {
+        @Override public void onEvent(DiscoveryEvent evt, DiscoCache discoCache) {
+            if (!compatibilityMode)
+                return;
+
+            if (IgniteFeatures.allNodesSupports(discoCache.remoteNodes(), IgniteFeatures.CLUSTER_ID_AND_TAG)) {

Review comment:
       The feature check may be dropped; see also the comment above.

##########
File path: modules/core/src/test/java/org/apache/ignite/internal/cluster/FullyConnectedComponentSearcherTest.java
##########
@@ -33,7 +33,7 @@
 import org.junit.runners.Parameterized;
 
 /**
- * Class to test correctness of fully-connectet component searching algorithm.
+ * Class to test correctness of fully-connected component searching algorithm.

Review comment:
       Kudos.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org