You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by il...@apache.org on 2020/06/01 07:59:11 UTC

[ignite] branch master updated: IGNITE-13050 Added nodes cache in ClusterGroupAdapter - Fixes #7831.

This is an automated email from the ASF dual-hosted git repository.

ilyak pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new b2089a6  IGNITE-13050 Added nodes cache in ClusterGroupAdapter - Fixes #7831.
b2089a6 is described below

commit b2089a6be789fbc8e4fc86e4125872e6456bb3d0
Author: ibessonov <be...@gmail.com>
AuthorDate: Mon Jun 1 10:50:32 2020 +0300

    IGNITE-13050 Added nodes cache in ClusterGroupAdapter - Fixes #7831.
    
    Signed-off-by: Ilya Kasnacheev <il...@gmail.com>
---
 .../internal/cluster/ClusterGroupAdapter.java      | 220 +++++++++++++--------
 .../ignite/internal/ClusterGroupSelfTest.java      | 118 +++++++----
 2 files changed, 215 insertions(+), 123 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterGroupAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterGroupAdapter.java
index 3d4b71f..4cea339 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterGroupAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterGroupAdapter.java
@@ -61,6 +61,9 @@ import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.resources.IgniteInstanceResource;
 import org.jetbrains.annotations.Nullable;
 
+import static java.util.Collections.emptySet;
+import static java.util.Collections.singleton;
+import static java.util.Collections.unmodifiableCollection;
 import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MACS;
 
 /**
@@ -97,6 +100,9 @@ public class ClusterGroupAdapter implements ClusterGroupEx, Externalizable {
     /** Node IDs. */
     private Set<UUID> ids;
 
+    /** */
+    private transient volatile ClusterGroupState state;
+
     /**
      * Required by {@link Externalizable}.
      */
