You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/06/15 15:04:17 UTC

ignite git commit: ignite-3209 Waiting for affinity topology in case of failover for affinity call

Repository: ignite
Updated Branches:
  refs/heads/master b742f5fc8 -> 4ed02fd02


ignite-3209 Waiting for affinity topology in case of failover for affinity call


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4ed02fd0
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4ed02fd0
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4ed02fd0

Branch: refs/heads/master
Commit: 4ed02fd028d152d76d0e59d462ffcca7e2101b77
Parents: b742f5f
Author: agura <ag...@gridgain.com>
Authored: Tue Jun 14 17:32:25 2016 +0300
Committer: agura <ag...@gridgain.com>
Committed: Wed Jun 15 17:48:29 2016 +0300

----------------------------------------------------------------------
 .../processors/task/GridTaskWorker.java         | 44 ++++++++++++++++++--
 .../cache/CacheAffinityCallSelfTest.java        | 42 +++++++++++++------
 2 files changed, 71 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/4ed02fd0/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
index 05970ed..415d632 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
@@ -61,10 +61,12 @@ import org.apache.ignite.internal.GridJobSiblingImpl;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.GridTaskSessionImpl;
 import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.cluster.ClusterGroupEmptyCheckedException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.compute.ComputeTaskTimeoutCheckedException;
 import org.apache.ignite.internal.managers.deployment.GridDeployment;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.closure.AffinityTask;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
 import org.apache.ignite.internal.util.typedef.CO;
@@ -76,6 +78,7 @@ import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.internal.util.worker.GridWorker;
 import org.apache.ignite.internal.visor.util.VisorClusterGroupEmptyException;
+import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.marshaller.Marshaller;
 import org.apache.ignite.resources.TaskContinuousMapperResource;
@@ -678,6 +681,12 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
             // job response was changed in this method apply.
             boolean selfOccupied = false;
 
+            IgniteInternalFuture<?> affFut = null;
+
+            boolean waitForAffTop = false;
+
+            final GridJobExecuteResponse failoverRes = res;
+
             try {
                 synchronized (mux) {
                     // If task is not waiting for responses,
@@ -847,7 +856,18 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
                         }
 
                         case FAILOVER: {
-                            if (!failover(res, jobRes, getTaskTopology()))
+                            if (affKey != null) {
+                                AffinityTopologyVersion topVer = ctx.discovery().topologyVersionEx();
+
+                                affFut = ctx.cache().context().exchange().affinityReadyFuture(topVer);
+                            }
+
+                            if (affFut != null && !affFut.isDone()) {
+                                waitForAffTop = true;
+
+                                jobRes.resetResponse();
+                            }
+                            else if (!failover(res, jobRes, getTaskTopology()))
                                 plc = null;
 
                             break;
@@ -856,7 +876,7 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
                 }
 
                 // Outside of synchronization.
-                if (plc != null) {
+                if (plc != null && !waitForAffTop) {
                     // Handle failover.
                     if (plc == FAILOVER)
                         sendFailoverRequest(jobRes);
@@ -872,6 +892,8 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
                 U.error(log, "Failed to obtain topology [ses=" + ses + ", err=" + e + ']', e);
 
                 finishTask(null, e);
+
+                waitForAffTop = false;
             }
             finally {
                 // Open up job for processing responses.
@@ -890,6 +912,18 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
                     res = delayedRess.poll();
                 }
             }
+
+            if (waitForAffTop && affFut != null) {
+                affFut.listen(new IgniteInClosure<IgniteInternalFuture<?>>() {
+                    @Override public void apply(IgniteInternalFuture<?> fut0) {
+                        ctx.closure().runLocalSafe(new Runnable() {
+                            @Override public void run() {
+                                onResponse(failoverRes);
+                            }
+                        }, false);
+                    }
+                });
+            }
         }
     }
 
@@ -1039,7 +1073,11 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
      * @param top Topology.
      * @return {@code True} if fail-over SPI returned a new node.
      */
-    private boolean failover(GridJobExecuteResponse res, GridJobResultImpl jobRes, Collection<? extends ClusterNode> top) {
+    private boolean failover(
+        GridJobExecuteResponse res,
+        GridJobResultImpl jobRes,
+        Collection<? extends ClusterNode> top
+    ) {
         assert Thread.holdsLock(mux);
 
         try {

http://git-wip-us.apache.org/repos/asf/ignite/blob/4ed02fd0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAffinityCallSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAffinityCallSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAffinityCallSelfTest.java
index a1762cc..92e2b9b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAffinityCallSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAffinityCallSelfTest.java
@@ -50,9 +50,6 @@ public class CacheAffinityCallSelfTest extends GridCommonAbstractTest {
     private static final String CACHE_NAME = "myCache";
 
     /** */
-    private static final int MAX_FAILOVER_ATTEMPTS = 500;
-
-    /** */
     private static final int SRVS = 4;
 
     /** */
@@ -69,21 +66,22 @@ public class CacheAffinityCallSelfTest extends GridCommonAbstractTest {
         cfg.setDiscoverySpi(spi);
 
         AlwaysFailoverSpi failSpi = new AlwaysFailoverSpi();
-        failSpi.setMaximumFailoverAttempts(MAX_FAILOVER_ATTEMPTS);
         cfg.setFailoverSpi(failSpi);
 
-        CacheConfiguration ccfg = defaultCacheConfiguration();
-        ccfg.setName(CACHE_NAME);
-        ccfg.setCacheMode(PARTITIONED);
-        ccfg.setBackups(1);
-
-        cfg.setCacheConfiguration(ccfg);
-
+        // Do not configure cache on client.
         if (gridName.equals(getTestGridName(SRVS))) {
             cfg.setClientMode(true);
 
             spi.setForceServerMode(true);
         }
+        else {
+            CacheConfiguration ccfg = defaultCacheConfiguration();
+            ccfg.setName(CACHE_NAME);
+            ccfg.setCacheMode(PARTITIONED);
+            ccfg.setBackups(1);
+
+            cfg.setCacheConfiguration(ccfg);
+        }
 
         return cfg;
     }
@@ -99,7 +97,27 @@ public class CacheAffinityCallSelfTest extends GridCommonAbstractTest {
     public void testAffinityCallRestartNode() throws Exception {
         startGridsMultiThreaded(SRVS);
 
-        final int ITERS = 5;
+        affinityCallRestartNode();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAffinityCallFromClientRestartNode() throws Exception {
+        startGridsMultiThreaded(SRVS + 1);
+
+        Ignite client = grid(SRVS);
+
+        assertTrue(client.configuration().isClientMode());
+
+        affinityCallRestartNode();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void affinityCallRestartNode() throws Exception {
+        final int ITERS = 10;
 
         for (int i = 0; i < ITERS; i++) {
             log.info("Iteration: " + i);