You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@ignite.apache.org by am...@apache.org on 2019/06/05 11:08:19 UTC

[ignite] 16/31: IGNITE-1741 Fixed hanging CacheAffinityCallSelfTest.testAffinityCallNoServerNode - Fixes #5729.

This is an automated email from the ASF dual-hosted git repository.

amashenkov pushed a commit to branch gg-19225
in repository https://gitbox.apache.org/repos/asf/ignite.git

commit e545d547d5ce883766ee2c50f499e208fa3193c5
Author: Semyon Boikov <sb...@apache.org>
AuthorDate: Tue Dec 25 11:10:17 2018 +0300

    IGNITE-1741 Fixed hanging CacheAffinityCallSelfTest.testAffinityCallNoServerNode - Fixes #5729.
    
    Signed-off-by: Alexey Goncharuk <al...@gmail.com>
    (cherry picked from commit 175c1d815d848918eab79960910a8a3002143aa0)
---
 .../managers/discovery/GridDiscoveryManager.java   |   6 +-
 .../processors/affinity/GridAffinityProcessor.java | 195 ++++++++++++---------
 .../internal/processors/task/GridTaskWorker.java   |  14 +-
 .../cache/CacheAffinityCallSelfTest.java           |  70 +++++++-
 4 files changed, 193 insertions(+), 92 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 42dabdd..552fc2e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -1939,7 +1939,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
      * @param topVer Topology version.
      * @return Collection of cache nodes.
      */
