You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by il...@apache.org on 2020/06/01 07:59:11 UTC
[ignite] branch master updated: IGNITE-13050 Added nodes cache in
ClusterGroupAdapter - Fixes #7831.
This is an automated email from the ASF dual-hosted git repository.
ilyak pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new b2089a6 IGNITE-13050 Added nodes cache in ClusterGroupAdapter - Fixes #7831.
b2089a6 is described below
commit b2089a6be789fbc8e4fc86e4125872e6456bb3d0
Author: ibessonov <be...@gmail.com>
AuthorDate: Mon Jun 1 10:50:32 2020 +0300
IGNITE-13050 Added nodes cache in ClusterGroupAdapter - Fixes #7831.
Signed-off-by: Ilya Kasnacheev <il...@gmail.com>
---
.../internal/cluster/ClusterGroupAdapter.java | 220 +++++++++++++--------
.../ignite/internal/ClusterGroupSelfTest.java | 118 +++++++----
2 files changed, 215 insertions(+), 123 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterGroupAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterGroupAdapter.java
index 3d4b71f..4cea339 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterGroupAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterGroupAdapter.java
@@ -61,6 +61,9 @@ import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.jetbrains.annotations.Nullable;
+import static java.util.Collections.emptySet;
+import static java.util.Collections.singleton;
+import static java.util.Collections.unmodifiableCollection;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MACS;
/**
@@ -97,6 +100,9 @@ public class ClusterGroupAdapter implements ClusterGroupEx, Externalizable {
/** Node IDs. */
private Set<UUID> ids;
+ /** */
+ private transient volatile ClusterGroupState state;
+
/**
* Required by {@link Externalizable}.
*/
@@ -285,43 +291,45 @@ public class ClusterGroupAdapter implements ClusterGroupEx, Externalizable {
/** {@inheritDoc} */
@Override public Collection<ClusterNode> nodes() {
- guard();
+ return unmodifiableCollection(ensureLastTopologyState().nodes);
+ }
- try {
- if (ids != null) {
- if (ids.isEmpty())
- return Collections.emptyList();
- else if (ids.size() == 1) {
- ClusterNode node = ctx.discovery().node(F.first(ids));
+ /** */
+ protected Collection<ClusterNode> resolveCurrentNodes() {
+ assert Thread.holdsLock(this);
- return node != null ? Collections.singleton(node) : Collections.<ClusterNode>emptyList();
- }
- else {
- Collection<ClusterNode> nodes = new ArrayList<>(ids.size());
+ if (ids != null) {
+ if (ids.isEmpty())
+ return Collections.emptyList();
+ else if (ids.size() == 1) {
+ ClusterNode node = ctx.discovery().node(F.first(ids));
- for (UUID id : ids) {
- ClusterNode node = ctx.discovery().node(id);
+ return node != null ? singleton(node) : emptySet();
+ }
+ else {
+ ArrayList<ClusterNode> nodes = new ArrayList<>(ids.size());
- if (node != null)
- nodes.add(node);
- }
+ for (UUID id : ids) {
+ ClusterNode node = ctx.discovery().node(id);
- return nodes;
+ if (node != null)
+ nodes.add(node);
}
- }
- else {
- Collection<ClusterNode> all;
- if (p instanceof DaemonFilter)
- all = F.concat(false, ctx.discovery().daemonNodes(), ctx.discovery().allNodes());
- else
- all = ctx.discovery().allNodes();
+ nodes.trimToSize();
- return p != null ? F.view(all, p) : all;
+ return nodes;
}
}
- finally {
- unguard();
+ else {
+ Collection<ClusterNode> all;
+
+ if (p instanceof DaemonFilter)
+ all = F.concat(false, ctx.discovery().daemonNodes(), ctx.discovery().allNodes());
+ else
+ all = ctx.discovery().allNodes();
+
+ return p != null ? F.view(all, p) : all;
}
}
@@ -412,7 +420,7 @@ public class ClusterGroupAdapter implements ClusterGroupEx, Externalizable {
Set<UUID> nodeIds;
if (F.isEmpty(nodes))
- nodeIds = contains(node) ? Collections.singleton(node.id()) : Collections.<UUID>emptySet();
+ nodeIds = contains(node) ? singleton(node.id()) : emptySet();
else {
nodeIds = U.newHashSet(nodes.length + 1);
@@ -461,7 +469,7 @@ public class ClusterGroupAdapter implements ClusterGroupEx, Externalizable {
Set<UUID> nodeIds;
if (F.isEmpty(ids))
- nodeIds = contains(id) ? Collections.singleton(id) : Collections.<UUID>emptySet();
+ nodeIds = contains(id) ? singleton(id) : emptySet();
else {
nodeIds = U.newHashSet(ids.length + 1);
@@ -538,7 +546,7 @@ public class ClusterGroupAdapter implements ClusterGroupEx, Externalizable {
/** {@inheritDoc} */
@Override public final ClusterGroup forRemotes() {
- return forOthers(Collections.singleton(ctx.localNodeId()));
+ return forOthers(singleton(ctx.localNodeId()));
}
/**
@@ -709,6 +717,46 @@ public class ClusterGroupAdapter implements ClusterGroupEx, Externalizable {
}
}
+ /** */
+ protected final ClusterGroupState ensureLastTopologyState() {
+ ClusterGroupState state = this.state;
+
+ GridDiscoveryManager discoMgr = ctx.discovery();
+
+ long lastTopVer = discoMgr.topologyVersion();
+ long startTime = discoMgr.gridStartTime();
+
+ if (state == null || state.lastTopVer < lastTopVer || state.startTime != startTime)
+ return resetState();
+
+ return state;
+ }
+
+ /** */
+ protected synchronized ClusterGroupState resetState() {
+ guard();
+
+ try {
+ ClusterGroupState state = this.state;
+
+ GridDiscoveryManager discoMgr = ctx.discovery();
+
+ long lastTopVer = discoMgr.topologyVersion();
+ long startTime = discoMgr.gridStartTime();
+
+ // Double check in synchronized context.
+ if (state != null && state.lastTopVer == lastTopVer && state.startTime == startTime)
+ return state;
+
+ Collection<ClusterNode> nodes = resolveCurrentNodes();
+
+ return this.state = new ClusterGroupState(nodes, lastTopVer, startTime);
+ }
+ finally {
+ unguard();
+ }
+ }
+
/** {@inheritDoc} */
@Override public void writeExternal(ObjectOutput out) throws IOException {
U.writeString(out, igniteInstanceName);
@@ -752,6 +800,37 @@ public class ClusterGroupAdapter implements ClusterGroupEx, Externalizable {
}
/**
+ * Container for cluster group state.
+ */
+ private static class ClusterGroupState {
+ /** Calculated nodes. */
+ public final Collection<ClusterNode> nodes;
+
+ /** Last topology version. */
+ public final long lastTopVer;
+
+ /**
+ * Start time of first node in grid. Required for cases like in
+ * {@code GridServiceProxyClientReconnectSelfTest#testClientReconnect()} test. In that scenario we have one
+ * server and one client. Topology version is {@code 2} and after server restart and client reconnect we have
+ * basically new server but with the same topology version. This situation can be caught if we have additional
+ * counter.
+ */
+ public final long startTime;
+
+ /**
+ * @param nodes Calculated nodes.
+ * @param lastTopVer Last topology version.
+ * @param startTime Start time of first node in grid.
+ */
+ public ClusterGroupState(Collection<ClusterNode> nodes, long lastTopVer, long startTime) {
+ this.nodes = nodes;
+ this.lastTopVer = lastTopVer;
+ this.startTime = startTime;
+ }
+ }
+
+ /**
*/
private static class CachesFilter implements IgnitePredicate<ClusterNode> {
/** */
@@ -907,7 +986,7 @@ public class ClusterGroupAdapter implements ClusterGroupEx, Externalizable {
private boolean isOldest;
/** State. */
- private volatile AgeClusterGroupState state;
+ private volatile IgnitePredicate<ClusterNode> ageP;
/**
* Required for {@link Externalizable}.
@@ -924,54 +1003,48 @@ public class ClusterGroupAdapter implements ClusterGroupEx, Externalizable {
super(parent.ctx, parent.subjId, parent.p, parent.ids);
this.isOldest = isOldest;
-
- reset();
}
- /**
- * Resets node.
- */
- private synchronized void reset() {
- guard();
-
- try {
- long lastTopVer = ctx.discovery().topologyVersion();
+ /** {@inheritDoc} */
+ @Override protected Set<ClusterNode> resolveCurrentNodes() {
+ Collection<ClusterNode> nodes = super.resolveCurrentNodes();
- ClusterNode node = isOldest ? U.oldest(super.nodes(), null) : U.youngest(super.nodes(), null);
+ ClusterNode node = isOldest ? U.oldest(nodes, null) : U.youngest(nodes, null);
- IgnitePredicate<ClusterNode> p = F.nodeForNodes(node);
+ if (node == null) {
+ ageP = F.alwaysFalse();
- state = new AgeClusterGroupState(node, p, lastTopVer);
- }
- finally {
- unguard();
+ return emptySet();
}
- }
-
- /** {@inheritDoc} */
- @Override public ClusterNode node() {
- if (ctx.discovery().topologyVersion() != state.lastTopVer)
- reset();
+ else {
+ ageP = F.nodeForNodes(node);
- return state.node;
+ return singleton(node);
+ }
}
/** {@inheritDoc} */
@Override public Collection<ClusterNode> nodes() {
- if (ctx.discovery().topologyVersion() != state.lastTopVer)
- reset();
+ guard();
+
+ try {
+ ClusterNode node = F.first(ensureLastTopologyState().nodes);
- ClusterNode node = state.node;
+ if (node != null)
+ node = ctx.discovery().node(node.id());
- return node == null ? Collections.<ClusterNode>emptyList() : Collections.singletonList(node);
+ return node == null ? emptySet() : singleton(node);
+ }
+ finally {
+ unguard();
+ }
}
/** {@inheritDoc} */
@Override public IgnitePredicate<ClusterNode> predicate() {
- if (ctx.discovery().topologyVersion() != state.lastTopVer)
- reset();
+ ensureLastTopologyState();
- return state.p;
+ return ageP;
}
/** {@inheritDoc} */
@@ -1022,31 +1095,6 @@ public class ClusterGroupAdapter implements ClusterGroupEx, Externalizable {
}
/**
- * Container for age-based cluster group state.
- */
- private static class AgeClusterGroupState {
- /** Selected node. */
- private final ClusterNode node;
-
- /** Node predicate. */
- private final IgnitePredicate<ClusterNode> p;
-
- /** Last topology version. */
- private final long lastTopVer;
-
- /**
- * @param node Node.
- * @param p Predicate.
- * @param lastTopVer Last topology version.
- */
- public AgeClusterGroupState(ClusterNode node, IgnitePredicate<ClusterNode> p, long lastTopVer) {
- this.node = node;
- this.p = p;
- this.lastTopVer = lastTopVer;
- }
- }
-
- /**
* Dynamic cluster group based predicate.
*/
private static class GroupPredicate implements IgnitePredicate<ClusterNode> {
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/ClusterGroupSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/ClusterGroupSelfTest.java
index 534cc3a..bfb6453 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/ClusterGroupSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/ClusterGroupSelfTest.java
@@ -17,19 +17,21 @@
package org.apache.ignite.internal;
-import java.util.Collection;
-import java.util.LinkedList;
+import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCluster;
import org.apache.ignite.Ignition;
import org.apache.ignite.cluster.ClusterGroup;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.marshaller.Marshaller;
@@ -37,6 +39,8 @@ import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonTest;
import org.junit.Test;
+import static java.util.Collections.singleton;
+
/**
* Test for {@link ClusterGroup}.
*/
@@ -46,7 +50,7 @@ public class ClusterGroupSelfTest extends ClusterGroupAbstractTest {
private static final int NODES_CNT = 4;
/** Projection node IDs. */
- private static Collection<UUID> ids;
+ private static List<UUID> ids;
/** */
private static Ignite ignite;
@@ -55,20 +59,11 @@ public class ClusterGroupSelfTest extends ClusterGroupAbstractTest {
@Override protected void beforeTestsStarted() throws Exception {
assert NODES_CNT > 2;
- ids = new LinkedList<>();
-
for (int i = 0; i < NODES_CNT; i++) {
- Ignite g;
-
if (i > 1)
- g = startClientGrid(i);
+ startClientGrid(i);
else
- g = startGrid(i);
-
- ids.add(g.cluster().localNode().id());
-
- if (i == 0)
- ignite = g;
+ startGrid(i);
}
waitForTopology(NODES_CNT);
@@ -82,6 +77,19 @@ public class ClusterGroupSelfTest extends ClusterGroupAbstractTest {
}
/** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ ignite = grid(0);
+
+ ids = G.allGrids().stream()
+ .map(Ignite::cluster)
+ .map(IgniteCluster::localNode)
+ .map(ClusterNode::id)
+ .collect(Collectors.toList());
+ }
+
+ /** {@inheritDoc} */
@Override protected ClusterGroup projection() {
return grid(0).cluster().forPredicate(F.nodeForNodeIds(ids));
}
@@ -100,27 +108,39 @@ public class ClusterGroupSelfTest extends ClusterGroupAbstractTest {
}
/**
- * @throws Exception If failed.
*/
@Test
public void testOldest() throws Exception {
- ClusterGroup oldest = ignite.cluster().forOldest();
+ IgniteCluster cluster = grid(1).cluster();
- ClusterNode node = null;
+ ClusterGroup oldest = cluster.forOldest();
- long minOrder = Long.MAX_VALUE;
+ ClusterNode oldestNode = grid(0).localNode();
- for (ClusterNode n : ignite.cluster().nodes()) {
- if (n.order() < minOrder) {
- node = n;
+ assertEquals(cluster.forNode(oldestNode).node(), oldest.node());
- minOrder = n.order();
- }
- }
+ assertEqualsCollections(
+ singleton(cluster.forNode(oldestNode).node()),
+ cluster.nodes().stream().filter(oldest.predicate()::apply).collect(Collectors.toSet())
+ );
- assertEquals(oldest.node(), ignite.cluster().forNode(node).node());
+ stopGrid(0);
- ClusterGroup emptyGrp = ignite.cluster().forAttribute("nonExistent", "val");
+ try {
+ ClusterNode newOldestNode = grid(1).localNode();
+
+ assertEquals(cluster.forNode(newOldestNode).node(), oldest.node());
+
+ assertEqualsCollections(
+ singleton(cluster.forNode(newOldestNode).node()),
+ cluster.nodes().stream().filter(oldest.predicate()::apply).collect(Collectors.toSet())
+ );
+ }
+ finally {
+ startGrid(0);
+ }
+
+ ClusterGroup emptyGrp = cluster.forAttribute("nonExistent", "val");
assertEquals(0, emptyGrp.forOldest().nodes().size());
}
@@ -130,25 +150,49 @@ public class ClusterGroupSelfTest extends ClusterGroupAbstractTest {
*/
@Test
public void testYoungest() throws Exception {
- ClusterGroup youngest = ignite.cluster().forYoungest();
+ IgniteCluster cluster = ignite.cluster();
- ClusterNode node = null;
+ ClusterGroup youngest = cluster.forYoungest();
- long maxOrder = Long.MIN_VALUE;
+ ClusterNode youngestNode = grid(NODES_CNT - 1).localNode();
- for (ClusterNode n : ignite.cluster().nodes()) {
- if (n.order() > maxOrder) {
- node = n;
+ assertEquals(cluster.forNode(youngestNode).node(), youngest.node());
- maxOrder = n.order();
- }
- }
+ assertEqualsCollections(
+ singleton(cluster.forNode(youngestNode).node()),
+ cluster.nodes().stream().filter(youngest.predicate()::apply).collect(Collectors.toSet())
+ );
- assertEquals(youngest.node(), ignite.cluster().forNode(node).node());
+ stopGrid(NODES_CNT - 1);
- ClusterGroup emptyGrp = ignite.cluster().forAttribute("nonExistent", "val");
+ try {
+ ClusterNode newYoungestNode = grid(NODES_CNT - 2).localNode();
+
+ assertEquals(cluster.forNode(newYoungestNode).node(), youngest.node());
+
+ assertEqualsCollections(
+ singleton(cluster.forNode(newYoungestNode).node()),
+ cluster.nodes().stream().filter(youngest.predicate()::apply).collect(Collectors.toSet())
+ );
+ }
+ finally {
+ startClientGrid(NODES_CNT - 1);
+ }
+
+ ClusterGroup emptyGrp = cluster.forAttribute("nonExistent", "val");
assertEquals(0, emptyGrp.forYoungest().nodes().size());
+
+ try (Ignite ignore = startGrid(NODES_CNT)) {
+ ClusterNode newYoungestNode = grid(NODES_CNT).localNode();
+
+ assertEquals(cluster.forNode(newYoungestNode).node(), youngest.node());
+
+ assertEqualsCollections(
+ singleton(cluster.forNode(newYoungestNode).node()),
+ cluster.nodes().stream().filter(youngest.predicate()::apply).collect(Collectors.toSet())
+ );
+ }
}
/**