You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by am...@apache.org on 2019/06/05 11:08:19 UTC
[ignite] 16/31: IGNITE-1741 Fixed hanging CacheAffinityCallSelfTest.testAffinityCallNoServerNode - Fixes #5729.
This is an automated email from the ASF dual-hosted git repository.
amashenkov pushed a commit to branch gg-19225
in repository https://gitbox.apache.org/repos/asf/ignite.git
commit e545d547d5ce883766ee2c50f499e208fa3193c5
Author: Semyon Boikov <sb...@apache.org>
AuthorDate: Tue Dec 25 11:10:17 2018 +0300
IGNITE-1741 Fixed hanging CacheAffinityCallSelfTest.testAffinityCallNoServerNode - Fixes #5729.
Signed-off-by: Alexey Goncharuk <al...@gmail.com>
(cherry picked from commit 175c1d815d848918eab79960910a8a3002143aa0)
---
.../managers/discovery/GridDiscoveryManager.java | 6 +-
.../processors/affinity/GridAffinityProcessor.java | 195 ++++++++++++---------
.../internal/processors/task/GridTaskWorker.java | 14 +-
.../cache/CacheAffinityCallSelfTest.java | 70 +++++++-
4 files changed, 193 insertions(+), 92 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 42dabdd..552fc2e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -1939,7 +1939,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
* @param topVer Topology version.
* @return Collection of cache nodes.
*/
- public Collection<ClusterNode> cacheNodes(@Nullable String cacheName, AffinityTopologyVersion topVer) {
+ public List<ClusterNode> cacheNodes(@Nullable String cacheName, AffinityTopologyVersion topVer) {
return resolveDiscoCache(CU.cacheId(cacheName), topVer).cacheNodes(cacheName);
}
@@ -2483,7 +2483,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
* @param cacheName Cache name.
* @param node Node to add
*/
- private void addToMap(Map<Integer, List<ClusterNode>> cacheMap, String cacheName, ClusterNode rich) {
+ private void addToMap(Map<Integer, List<ClusterNode>> cacheMap, String cacheName, ClusterNode node) {
List<ClusterNode> cacheNodes = cacheMap.get(CU.cacheId(cacheName));
if (cacheNodes == null) {
@@ -2492,7 +2492,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
cacheMap.put(CU.cacheId(cacheName), cacheNodes);
}
- cacheNodes.add(rich);
+ cacheNodes.add(node);
}
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
index 61886b6..67b511c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
@@ -30,21 +30,24 @@ import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.binary.BinaryObject;
-import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.affinity.Affinity;
import org.apache.ignite.cache.affinity.AffinityFunction;
import org.apache.ignite.cache.affinity.AffinityKeyMapper;
import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.cluster.ClusterTopologyException;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.cluster.ClusterGroupEmptyCheckedException;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.processors.GridProcessorAdapter;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.CacheObjectContext;
+import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
@@ -53,6 +56,8 @@ import org.apache.ignite.internal.util.GridLeanMap;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.GridTuple3;
+import org.apache.ignite.internal.util.typedef.CI1;
+import org.apache.ignite.internal.util.typedef.CX1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.A;
@@ -79,12 +84,6 @@ public class GridAffinityProcessor extends GridProcessorAdapter {
/** Affinity map cleanup delay (ms). */
private static final long AFFINITY_MAP_CLEAN_UP_DELAY = 3000;
- /** Retries to get affinity in case of error. */
- private static final int ERROR_RETRIES = 3;
-
- /** Time to wait between errors (in milliseconds). */
- private static final long ERROR_WAIT = 500;
-
/** Log. */
private final IgniteLogger log;
@@ -390,10 +389,19 @@ public class GridAffinityProcessor extends GridProcessorAdapter {
* @return Affinity cache.
* @throws IgniteCheckedException In case of error.
*/
- @SuppressWarnings("ErrorNotRethrown")
@Nullable private AffinityInfo affinityCache(final String cacheName, AffinityTopologyVersion topVer)
throws IgniteCheckedException {
+ return affinityCacheFuture(cacheName, topVer).get();
+ }
+ /**
+ * @param cacheName Cache name.
+ * @param topVer Topology version.
+ * @return Affinity cache.
+ * @throws IgniteCheckedException In case of error.
+ */
+ public IgniteInternalFuture<AffinityInfo> affinityCacheFuture(final String cacheName, AffinityTopologyVersion topVer)
+ throws IgniteCheckedException {
assert cacheName != null;
AffinityAssignmentKey key = new AffinityAssignmentKey(cacheName, topVer);
@@ -401,7 +409,7 @@ public class GridAffinityProcessor extends GridProcessorAdapter {
IgniteInternalFuture<AffinityInfo> fut = affMap.get(key);
if (fut != null)
- return fut.get();
+ return fut;
GridCacheAdapter<Object, Object> cache = ctx.cache().internalCache(cacheName);
@@ -416,7 +424,7 @@ public class GridAffinityProcessor extends GridProcessorAdapter {
cctx.gate().enter();
}
catch (IllegalStateException ignored) {
- return null;
+ return new GridFinishedFuture<>((AffinityInfo)null);
}
try {
@@ -428,99 +436,116 @@ public class GridAffinityProcessor extends GridProcessorAdapter {
cctx.cacheObjectContext()
);
- IgniteInternalFuture<AffinityInfo> old = affMap.putIfAbsent(key, new GridFinishedFuture<>(info));
+ GridFinishedFuture<AffinityInfo> fut0 = new GridFinishedFuture<>(info);
+
+ IgniteInternalFuture<AffinityInfo> old = affMap.putIfAbsent(key, fut0);
if (old != null)
- info = old.get();
+ return old;
- return info;
+ return fut0;
}
finally {
cctx.gate().leave();
}
}
- Collection<ClusterNode> cacheNodes = ctx.discovery().cacheNodes(cacheName, topVer);
+ List<ClusterNode> cacheNodes = ctx.discovery().cacheNodes(cacheName, topVer);
- if (F.isEmpty(cacheNodes))
- return null;
+ DynamicCacheDescriptor desc = ctx.cache().cacheDescriptor(cacheName);
- GridFutureAdapter<AffinityInfo> fut0 = new GridFutureAdapter<>();
+ if (desc == null || F.isEmpty(cacheNodes)) {
+ if (ctx.clientDisconnected())
+ return new GridFinishedFuture<>(new IgniteClientDisconnectedCheckedException(ctx.cluster().clientReconnectFuture(),
+ "Failed to get affinity mapping, client disconnected."));
- IgniteInternalFuture<AffinityInfo> old = affMap.putIfAbsent(key, fut0);
+ return new GridFinishedFuture<>((AffinityInfo)null);
+ }
- if (old != null)
- return old.get();
+ if (desc.cacheConfiguration().getCacheMode() == LOCAL)
+ return new GridFinishedFuture<>(new IgniteCheckedException("Failed to map keys for LOCAL cache: " + cacheName));
- int max = ERROR_RETRIES;
- int cnt = 0;
+ AffinityFuture fut0 = new AffinityFuture(cacheName, topVer, cacheNodes);
- Iterator<ClusterNode> it = cacheNodes.iterator();
+ IgniteInternalFuture<AffinityInfo> old = affMap.putIfAbsent(key, fut0);
- // We are here because affinity has not been fetched yet, or cache mode is LOCAL.
- while (true) {
- cnt++;
+ if (old != null)
+ return old;
- if (!it.hasNext())
- it = cacheNodes.iterator();
+ fut0.getAffinityFromNextNode();
- // Double check since we deal with dynamic view.
- if (!it.hasNext())
- // Exception will be caught in this method.
- throw new IgniteCheckedException("No cache nodes in topology for cache name: " + cacheName);
+ return fut0;
+ }
- ClusterNode n = it.next();
+ /**
+ *
+ */
+ private class AffinityFuture extends GridFutureAdapter<AffinityInfo> {
+ /** */
+ private final String cacheName;
- CacheMode mode = ctx.cache().cacheMode(cacheName);
+ /** */
+ private final AffinityTopologyVersion topVer;
- if (mode == null) {
- if (ctx.clientDisconnected())
- throw new IgniteClientDisconnectedCheckedException(ctx.cluster().clientReconnectFuture(),
- "Failed to get affinity mapping, client disconnected.");
+ /** */
+ private final List<ClusterNode> cacheNodes;
- throw new IgniteCheckedException("No cache nodes in topology for cache name: " + cacheName);
- }
+ /** */
+ private int nodeIdx;
- // Map all keys to a single node, if the cache mode is LOCAL.
- if (mode == LOCAL) {
- fut0.onDone(new IgniteCheckedException("Failed to map keys for LOCAL cache."));
+ /**
+ * @param cacheName Cache name.
+ * @param topVer Topology version.
+ * @param cacheNodes Cache nodes.
+ */
+ AffinityFuture(String cacheName, AffinityTopologyVersion topVer, List<ClusterNode> cacheNodes) {
+ this.cacheName = cacheName;
+ this.topVer = topVer;
+ this.cacheNodes = cacheNodes;
+ }
- // Will throw exception.
- fut0.get();
- }
+ /**
+ *
+ */
+ void getAffinityFromNextNode() {
+ while (nodeIdx < cacheNodes.size()) {
+ final ClusterNode node = cacheNodes.get(nodeIdx);
- try {
- // Resolve cache context for remote node.
- // Set affinity function before counting down on latch.
- fut0.onDone(affinityInfoFromNode(cacheName, topVer, n));
+ nodeIdx++;
- break;
- }
- catch (IgniteCheckedException e) {
- if (log.isDebugEnabled())
- log.debug("Failed to get affinity from node (will retry) [cache=" + cacheName +
- ", node=" + U.toShortString(n) + ", msg=" + e.getMessage() + ']');
+ if (!ctx.discovery().alive(node.id()))
+ continue;
- if (cnt < max) {
- U.sleep(ERROR_WAIT);
+ affinityInfoFromNode(cacheName, topVer, node).listen(new CI1<IgniteInternalFuture<AffinityInfo>>() {
+ @Override public void apply(IgniteInternalFuture<AffinityInfo> fut) {
+ try {
+ onDone(fut.get());
+ }
+ catch (IgniteCheckedException e) {
+ if (e instanceof ClusterTopologyCheckedException || X.hasCause(e, ClusterTopologyException.class)) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to get affinity from node, node failed [cache=" + cacheName +
+ ", node=" + node.id() + ", msg=" + e.getMessage() + ']');
- continue;
- }
+ getAffinityFromNextNode();
+
+ return;
+ }
- affMap.remove(key, fut0);
+ if (log.isDebugEnabled())
+ log.debug("Failed to get affinity from node [cache=" + cacheName +
+ ", node=" + node.id() + ", msg=" + e.getMessage() + ']');
- fut0.onDone(new IgniteCheckedException("Failed to get affinity mapping from node: " + n, e));
+ onDone(new IgniteCheckedException("Failed to get affinity mapping from node: " + node.id(), e));
+ }
+ }
+ });
- break;
+ return;
}
- catch (RuntimeException | Error e) {
- fut0.onDone(new IgniteCheckedException("Failed to get affinity mapping from node: " + n, e));
- break;
- }
+ onDone(new ClusterGroupEmptyCheckedException("Failed to get cache affinity, all cache nodes failed: " + cacheName));
}
-
- return fut0.get();
}
/**
@@ -529,26 +554,30 @@ public class GridAffinityProcessor extends GridProcessorAdapter {
* @param cacheName Name of cache on which affinity is requested.
* @param topVer Topology version.
* @param n Node from which affinity is requested.
- * @return Affinity cached function.
- * @throws IgniteCheckedException If either local or remote node cannot get deployment for affinity objects.
+ * @return Affinity future.
*/
- private AffinityInfo affinityInfoFromNode(String cacheName, AffinityTopologyVersion topVer, ClusterNode n)
- throws IgniteCheckedException {
- GridTuple3<GridAffinityMessage, GridAffinityMessage, GridAffinityAssignment> t = ctx.closure()
- .callAsyncNoFailover(BROADCAST, affinityJob(cacheName, topVer), F.asList(n), true/*system pool*/, 0, false).get();
+ private IgniteInternalFuture<AffinityInfo> affinityInfoFromNode(String cacheName, AffinityTopologyVersion topVer, ClusterNode n) {
+ IgniteInternalFuture<GridTuple3<GridAffinityMessage, GridAffinityMessage, GridAffinityAssignment>> fut = ctx.closure()
+ .callAsyncNoFailover(BROADCAST, affinityJob(cacheName, topVer), F.asList(n), true/*system pool*/, 0, false);
+
+ return fut.chain(new CX1<IgniteInternalFuture<GridTuple3<GridAffinityMessage, GridAffinityMessage, GridAffinityAssignment>>, AffinityInfo>() {
+ @Override public AffinityInfo applyx(IgniteInternalFuture<GridTuple3<GridAffinityMessage, GridAffinityMessage, GridAffinityAssignment>> fut) throws IgniteCheckedException {
+ GridTuple3<GridAffinityMessage, GridAffinityMessage, GridAffinityAssignment> t = fut.get();
- AffinityFunction f = (AffinityFunction)unmarshall(ctx, n.id(), t.get1());
- AffinityKeyMapper m = (AffinityKeyMapper)unmarshall(ctx, n.id(), t.get2());
+ AffinityFunction f = (AffinityFunction)unmarshall(ctx, n.id(), t.get1());
+ AffinityKeyMapper m = (AffinityKeyMapper)unmarshall(ctx, n.id(), t.get2());
- assert m != null;
+ assert m != null;
- // Bring to initial state.
- f.reset();
- m.reset();
+ // Bring to initial state.
+ f.reset();
+ m.reset();
- CacheConfiguration ccfg = ctx.cache().cacheConfiguration(cacheName);
+ CacheConfiguration ccfg = ctx.cache().cacheConfiguration(cacheName);
- return new AffinityInfo(f, m, t.get3(), ctx.cacheObjects().contextForCache(ccfg));
+ return new AffinityInfo(f, m, t.get3(), ctx.cacheObjects().contextForCache(ccfg));
+ }
+ });
}
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
index 02e8736..78d1554 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
@@ -928,9 +928,21 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
mapTopVer = ctx.cache().context().exchange().readyAffinityVersion();
affFut = ctx.cache().context().exchange().lastTopologyFuture();
+
+ if (affFut == null || affFut.isDone()) {
+ affFut = null;
+
+ // Need asynchronosly fetch affinity if cache is not started on node .
+ if (affCacheName != null && ctx.cache().internalCache(affCacheName) == null) {
+ affFut = ctx.affinity().affinityCacheFuture(affCacheName, mapTopVer);
+
+ if (affFut.isDone())
+ affFut = null;
+ }
+ }
}
- if (affFut != null && !affFut.isDone()) {
+ if (affFut != null) {
waitForAffTop = true;
jobRes.resetResponse();
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAffinityCallSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAffinityCallSelfTest.java
index 2c5472e..3eb6974 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAffinityCallSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAffinityCallSelfTest.java
@@ -17,13 +17,18 @@
package org.apache.ignite.internal.processors.cache;
import java.util.concurrent.Callable;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ThreadLocalRandom;
+
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteCompute;
+import org.apache.ignite.cache.affinity.Affinity;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.cluster.ClusterTopologyException;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
@@ -142,18 +147,23 @@ public class CacheAffinityCallSelfTest extends GridCommonAbstractTest {
*/
@Test
public void testAffinityCallNoServerNode() throws Exception {
- fail("https://issues.apache.org/jira/browse/IGNITE-1741");
-
startGridsMultiThreaded(SRVS + 1);
final Integer key = 1;
- final Ignite client = grid(SRVS);
+ final IgniteEx client = grid(SRVS);
assertTrue(client.configuration().isClientMode());
+ assertNull(client.context().cache().cache(CACHE_NAME));
+
+ final int THREADS = 5;
+
+ CyclicBarrier b = new CyclicBarrier(THREADS + 1);
final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
@Override public Object call() throws Exception {
+ b.await();
+
for (int i = 0; i < SRVS; ++i)
stopGrid(i, false);
@@ -162,8 +172,16 @@ public class CacheAffinityCallSelfTest extends GridCommonAbstractTest {
});
try {
- while (!fut.isDone())
- client.compute().affinityCall(CACHE_NAME, key, new CheckCallable(key, null));
+ GridTestUtils.runMultiThreaded(new Callable<Object>() {
+ @Override public Void call() throws Exception {
+ b.await();
+
+ while (!fut.isDone())
+ client.compute().affinityCall(CACHE_NAME, key, new CheckCallable(key, null));
+
+ return null;
+ }
+ }, THREADS, "test-thread");
}
catch (ClusterTopologyException e) {
log.info("Expected error: " + e);
@@ -174,6 +192,48 @@ public class CacheAffinityCallSelfTest extends GridCommonAbstractTest {
}
/**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testAffinityFailoverNoCacheOnClient() throws Exception {
+ startGridsMultiThreaded(SRVS + 1);
+
+ final Integer key = 1;
+
+ final IgniteEx client = grid(SRVS);
+
+ assertTrue(client.configuration().isClientMode());
+
+ final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ for (int i = 0; i < SRVS - 1; ++i) {
+ U.sleep(ThreadLocalRandom.current().nextLong(100) + 50);
+
+ stopGrid(i, false);
+ }
+
+ return null;
+ }
+ });
+
+ try {
+ final Affinity<Integer> aff = client.affinity(CACHE_NAME);
+
+ assertNull(client.context().cache().cache(CACHE_NAME));
+
+ GridTestUtils.runMultiThreaded(new Runnable() {
+ @Override public void run() {
+ while (!fut.isDone())
+ assertNotNull(aff.mapKeyToNode(key));
+ }
+ }, 5, "test-thread");
+ }
+ finally {
+ stopAllGrids();
+ }
+ }
+
+ /**
* Test callable.
*/
public static class CheckCallable implements IgniteCallable<Object> {