-    public Collection<ClusterNode> cacheNodes(@Nullable String cacheName, AffinityTopologyVersion topVer) {
+    public List<ClusterNode> cacheNodes(@Nullable String cacheName, AffinityTopologyVersion topVer) {
         return resolveDiscoCache(CU.cacheId(cacheName), topVer).cacheNodes(cacheName);
     }
 
@@ -2483,7 +2483,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
      * @param cacheName Cache name.
      * @param node Node to add
      */
-    private void addToMap(Map<Integer, List<ClusterNode>> cacheMap, String cacheName, ClusterNode rich) {
+    private void addToMap(Map<Integer, List<ClusterNode>> cacheMap, String cacheName, ClusterNode node) {
         List<ClusterNode> cacheNodes = cacheMap.get(CU.cacheId(cacheName));
 
         if (cacheNodes == null) {
@@ -2492,7 +2492,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
             cacheMap.put(CU.cacheId(cacheName), cacheNodes);
         }
 
-        cacheNodes.add(rich);
+        cacheNodes.add(node);
     }
 
     /**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
index 61886b6..67b511c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
@@ -30,21 +30,24 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.binary.BinaryObject;
-import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.affinity.Affinity;
 import org.apache.ignite.cache.affinity.AffinityFunction;
 import org.apache.ignite.cache.affinity.AffinityKeyMapper;
 import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.cluster.ClusterTopologyException;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.events.Event;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.cluster.ClusterGroupEmptyCheckedException;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.processors.GridProcessorAdapter;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.CacheObjectContext;
+import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
 import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
@@ -53,6 +56,8 @@ import org.apache.ignite.internal.util.GridLeanMap;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.lang.GridTuple3;
+import org.apache.ignite.internal.util.typedef.CI1;
+import org.apache.ignite.internal.util.typedef.CX1;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.A;
@@ -79,12 +84,6 @@ public class GridAffinityProcessor extends GridProcessorAdapter {
     /** Affinity map cleanup delay (ms). */
     private static final long AFFINITY_MAP_CLEAN_UP_DELAY = 3000;
 
-    /** Retries to get affinity in case of error. */
-    private static final int ERROR_RETRIES = 3;
-
-    /** Time to wait between errors (in milliseconds). */
-    private static final long ERROR_WAIT = 500;
-
     /** Log. */
     private final IgniteLogger log;
 
@@ -390,10 +389,19 @@ public class GridAffinityProcessor extends GridProcessorAdapter {
      * @return Affinity cache.
      * @throws IgniteCheckedException In case of error.
      */
-    @SuppressWarnings("ErrorNotRethrown")
     @Nullable private AffinityInfo affinityCache(final String cacheName, AffinityTopologyVersion topVer)
         throws IgniteCheckedException {
+        return affinityCacheFuture(cacheName, topVer).get();
+    }
 
+    /**
+     * @param cacheName Cache name.
+     * @param topVer Topology version.
+     * @return Affinity cache.
+     * @throws IgniteCheckedException In case of error.
+     */
+    public IgniteInternalFuture<AffinityInfo> affinityCacheFuture(final String cacheName, AffinityTopologyVersion topVer)
+        throws IgniteCheckedException {
         assert cacheName != null;
 
         AffinityAssignmentKey key = new AffinityAssignmentKey(cacheName, topVer);
@@ -401,7 +409,7 @@ public class GridAffinityProcessor extends GridProcessorAdapter {
         IgniteInternalFuture<AffinityInfo> fut = affMap.get(key);
 
         if (fut != null)
-            return fut.get();
+            return fut;
 
         GridCacheAdapter<Object, Object> cache = ctx.cache().internalCache(cacheName);
 
@@ -416,7 +424,7 @@ public class GridAffinityProcessor extends GridProcessorAdapter {
                 cctx.gate().enter();
             }
             catch (IllegalStateException ignored) {
-                return null;
+                return new GridFinishedFuture<>((AffinityInfo)null);
             }
 
             try {
@@ -428,99 +436,116 @@ public class GridAffinityProcessor extends GridProcessorAdapter {
                     cctx.cacheObjectContext()
                 );
 
-                IgniteInternalFuture<AffinityInfo> old = affMap.putIfAbsent(key, new GridFinishedFuture<>(info));
+                GridFinishedFuture<AffinityInfo> fut0 = new GridFinishedFuture<>(info);
+
+                IgniteInternalFuture<AffinityInfo> old = affMap.putIfAbsent(key, fut0);
 
                 if (old != null)
-                    info = old.get();
+                    return old;
 
-                return info;
+                return fut0;
             }
             finally {
                 cctx.gate().leave();
             }
         }
 
-        Collection<ClusterNode> cacheNodes = ctx.discovery().cacheNodes(cacheName, topVer);
+        List<ClusterNode> cacheNodes = ctx.discovery().cacheNodes(cacheName, topVer);
 
-        if (F.isEmpty(cacheNodes))
-            return null;
+        DynamicCacheDescriptor desc = ctx.cache().cacheDescriptor(cacheName);
 
-        GridFutureAdapter<AffinityInfo> fut0 = new GridFutureAdapter<>();
+        if (desc == null || F.isEmpty(cacheNodes)) {
+            if (ctx.clientDisconnected())
+                return new GridFinishedFuture<>(new IgniteClientDisconnectedCheckedException(ctx.cluster().clientReconnectFuture(),
+                        "Failed to get affinity mapping, client disconnected."));
 
-        IgniteInternalFuture<AffinityInfo> old = affMap.putIfAbsent(key, fut0);
+            return new GridFinishedFuture<>((AffinityInfo)null);
+        }
 
-        if (old != null)
-            return old.get();
+        if (desc.cacheConfiguration().getCacheMode() == LOCAL)
+            return new GridFinishedFuture<>(new IgniteCheckedException("Failed to map keys for LOCAL cache: " + cacheName));
 
-        int max = ERROR_RETRIES;
-        int cnt = 0;
+        AffinityFuture fut0 = new AffinityFuture(cacheName, topVer, cacheNodes);
 
-        Iterator<ClusterNode> it = cacheNodes.iterator();
+        IgniteInternalFuture<AffinityInfo> old = affMap.putIfAbsent(key, fut0);
 
-        // We are here because affinity has not been fetched yet, or cache mode is LOCAL.
-        while (true) {
-            cnt++;
+        if (old != null)
+            return old;
 
-            if (!it.hasNext())
-                it = cacheNodes.iterator();
+        fut0.getAffinityFromNextNode();
 
-            // Double check since we deal with dynamic view.
-            if (!it.hasNext())
-                // Exception will be caught in this method.
-                throw new IgniteCheckedException("No cache nodes in topology for cache name: " + cacheName);
+        return fut0;
+    }
 
-            ClusterNode n = it.next();
+    /**
+     *
+     */
+    private class AffinityFuture extends GridFutureAdapter<AffinityInfo> {
+        /** */
+        private final String cacheName;
 
-            CacheMode mode = ctx.cache().cacheMode(cacheName);
+        /** */
+        private final AffinityTopologyVersion topVer;
 
-            if (mode == null) {
-                if (ctx.clientDisconnected())
-                    throw new IgniteClientDisconnectedCheckedException(ctx.cluster().clientReconnectFuture(),
-                            "Failed to get affinity mapping, client disconnected.");
+        /** */
+        private final List<ClusterNode> cacheNodes;
 
-                throw new IgniteCheckedException("No cache nodes in topology for cache name: " + cacheName);
-            }
+        /** */
+        private int nodeIdx;
 
-            // Map all keys to a single node, if the cache mode is LOCAL.
-            if (mode == LOCAL) {
-                fut0.onDone(new IgniteCheckedException("Failed to map keys for LOCAL cache."));
+        /**
+         * @param cacheName Cache name.
+         * @param topVer Topology version.
+         * @param cacheNodes Cache nodes.
+         */
+        AffinityFuture(String cacheName, AffinityTopologyVersion topVer, List<ClusterNode> cacheNodes) {
+            this.cacheName = cacheName;
+            this.topVer = topVer;
+            this.cacheNodes = cacheNodes;
+        }
 
-                // Will throw exception.
-                fut0.get();
-            }
+        /**
+         *
+         */
+        void getAffinityFromNextNode() {
+            while (nodeIdx < cacheNodes.size()) {
+                final ClusterNode node = cacheNodes.get(nodeIdx);
 
-            try {
-                // Resolve cache context for remote node.
-                // Set affinity function before counting down on latch.
-                fut0.onDone(affinityInfoFromNode(cacheName, topVer, n));
+                nodeIdx++;
 
-                break;
-            }
-            catch (IgniteCheckedException e) {
-                if (log.isDebugEnabled())
-                    log.debug("Failed to get affinity from node (will retry) [cache=" + cacheName +
-                        ", node=" + U.toShortString(n) + ", msg=" + e.getMessage() + ']');
+                if (!ctx.discovery().alive(node.id()))
+                    continue;
 
-                if (cnt < max) {
-                    U.sleep(ERROR_WAIT);
+                affinityInfoFromNode(cacheName, topVer, node).listen(new CI1<IgniteInternalFuture<AffinityInfo>>() {
+                    @Override public void apply(IgniteInternalFuture<AffinityInfo> fut) {
+                        try {
+                            onDone(fut.get());
+                        }
+                        catch (IgniteCheckedException e) {
+                            if (e instanceof ClusterTopologyCheckedException || X.hasCause(e, ClusterTopologyException.class)) {
+                                if (log.isDebugEnabled())
+                                    log.debug("Failed to get affinity from node, node failed [cache=" + cacheName +
+                                            ", node=" + node.id() + ", msg=" + e.getMessage() + ']');
 
-                    continue;
-                }
+                                getAffinityFromNextNode();
+
+                                return;
+                            }
 
-                affMap.remove(key, fut0);
+                            if (log.isDebugEnabled())
+                                log.debug("Failed to get affinity from node [cache=" + cacheName +
+                                        ", node=" + node.id() + ", msg=" + e.getMessage() + ']');
 
-                fut0.onDone(new IgniteCheckedException("Failed to get affinity mapping from node: " + n, e));
+                            onDone(new IgniteCheckedException("Failed to get affinity mapping from node: " + node.id(), e));
+                        }
+                    }
+                });
 
-                break;
+                return;
             }
-            catch (RuntimeException | Error e) {
-                fut0.onDone(new IgniteCheckedException("Failed to get affinity mapping from node: " + n, e));
 
-                break;
-            }
+            onDone(new ClusterGroupEmptyCheckedException("Failed to get cache affinity, all cache nodes failed: " + cacheName));
         }
-
-        return fut0.get();
     }
 
     /**
@@ -529,26 +554,30 @@ public class GridAffinityProcessor extends GridProcessorAdapter {
      * @param cacheName Name of cache on which affinity is requested.
      * @param topVer Topology version.
      * @param n Node from which affinity is requested.
-     * @return Affinity cached function.
-     * @throws IgniteCheckedException If either local or remote node cannot get deployment for affinity objects.
+     * @return Affinity future.
      */
-    private AffinityInfo affinityInfoFromNode(String cacheName, AffinityTopologyVersion topVer, ClusterNode n)
-        throws IgniteCheckedException {
-        GridTuple3<GridAffinityMessage, GridAffinityMessage, GridAffinityAssignment> t = ctx.closure()
-            .callAsyncNoFailover(BROADCAST, affinityJob(cacheName, topVer), F.asList(n), true/*system pool*/, 0, false).get();
+    private IgniteInternalFuture<AffinityInfo> affinityInfoFromNode(String cacheName, AffinityTopologyVersion topVer, ClusterNode n) {
+        IgniteInternalFuture<GridTuple3<GridAffinityMessage, GridAffinityMessage, GridAffinityAssignment>> fut = ctx.closure()
+            .callAsyncNoFailover(BROADCAST, affinityJob(cacheName, topVer), F.asList(n), true/*system pool*/, 0, false);
+
+        return fut.chain(new CX1<IgniteInternalFuture<GridTuple3<GridAffinityMessage, GridAffinityMessage, GridAffinityAssignment>>, AffinityInfo>() {
+            @Override public AffinityInfo applyx(IgniteInternalFuture<GridTuple3<GridAffinityMessage, GridAffinityMessage, GridAffinityAssignment>> fut) throws IgniteCheckedException {
+                GridTuple3<GridAffinityMessage, GridAffinityMessage, GridAffinityAssignment> t = fut.get();
 
-        AffinityFunction f = (AffinityFunction)unmarshall(ctx, n.id(), t.get1());
-        AffinityKeyMapper m = (AffinityKeyMapper)unmarshall(ctx, n.id(), t.get2());
+                AffinityFunction f = (AffinityFunction)unmarshall(ctx, n.id(), t.get1());
+                AffinityKeyMapper m = (AffinityKeyMapper)unmarshall(ctx, n.id(), t.get2());
 
-        assert m != null;
+                assert m != null;
 
-        // Bring to initial state.
-        f.reset();
-        m.reset();
+                // Bring to initial state.
+                f.reset();
+                m.reset();
 
-        CacheConfiguration ccfg = ctx.cache().cacheConfiguration(cacheName);
+                CacheConfiguration ccfg = ctx.cache().cacheConfiguration(cacheName);
 
-        return new AffinityInfo(f, m, t.get3(), ctx.cacheObjects().contextForCache(ccfg));
+                return new AffinityInfo(f, m, t.get3(), ctx.cacheObjects().contextForCache(ccfg));
+            }
+        });
     }
 
     /**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
index 02e8736..78d1554 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
@@ -928,9 +928,21 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
                                     mapTopVer = ctx.cache().context().exchange().readyAffinityVersion();
 
                                     affFut = ctx.cache().context().exchange().lastTopologyFuture();
+
+                                    if (affFut == null || affFut.isDone()) {
+                                        affFut = null;
+
+                                        // Need to fetch affinity asynchronously if the cache is not started on this node.
+                                        if (affCacheName != null && ctx.cache().internalCache(affCacheName) == null) {
+                                            affFut = ctx.affinity().affinityCacheFuture(affCacheName, mapTopVer);
+
+                                            if (affFut.isDone())
+                                                affFut = null;
+                                        }
+                                    }
                                 }
 
-                                if (affFut != null && !affFut.isDone()) {
+                                if (affFut != null) {
                                     waitForAffTop = true;
 
                                     jobRes.resetResponse();
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAffinityCallSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAffinityCallSelfTest.java
index 2c5472e..3eb6974 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAffinityCallSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAffinityCallSelfTest.java
@@ -17,13 +17,18 @@
 package org.apache.ignite.internal.processors.cache;
 
 import java.util.concurrent.Callable;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ThreadLocalRandom;
+
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteCompute;
+import org.apache.ignite.cache.affinity.Affinity;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.cluster.ClusterTopologyException;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
@@ -142,18 +147,23 @@ public class CacheAffinityCallSelfTest extends GridCommonAbstractTest {
      */
     @Test
     public void testAffinityCallNoServerNode() throws Exception {
-        fail("https://issues.apache.org/jira/browse/IGNITE-1741");
-
         startGridsMultiThreaded(SRVS + 1);
 
         final Integer key = 1;
 
-        final Ignite client = grid(SRVS);
+        final IgniteEx client = grid(SRVS);
 
         assertTrue(client.configuration().isClientMode());
+        assertNull(client.context().cache().cache(CACHE_NAME));
+
+        final int THREADS = 5;
+
+        CyclicBarrier b = new CyclicBarrier(THREADS + 1);
 
         final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
             @Override public Object call() throws Exception {
+                b.await();
+
                 for (int i = 0; i < SRVS; ++i)
                     stopGrid(i, false);
 
@@ -162,8 +172,16 @@ public class CacheAffinityCallSelfTest extends GridCommonAbstractTest {
         });
 
         try {
-            while (!fut.isDone())
-                client.compute().affinityCall(CACHE_NAME, key, new CheckCallable(key, null));
+            GridTestUtils.runMultiThreaded(new Callable<Object>() {
+                @Override public Void call() throws Exception {
+                    b.await();
+
+                    while (!fut.isDone())
+                        client.compute().affinityCall(CACHE_NAME, key, new CheckCallable(key, null));
+
+                    return null;
+                }
+            }, THREADS, "test-thread");
         }
         catch (ClusterTopologyException e) {
             log.info("Expected error: " + e);
@@ -174,6 +192,48 @@ public class CacheAffinityCallSelfTest extends GridCommonAbstractTest {
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testAffinityFailoverNoCacheOnClient() throws Exception {
+        startGridsMultiThreaded(SRVS + 1);
+
+        final Integer key = 1;
+
+        final IgniteEx client = grid(SRVS);
+
+        assertTrue(client.configuration().isClientMode());
+
+        final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                for (int i = 0; i < SRVS - 1; ++i) {
+                    U.sleep(ThreadLocalRandom.current().nextLong(100) + 50);
+
+                    stopGrid(i, false);
+                }
+
+                return null;
+            }
+        });
+
+        try {
+            final Affinity<Integer> aff = client.affinity(CACHE_NAME);
+
+            assertNull(client.context().cache().cache(CACHE_NAME));
+
+            GridTestUtils.runMultiThreaded(new Runnable() {
+                @Override public void run() {
+                    while (!fut.isDone())
+                        assertNotNull(aff.mapKeyToNode(key));
+                }
+            }, 5, "test-thread");
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
      * Test callable.
      */
     public static class CheckCallable implements IgniteCallable<Object> {