You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2021/11/24 08:30:53 UTC

[GitHub] [ignite] ibessonov commented on a change in pull request #9527: IGNITE-15733 Eventually failure of baseline registration

ibessonov commented on a change in pull request #9527:
URL: https://github.com/apache/ignite/pull/9527#discussion_r755086914



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
##########
@@ -283,10 +290,15 @@ public boolean diagnosticEnabled() {
                 try {
                     ClusterIdAndTag idAndTag = new ClusterIdAndTag(cluster.id(), cluster.tag());
 
-                    if (log.isInfoEnabled())
-                        log.info("Writing cluster ID and tag to metastorage on ready for write " + idAndTag);
+                    if (idAndTag.id() != null) {
+                        metastorage.writeAsync(CLUSTER_ID_TAG_KEY, idAndTag);
 
-                    metastorage.writeAsync(CLUSTER_ID_TAG_KEY, idAndTag);
+                        if (log.isInfoEnabled())
+                            log.info("Writing cluster ID and tag to metastorage on ready for write " + idAndTag);
+                    }
+                    else

Review comment:
       Please add braces

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DmsDataWriterWorker.java
##########
@@ -201,6 +213,9 @@ public void cancel(boolean halt) throws InterruptedException {
             try {
                 RunnableFuture<?> curTask = updateQueue.take();
 
+                if (writeCondition != null && writeCondition.test(""))

Review comment:
       I think you forgot to remove this part. Or, if it is correct, then please document it.

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
##########
@@ -329,10 +339,51 @@ public void updateTag(String newTag) throws IgniteCheckedException {
      * </ul>
      */
     public void onLocalJoin() {
-        cluster.setId(locClusterId != null ? locClusterId : UUID.randomUUID());
+        Collection<ClusterNode> rmtNodes = cluster.forServers().nodes();
+        List<ClusterNode> rmtNodes0 = new ArrayList<>(rmtNodes);
+
+        rmtNodes0.sort(Comparator.comparing(ClusterNode::id));
+
+        @Nullable Collection<BaselineNode> bltNodes = cluster.currentBaselineTopology();
+
+        if (F.isEmpty(bltNodes)) {
+            log.info("Baseline node collection is empty.");
+
+            return;
+        }
+
+        @Nullable UUID first = null;
+
+        Collection<Object> srvIds = F.nodeConsistentIds(bltNodes);
+
+        for (ClusterNode node : rmtNodes0) {
+            if (F.contains(srvIds, node.consistentId())) {
+                first = node.id();
+
+                break;
+            }
+        }
+
+        ClusterNode locNode = ctx.config().getDiscoverySpi().getLocalNode();
+
+        if (first == locNode.id() || locClusterId != null) {
+            cluster.setId(locClusterId != null ? locClusterId : UUID.randomUUID());
+
+            cluster.setTag(locClusterTag != null ? locClusterTag :
+                ClusterTagGenerator.generateTag());
 
-        cluster.setTag(locClusterTag != null ? locClusterTag :
-            ClusterTagGenerator.generateTag());
+            ClusterIdAndTag idAndTag = new ClusterIdAndTag(cluster.id(), cluster.tag());
+
+            if (log.isInfoEnabled())
+                log.info("Writing cluster ID and tag to metastorage on ready for write " + idAndTag);
+
+            try {
+                metastorage.writeAsync(CLUSTER_ID_TAG_KEY, idAndTag);
+            }
+            catch (IgniteCheckedException e) {
+                ctx.failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e));

Review comment:
       > What if this node actually fails?
   
   I'm still waiting for the reply btw.

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
##########
@@ -329,10 +339,51 @@ public void updateTag(String newTag) throws IgniteCheckedException {
      * </ul>
      */
     public void onLocalJoin() {
-        cluster.setId(locClusterId != null ? locClusterId : UUID.randomUUID());
+        Collection<ClusterNode> rmtNodes = cluster.forServers().nodes();
+        List<ClusterNode> rmtNodes0 = new ArrayList<>(rmtNodes);
+
+        rmtNodes0.sort(Comparator.comparing(ClusterNode::id));
+
+        @Nullable Collection<BaselineNode> bltNodes = cluster.currentBaselineTopology();
+
+        if (F.isEmpty(bltNodes)) {
+            log.info("Baseline node collection is empty.");
+
+            return;
+        }
+
+        @Nullable UUID first = null;
+
+        Collection<Object> srvIds = F.nodeConsistentIds(bltNodes);
+
+        for (ClusterNode node : rmtNodes0) {
+            if (F.contains(srvIds, node.consistentId())) {
+                first = node.id();
+
+                break;
+            }
+        }
+
+        ClusterNode locNode = ctx.config().getDiscoverySpi().getLocalNode();
+
+        if (first == locNode.id() || locClusterId != null) {
+            cluster.setId(locClusterId != null ? locClusterId : UUID.randomUUID());
+
+            cluster.setTag(locClusterTag != null ? locClusterTag :
+                ClusterTagGenerator.generateTag());
 
-        cluster.setTag(locClusterTag != null ? locClusterTag :
-            ClusterTagGenerator.generateTag());
+            ClusterIdAndTag idAndTag = new ClusterIdAndTag(cluster.id(), cluster.tag());
+
+            if (log.isInfoEnabled())
+                log.info("Writing cluster ID and tag to metastorage on ready for write " + idAndTag);
+
+            try {
+                metastorage.writeAsync(CLUSTER_ID_TAG_KEY, idAndTag);
+            }
+            catch (IgniteCheckedException e) {
+                ctx.failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e));

Review comment:
       I don't get it, CAS is not that different from regular write. What do you mean?

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
##########
@@ -328,11 +340,67 @@ public void updateTag(String newTag) throws IgniteCheckedException {
      *     when it becomes ready for read.</li>
      * </ul>
      */
+    public void onChangeState() {
+        Collection<ClusterNode> rmtNodes = cluster.forServers().nodes();
+        List<ClusterNode> rmtNodes0 = new ArrayList<>(rmtNodes);
+
+        rmtNodes0.sort(Comparator.comparing(ClusterNode::id));
+
+        @Nullable Collection<BaselineNode> bltNodes = cluster.currentBaselineTopology();
+
+        if (F.isEmpty(bltNodes)) {
+            log.info("Baseline node collection is empty.");
+
+            return;
+        }
+
+        @Nullable UUID first = null;
+
+        Collection<Object> srvIds = F.nodeConsistentIds(bltNodes);
+
+        for (ClusterNode node : rmtNodes0) {
+            if (F.contains(srvIds, node.consistentId())) {
+                first = node.id();
+
+                break;
+            }
+        }
+
+        ClusterNode locNode = ctx.config().getDiscoverySpi().getLocalNode();
+
+        if (first == locNode.id() || locClusterTag != null) {
+            final UUID id = locClusterId != null ? locClusterId : UUID.randomUUID();
+
+            final String tag = locClusterTag != null ? locClusterTag : ClusterTagGenerator.generateTag();
+
+            ClusterIdAndTag idAndTag = new ClusterIdAndTag(id, tag);
+
+            if (log.isInfoEnabled() && locClusterTag == null)
+                log.info("Writing cluster ID and tag to metastorage on ready for write " + idAndTag);
+
+            try {
+                GridFutureAdapter<?> f = metastorage.writeAsync(CLUSTER_ID_TAG_KEY, idAndTag);
+
+                f.listen(o -> {
+                        cluster.setId(id);
+                        cluster.setTag(tag);
+                    });
+            }
+            catch (IgniteCheckedException e) {
+                ctx.failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e));
+            }
+        }
+    }
+
+    /** For pure in mem cluster we still need to generate tag and id on first registred cluster node. */

Review comment:
       ```suggestion
       /** For pure in mem cluster we still need to generate tag and id on first registered cluster node. */
   ```

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
##########
@@ -283,10 +290,15 @@ public boolean diagnosticEnabled() {
                 try {
                     ClusterIdAndTag idAndTag = new ClusterIdAndTag(cluster.id(), cluster.tag());
 
-                    if (log.isInfoEnabled())
-                        log.info("Writing cluster ID and tag to metastorage on ready for write " + idAndTag);
+                    if (idAndTag.id() != null) {
+                        metastorage.writeAsync(CLUSTER_ID_TAG_KEY, idAndTag);

Review comment:
       I think that this code deserves a comment. Why are we sure that tag&id don't exist in cluster and we can "write" instead of "cas"?

##########
File path: modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStoragePersistentTest.java
##########
@@ -73,19 +96,174 @@
     public void testRestart() throws Exception {
         IgniteEx ignite = startGrid(0);
 
-        ignite.cluster().active(true);
+        ignite.cluster().state(ClusterState.ACTIVE);
 
         ignite.context().distributedMetastorage().write("key", "value");
 
         stopGrid(0);
 
         ignite = startGrid(0);
 
-        ignite.cluster().active(true);
+        ignite.cluster().state(ClusterState.ACTIVE);
 
         assertEquals("value", ignite.context().distributedMetastorage().read("key"));
     }
 
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testStoreLagOnOneNode() throws Exception {
+        IgniteEx ignite = startGrid(0);
+
+        IgniteEx ignite2 = startGrid(1);
+
+        DistributedMetaStorageImpl distrMetaStore = (DistributedMetaStorageImpl)ignite.context().distributedMetastorage();
+
+        DmsDataWriterWorker worker = GridTestUtils.getFieldValue(distrMetaStore, "worker");
+
+        assertNotNull(worker);
+
+        // check we still have no cluster tag key.
+        assertNull(distrMetaStore.read(CLUSTER_ID_TAG_KEY));
+
+        GridTestUtils.setFieldValue(worker, "writeCondition", new Predicate<String>() {
+            private volatile boolean skip;
+
+            @Override public boolean test(String s) {
+                if (s.equals(CLUSTER_ID_TAG_KEY) || skip) {
+                    skip = true;
+
+                    return true;
+                }
+                return false;
+            }
+        });
+
+        ignite.cluster().state(ClusterState.ACTIVE);
+
+        String clusterTag = "griffon";
+
+        assertTrue(GridTestUtils.waitForCondition(() -> {
+            boolean fail = false;

Review comment:
       This can be replaced with `return true;` in `try` and `return false;` in `catch`.

##########
File path: modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStoragePersistentTest.java
##########
@@ -73,19 +96,174 @@
     public void testRestart() throws Exception {
         IgniteEx ignite = startGrid(0);
 
-        ignite.cluster().active(true);
+        ignite.cluster().state(ClusterState.ACTIVE);
 
         ignite.context().distributedMetastorage().write("key", "value");
 
         stopGrid(0);
 
         ignite = startGrid(0);
 
-        ignite.cluster().active(true);
+        ignite.cluster().state(ClusterState.ACTIVE);
 
         assertEquals("value", ignite.context().distributedMetastorage().read("key"));
     }
 
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testStoreLagOnOneNode() throws Exception {
+        IgniteEx ignite = startGrid(0);
+
+        IgniteEx ignite2 = startGrid(1);
+
+        DistributedMetaStorageImpl distrMetaStore = (DistributedMetaStorageImpl)ignite.context().distributedMetastorage();
+
+        DmsDataWriterWorker worker = GridTestUtils.getFieldValue(distrMetaStore, "worker");
+
+        assertNotNull(worker);
+
+        // check we still have no cluster tag key.
+        assertNull(distrMetaStore.read(CLUSTER_ID_TAG_KEY));
+
+        GridTestUtils.setFieldValue(worker, "writeCondition", new Predicate<String>() {
+            private volatile boolean skip;
+
+            @Override public boolean test(String s) {
+                if (s.equals(CLUSTER_ID_TAG_KEY) || skip) {
+                    skip = true;
+
+                    return true;
+                }
+                return false;
+            }
+        });
+
+        ignite.cluster().state(ClusterState.ACTIVE);
+
+        String clusterTag = "griffon";
+
+        assertTrue(GridTestUtils.waitForCondition(() -> {
+            boolean fail = false;
+
+            try {
+                ignite2.cluster().tag(clusterTag);
+            }
+            catch (IgniteCheckedException e) {
+                assertTrue(e.getMessage().contains("Cannot change tag as default"));
+
+                fail = true;
+            }
+
+            return !fail;
+        }, 10_000));
+
+        String tag0 = ignite2.cluster().tag();
+
+        String key = "some_kind_of_uniq_key_" + ThreadLocalRandom.current().nextInt();
+
+        checkStoredWithPers(metastorage(0), ignite2, key, "value");
+
+        stopAllGrids();
+
+        ignite = startGrid(0);
+
+        startGrid(1);
+
+        ignite.cluster().state(ClusterState.ACTIVE);
+
+        assertEquals("value", ignite.context().distributedMetastorage().read(key));
+
+        assertEquals(tag0, ignite.cluster().tag());
+    }
+
+    /** Check cluster tag behaviour while one node fails. */
+    @Test
+    public void changeTagWithNodeCrash() throws Exception {
+        String clusterTag = "seamonkey";
+
+        IgniteEx ignite = startGrid(0);
+
+        IgniteEx ignite2 = startGrid(1);
+
+        Collection<ClusterNode> rmtNodes = ignite.cluster().forServers().nodes();
+
+        List<ClusterNode> rmtNodes0 = new ArrayList<>(rmtNodes);
+
+        rmtNodes0.sort(Comparator.comparing(ClusterNode::id));
+
+        // Choose a node the same as in ClusterProcessor.onChangeState.
+        @Nullable UUID first = F.first(rmtNodes0).id();
+
+        assertNotNull(first);
+
+        IgniteEx activeNode = ignite.cluster().localNode().id() == first ? ignite : ignite2;
+
+        System.err.println("node to skip: " + activeNode.name());
+
+        assertEquals(activeNode.cluster().localNode().id(), first);
+
+        DistributedMetaStorageImpl distrMetaStore =
+            (DistributedMetaStorageImpl)activeNode.context().distributedMetastorage();
+
+        ClusterProcessor proc = activeNode.context().cluster();
+
+        AtomicBoolean fail = new AtomicBoolean();
+
+        DistributedMetaStorageDelegate delegate = new DistributedMetaStorageDelegate(distrMetaStore, fail);
+
+        DmsDataWriterWorker worker = GridTestUtils.getFieldValue(distrMetaStore, "worker");
+
+        GridTestUtils.setFieldValue(proc, "metastorage", delegate);
+
+        GridTestUtils.setFieldValue(worker, "writeCondition", new Predicate<String>() {
+            @Override public boolean test(String s) {
+                if (s.equals(CLUSTER_ID_TAG_KEY) || fail.get()) {
+                    fail.set(true);
+
+                    return true;
+                }
+                return false;
+            }
+        });
+
+        IgniteEx alive = activeNode.name().equals(ignite.name()) ? ignite2 : ignite;
+
+        alive.cluster().state(ClusterState.ACTIVE);
+
+        assertTrue(GridTestUtils.waitForCondition(() -> {
+            boolean failed = false;

Review comment:
       Same here, `return !failed;` just looks complicated

##########
File path: modules/core/src/test/java/org/apache/ignite/internal/cluster/IgniteClusterIdTagTest.java
##########
@@ -424,4 +444,63 @@ public void testTagChangedEventMultinodeWithRemoteFilter() throws Exception {
         assertEquals(generatedTag, oldTagFromEvent.get());
         assertEquals(CUSTOM_TAG_0, newTagFromEvent.get());
     }
+
+    /**
+     * @return {@link DistributedMetaStorage} instance for i'th node.
+     */
+    protected DistributedMetaStorage metastorage(int i) {
+        return grid(i).context().distributedMetastorage();
+    }
+
+    /** Checks that appropriate key, value are stored into local metastore. */
+    protected void checkStoredWithPers(
+        DistributedMetaStorage msToStore,
+        IgniteEx instanceToCheck,
+        String key,
+        String value
+    ) throws IgniteCheckedException {
+        assertTrue(isPersistenceEnabled);
+
+        final CountDownLatch latch = new CountDownLatch(1);
+
+        final DistributedMetaStorageImpl distrMetaStore =
+            (DistributedMetaStorageImpl)instanceToCheck.context().distributedMetastorage();
+
+        DmsDataWriterWorker worker = GridTestUtils.getFieldValue(distrMetaStore, "worker");
+
+        ReadWriteMetastorage metastorage = GridTestUtils.getFieldValue(worker, "metastorage");
+
+        assertNotNull(metastorage);
+
+        IgniteInternalFuture f = GridTestUtils.runAsync(() -> {
+            try {
+                latch.await();
+
+                assertTrue(waitForCondition(() -> {
+                    try {
+                        AtomicReference<String> contains = new AtomicReference<>();
+
+                        metastorage.iterate("", (k, v) -> {
+                            if (k.contains(key))
+                                contains.set(k);
+                        }, false);
+
+                        return contains.get() != null && metastorage.readRaw(contains.get()) != null;
+                    }
+                    catch (IgniteCheckedException e) {
+                        throw new IgniteException(e);
+                    }
+                }, 15_000));
+            }
+            catch (IgniteInterruptedCheckedException | InterruptedException e) {
+                throw new IgniteException(e);
+            }
+        });
+
+        latch.countDown();
+
+        msToStore.write(key, value);
+
+        f.get();

Review comment:
       Please add timeout

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
##########
@@ -328,11 +340,67 @@ public void updateTag(String newTag) throws IgniteCheckedException {
      *     when it becomes ready for read.</li>
      * </ul>
      */
+    public void onChangeState() {
+        Collection<ClusterNode> rmtNodes = cluster.forServers().nodes();
+        List<ClusterNode> rmtNodes0 = new ArrayList<>(rmtNodes);
+
+        rmtNodes0.sort(Comparator.comparing(ClusterNode::id));
+
+        @Nullable Collection<BaselineNode> bltNodes = cluster.currentBaselineTopology();
+
+        if (F.isEmpty(bltNodes)) {
+            log.info("Baseline node collection is empty.");
+
+            return;
+        }
+
+        @Nullable UUID first = null;
+
+        Collection<Object> srvIds = F.nodeConsistentIds(bltNodes);
+
+        for (ClusterNode node : rmtNodes0) {
+            if (F.contains(srvIds, node.consistentId())) {
+                first = node.id();

Review comment:
       A perfect opportunity to use Stream API, don't you think so? Take a look at `java.util.stream.Stream#min`, for example




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org