@@ -285,43 +291,45 @@ public class ClusterGroupAdapter implements ClusterGroupEx, Externalizable {
 
     /** {@inheritDoc} */
     @Override public Collection<ClusterNode> nodes() {
-        guard();
+        return unmodifiableCollection(ensureLastTopologyState().nodes);
+    }
 
-        try {
-            if (ids != null) {
-                if (ids.isEmpty())
-                    return Collections.emptyList();
-                else if (ids.size() == 1) {
-                    ClusterNode node = ctx.discovery().node(F.first(ids));
+    /** */
+    protected Collection<ClusterNode> resolveCurrentNodes() {
+        assert Thread.holdsLock(this);
 
-                    return node != null ? Collections.singleton(node) : Collections.<ClusterNode>emptyList();
-                }
-                else {
-                    Collection<ClusterNode> nodes = new ArrayList<>(ids.size());
+        if (ids != null) {
+            if (ids.isEmpty())
+                return Collections.emptyList();
+            else if (ids.size() == 1) {
+                ClusterNode node = ctx.discovery().node(F.first(ids));
 
-                    for (UUID id : ids) {
-                        ClusterNode node = ctx.discovery().node(id);
+                return node != null ? singleton(node) : emptySet();
+            }
+            else {
+                ArrayList<ClusterNode> nodes = new ArrayList<>(ids.size());
 
-                        if (node != null)
-                            nodes.add(node);
-                    }
+                for (UUID id : ids) {
+                    ClusterNode node = ctx.discovery().node(id);
 
-                    return nodes;
+                    if (node != null)
+                        nodes.add(node);
                 }
-            }
-            else {
-                Collection<ClusterNode> all;
 
-                if (p instanceof DaemonFilter)
-                    all = F.concat(false, ctx.discovery().daemonNodes(), ctx.discovery().allNodes());
-                else
-                    all = ctx.discovery().allNodes();
+                nodes.trimToSize();
 
-                return p != null ? F.view(all, p) : all;
+                return nodes;
             }
         }
-        finally {
-            unguard();
+        else {
+            Collection<ClusterNode> all;
+
+            if (p instanceof DaemonFilter)
+                all = F.concat(false, ctx.discovery().daemonNodes(), ctx.discovery().allNodes());
+            else
+                all = ctx.discovery().allNodes();
+
+            return p != null ? F.view(all, p) : all;
         }
     }
 
@@ -412,7 +420,7 @@ public class ClusterGroupAdapter implements ClusterGroupEx, Externalizable {
             Set<UUID> nodeIds;
 
             if (F.isEmpty(nodes))
-                nodeIds = contains(node) ? Collections.singleton(node.id()) : Collections.<UUID>emptySet();
+                nodeIds = contains(node) ? singleton(node.id()) : emptySet();
             else {
                 nodeIds = U.newHashSet(nodes.length + 1);
 
@@ -461,7 +469,7 @@ public class ClusterGroupAdapter implements ClusterGroupEx, Externalizable {
             Set<UUID> nodeIds;
 
             if (F.isEmpty(ids))
-                nodeIds = contains(id) ? Collections.singleton(id) : Collections.<UUID>emptySet();
+                nodeIds = contains(id) ? singleton(id) : emptySet();
             else {
                 nodeIds = U.newHashSet(ids.length + 1);
 
@@ -538,7 +546,7 @@ public class ClusterGroupAdapter implements ClusterGroupEx, Externalizable {
 
     /** {@inheritDoc} */
     @Override public final ClusterGroup forRemotes() {
-        return forOthers(Collections.singleton(ctx.localNodeId()));
+        return forOthers(singleton(ctx.localNodeId()));
     }
 
     /**
@@ -709,6 +717,46 @@ public class ClusterGroupAdapter implements ClusterGroupEx, Externalizable {
         }
     }
 
+    /** */
+    protected final ClusterGroupState ensureLastTopologyState() {
+        ClusterGroupState state = this.state;
+
+        GridDiscoveryManager discoMgr = ctx.discovery();
+
+        long lastTopVer = discoMgr.topologyVersion();
+        long startTime = discoMgr.gridStartTime();
+
+        if (state == null || state.lastTopVer < lastTopVer || state.startTime != startTime)
+            return resetState();
+
+        return state;
+    }
+
+    /** */
+    protected synchronized ClusterGroupState resetState() {
+        guard();
+
+        try {
+            ClusterGroupState state = this.state;
+
+            GridDiscoveryManager discoMgr = ctx.discovery();
+
+            long lastTopVer = discoMgr.topologyVersion();
+            long startTime = discoMgr.gridStartTime();
+
+            // Double check in synchronized context.
+            if (state != null && state.lastTopVer == lastTopVer && state.startTime == startTime)
+                return state;
+
+            Collection<ClusterNode> nodes = resolveCurrentNodes();
+
+            return this.state = new ClusterGroupState(nodes, lastTopVer, startTime);
+        }
+        finally {
+            unguard();
+        }
+    }
+
     /** {@inheritDoc} */
     @Override public void writeExternal(ObjectOutput out) throws IOException {
         U.writeString(out, igniteInstanceName);
@@ -752,6 +800,37 @@ public class ClusterGroupAdapter implements ClusterGroupEx, Externalizable {
     }
 
     /**
+     * Container for cluster group state.
+     */
+    private static class ClusterGroupState {
+        /** Calculated nodes. */
+        public final Collection<ClusterNode> nodes;
+
+        /** Last topology version. */
+        public final long lastTopVer;
+
+        /**
+         * Start time of the first node in the grid. Required for cases like the
+         * {@code GridServiceProxyClientReconnectSelfTest#testClientReconnect()} test. In that scenario we have one
+         * server and one client. Topology version is {@code 2}, and after server restart and client reconnect we have
+         * basically a new server but with the same topology version. This situation can be caught if we have an
+         * additional counter.
+         */
+        public final long startTime;
+
+        /**
+         * @param nodes Calculated nodes.
+         * @param lastTopVer Last topology version.
+         * @param startTime Start time of first node in grid.
+         */
+        public ClusterGroupState(Collection<ClusterNode> nodes, long lastTopVer, long startTime) {
+            this.nodes = nodes;
+            this.lastTopVer = lastTopVer;
+            this.startTime = startTime;
+        }
+    }
+
+    /**
      */
     private static class CachesFilter implements IgnitePredicate<ClusterNode> {
         /** */
@@ -907,7 +986,7 @@ public class ClusterGroupAdapter implements ClusterGroupEx, Externalizable {
         private boolean isOldest;
 
         /** State. */
-        private volatile AgeClusterGroupState state;
+        private volatile IgnitePredicate<ClusterNode> ageP;
 
         /**
          * Required for {@link Externalizable}.
@@ -924,54 +1003,48 @@ public class ClusterGroupAdapter implements ClusterGroupEx, Externalizable {
             super(parent.ctx, parent.subjId, parent.p, parent.ids);
 
             this.isOldest = isOldest;
-
-            reset();
         }
 
-        /**
-         * Resets node.
-         */
-        private synchronized void reset() {
-            guard();
-
-            try {
-                long lastTopVer = ctx.discovery().topologyVersion();
+        /** {@inheritDoc} */
+        @Override protected Set<ClusterNode> resolveCurrentNodes() {
+            Collection<ClusterNode> nodes = super.resolveCurrentNodes();
 
-                ClusterNode node = isOldest ? U.oldest(super.nodes(), null) : U.youngest(super.nodes(), null);
+            ClusterNode node = isOldest ? U.oldest(nodes, null) : U.youngest(nodes, null);
 
-                IgnitePredicate<ClusterNode> p = F.nodeForNodes(node);
+            if (node == null) {
+                ageP = F.alwaysFalse();
 
-                state = new AgeClusterGroupState(node, p, lastTopVer);
-            }
-            finally {
-                unguard();
+                return emptySet();
             }
-        }
-
-        /** {@inheritDoc} */
-        @Override public ClusterNode node() {
-            if (ctx.discovery().topologyVersion() != state.lastTopVer)
-                reset();
+            else {
+                ageP = F.nodeForNodes(node);
 
-            return state.node;
+                return singleton(node);
+            }
         }
 
         /** {@inheritDoc} */
         @Override public Collection<ClusterNode> nodes() {
-            if (ctx.discovery().topologyVersion() != state.lastTopVer)
-                reset();
+            guard();
+
+            try {
+                ClusterNode node = F.first(ensureLastTopologyState().nodes);
 
-            ClusterNode node = state.node;
+                if (node != null)
+                    node = ctx.discovery().node(node.id());
 
-            return node == null ? Collections.<ClusterNode>emptyList() : Collections.singletonList(node);
+                return node == null ? emptySet() : singleton(node);
+            }
+            finally {
+                unguard();
+            }
         }
 
         /** {@inheritDoc} */
         @Override public IgnitePredicate<ClusterNode> predicate() {
-            if (ctx.discovery().topologyVersion() != state.lastTopVer)
-                reset();
+            ensureLastTopologyState();
 
-            return state.p;
+            return ageP;
         }
 
         /** {@inheritDoc} */
@@ -1022,31 +1095,6 @@ public class ClusterGroupAdapter implements ClusterGroupEx, Externalizable {
     }
 
     /**
-     * Container for age-based cluster group state.
-     */
-    private static class AgeClusterGroupState {
-        /** Selected node. */
-        private final ClusterNode node;
-
-        /** Node predicate. */
-        private final IgnitePredicate<ClusterNode> p;
-
-        /** Last topology version. */
-        private final long lastTopVer;
-
-        /**
-         * @param node Node.
-         * @param p Predicate.
-         * @param lastTopVer Last topology version.
-         */
-        public AgeClusterGroupState(ClusterNode node, IgnitePredicate<ClusterNode> p, long lastTopVer) {
-            this.node = node;
-            this.p = p;
-            this.lastTopVer = lastTopVer;
-        }
-    }
-
-    /**
      * Dynamic cluster group based predicate.
      */
     private static class GroupPredicate implements IgnitePredicate<ClusterNode> {
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/ClusterGroupSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/ClusterGroupSelfTest.java
index 534cc3a..bfb6453 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/ClusterGroupSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/ClusterGroupSelfTest.java
@@ -17,19 +17,21 @@
 
 package org.apache.ignite.internal;
 
-import java.util.Collection;
-import java.util.LinkedList;
+import java.util.List;
 import java.util.Random;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
 import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCluster;
 import org.apache.ignite.Ignition;
 import org.apache.ignite.cluster.ClusterGroup;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.G;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.marshaller.Marshaller;
@@ -37,6 +39,8 @@ import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonTest;
 import org.junit.Test;
 
+import static java.util.Collections.singleton;
+
 /**
  * Test for {@link ClusterGroup}.
  */
@@ -46,7 +50,7 @@ public class ClusterGroupSelfTest extends ClusterGroupAbstractTest {
     private static final int NODES_CNT = 4;
 
     /** Projection node IDs. */
-    private static Collection<UUID> ids;
+    private static List<UUID> ids;
 
     /** */
     private static Ignite ignite;
@@ -55,20 +59,11 @@ public class ClusterGroupSelfTest extends ClusterGroupAbstractTest {
     @Override protected void beforeTestsStarted() throws Exception {
         assert NODES_CNT > 2;
 
-        ids = new LinkedList<>();
-
         for (int i = 0; i < NODES_CNT; i++) {
-            Ignite g;
-
             if (i > 1)
-                g = startClientGrid(i);
+                startClientGrid(i);
             else
-                g = startGrid(i);
-
-            ids.add(g.cluster().localNode().id());
-
-            if (i == 0)
-                ignite = g;
+                startGrid(i);
         }
 
         waitForTopology(NODES_CNT);
@@ -82,6 +77,19 @@ public class ClusterGroupSelfTest extends ClusterGroupAbstractTest {
     }
 
     /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        ignite = grid(0);
+
+        ids = G.allGrids().stream()
+            .map(Ignite::cluster)
+            .map(IgniteCluster::localNode)
+            .map(ClusterNode::id)
+            .collect(Collectors.toList());
+    }
+
+    /** {@inheritDoc} */
     @Override protected ClusterGroup projection() {
         return grid(0).cluster().forPredicate(F.nodeForNodeIds(ids));
     }
@@ -100,27 +108,39 @@ public class ClusterGroupSelfTest extends ClusterGroupAbstractTest {
     }
 
     /**
-     * @throws Exception If failed.
      */
     @Test
     public void testOldest() throws Exception {
-        ClusterGroup oldest = ignite.cluster().forOldest();
+        IgniteCluster cluster = grid(1).cluster();
 
-        ClusterNode node = null;
+        ClusterGroup oldest = cluster.forOldest();
 
-        long minOrder = Long.MAX_VALUE;
+        ClusterNode oldestNode = grid(0).localNode();
 
-        for (ClusterNode n : ignite.cluster().nodes()) {
-            if (n.order() < minOrder) {
-                node = n;
+        assertEquals(cluster.forNode(oldestNode).node(), oldest.node());
 
-                minOrder = n.order();
-            }
-        }
+        assertEqualsCollections(
+            singleton(cluster.forNode(oldestNode).node()),
+            cluster.nodes().stream().filter(oldest.predicate()::apply).collect(Collectors.toSet())
+        );
 
-        assertEquals(oldest.node(), ignite.cluster().forNode(node).node());
+        stopGrid(0);
 
-        ClusterGroup emptyGrp = ignite.cluster().forAttribute("nonExistent", "val");
+        try {
+            ClusterNode newOldestNode = grid(1).localNode();
+
+            assertEquals(cluster.forNode(newOldestNode).node(), oldest.node());
+
+            assertEqualsCollections(
+                singleton(cluster.forNode(newOldestNode).node()),
+                cluster.nodes().stream().filter(oldest.predicate()::apply).collect(Collectors.toSet())
+            );
+        }
+        finally {
+            startGrid(0);
+        }
+
+        ClusterGroup emptyGrp = cluster.forAttribute("nonExistent", "val");
 
         assertEquals(0, emptyGrp.forOldest().nodes().size());
     }
@@ -130,25 +150,49 @@ public class ClusterGroupSelfTest extends ClusterGroupAbstractTest {
      */
     @Test
     public void testYoungest() throws Exception {
-        ClusterGroup youngest = ignite.cluster().forYoungest();
+        IgniteCluster cluster = ignite.cluster();
 
-        ClusterNode node = null;
+        ClusterGroup youngest = cluster.forYoungest();
 
-        long maxOrder = Long.MIN_VALUE;
+        ClusterNode youngestNode = grid(NODES_CNT - 1).localNode();
 
-        for (ClusterNode n : ignite.cluster().nodes()) {
-            if (n.order() > maxOrder) {
-                node = n;
+        assertEquals(cluster.forNode(youngestNode).node(), youngest.node());
 
-                maxOrder = n.order();
-            }
-        }
+        assertEqualsCollections(
+            singleton(cluster.forNode(youngestNode).node()),
+            cluster.nodes().stream().filter(youngest.predicate()::apply).collect(Collectors.toSet())
+        );
 
-        assertEquals(youngest.node(), ignite.cluster().forNode(node).node());
+        stopGrid(NODES_CNT - 1);
 
-        ClusterGroup emptyGrp = ignite.cluster().forAttribute("nonExistent", "val");
+        try {
+            ClusterNode newYoungestNode = grid(NODES_CNT - 2).localNode();
+
+            assertEquals(cluster.forNode(newYoungestNode).node(), youngest.node());
+
+            assertEqualsCollections(
+                singleton(cluster.forNode(newYoungestNode).node()),
+                cluster.nodes().stream().filter(youngest.predicate()::apply).collect(Collectors.toSet())
+            );
+        }
+        finally {
+            startClientGrid(NODES_CNT - 1);
+        }
+
+        ClusterGroup emptyGrp = cluster.forAttribute("nonExistent", "val");
 
         assertEquals(0, emptyGrp.forYoungest().nodes().size());
+
+        try (Ignite ignore = startGrid(NODES_CNT)) {
+            ClusterNode newYoungestNode = grid(NODES_CNT).localNode();
+
+            assertEquals(cluster.forNode(newYoungestNode).node(), youngest.node());
+
+            assertEqualsCollections(
+                singleton(cluster.forNode(newYoungestNode).node()),
+                cluster.nodes().stream().filter(youngest.predicate()::apply).collect(Collectors.toSet())
+            );
+        }
     }
 
     /**