You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this rendering.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/06/17 08:54:20 UTC

[11/41] ignite git commit: ignite-3209 Waiting for affinity topology in case of failover for affinity call

ignite-3209 Waiting for affinity topology in case of failover for affinity call


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1d80a398
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1d80a398
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1d80a398

Branch: refs/heads/ignite-3331
Commit: 1d80a398a40dd8b4b86bb83499cf0efd1f35b82b
Parents: 54425bf
Author: agura <ag...@gridgain.com>
Authored: Tue Jun 14 17:32:25 2016 +0300
Committer: agura <ag...@gridgain.com>
Committed: Tue Jun 14 18:42:17 2016 +0300

----------------------------------------------------------------------
 .../processors/task/GridTaskWorker.java         | 130 ++++++++++++-------
 .../cache/CacheAffinityCallSelfTest.java        |   4 -
 2 files changed, 81 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/1d80a398/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
index dc86343..651259d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
@@ -61,12 +61,15 @@ import org.apache.ignite.internal.GridJobSiblingImpl;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.GridTaskSessionImpl;
 import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.cluster.ClusterGroupEmptyCheckedException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.compute.ComputeTaskTimeoutCheckedException;
 import org.apache.ignite.internal.managers.deployment.GridDeployment;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.closure.AffinityTask;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.typedef.CO;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.X;
@@ -76,6 +79,8 @@ import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.internal.util.worker.GridWorker;
 import org.apache.ignite.internal.visor.util.VisorClusterGroupEmptyException;
+import org.apache.ignite.lang.IgniteClosure;
+import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.marshaller.Marshaller;
 import org.apache.ignite.resources.TaskContinuousMapperResource;
@@ -847,8 +852,25 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
                         }
 
                         case FAILOVER: {
-                            if (!failover(res, jobRes, getTaskTopology()))
-                                plc = null;
+                            IgniteInternalFuture<Boolean> fut = failover(res, jobRes, getTaskTopology());
+
+                            final GridJobResultImpl jobRes0 = jobRes;
+
+                            fut.listen(new IgniteInClosure<IgniteInternalFuture<Boolean>>() {
+                                @Override public void apply(IgniteInternalFuture<Boolean> fut0) {
+                                    try {
+                                        Boolean res = fut0.get();
+
+                                        if (res)
+                                            sendFailoverRequest(jobRes0);
+                                    }
+                                    catch (IgniteCheckedException e) {
+                                        U.error(log, "Failed to failover task [ses=" + ses + ", err=" + e + ']', e);
+
+                                        finishTask(null, e);
+                                    }
+                                }
+                            });
 
                             break;
                         }
@@ -856,16 +878,11 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
                 }
 
                 // Outside of synchronization.
-                if (plc != null) {
-                    // Handle failover.
-                    if (plc == FAILOVER)
-                        sendFailoverRequest(jobRes);
-                    else {
-                        evtLsnr.onJobFinished(this, jobRes.getSibling());
+                if (plc != null && plc != FAILOVER) {
+                    evtLsnr.onJobFinished(this, jobRes.getSibling());
 
-                        if (plc == ComputeJobResultPolicy.REDUCE)
-                            reduce(results);
-                    }
+                    if (plc == ComputeJobResultPolicy.REDUCE)
+                        reduce(results);
                 }
             }
             catch (IgniteCheckedException e) {
@@ -1039,59 +1056,74 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
      * @param top Topology.
      * @return {@code True} if fail-over SPI returned a new node.
      */
-    private boolean failover(GridJobExecuteResponse res, GridJobResultImpl jobRes, Collection<? extends ClusterNode> top) {
-        assert Thread.holdsLock(mux);
+    private IgniteInternalFuture<Boolean> failover(
+        final GridJobExecuteResponse res,
+        final GridJobResultImpl jobRes,
+        final Collection<? extends ClusterNode> top
+    ) {
+        IgniteInternalFuture<?> affFut = null;
 
-        try {
-            ctx.resource().invokeAnnotated(dep, jobRes.getJob(), ComputeJobBeforeFailover.class);
+        if (affKey != null) {
+            Long topVer = ctx.discovery().topologyVersion();
 
-            // Map to a new node.
-            ClusterNode node = ctx.failover().failover(ses, jobRes, new ArrayList<>(top), affKey, affCache);
+            affFut = ctx.cache().context().exchange().affinityReadyFuture(new AffinityTopologyVersion(topVer));
+        }
 
-            if (node == null) {
-                String msg = "Failed to failover a job to another node (failover SPI returned null) [job=" +
-                    jobRes.getJob() + ", node=" + jobRes.getNode() + ']';
+        if (affFut == null)
+            affFut = new GridFinishedFuture();
 
-                if (log.isDebugEnabled())
-                    log.debug(msg);
+        return affFut.chain(new IgniteClosure<IgniteInternalFuture<?>, Boolean>() {
+            @Override public Boolean apply(IgniteInternalFuture<?> fut0) {
+                synchronized (mux) {
+                    try {
+                        ctx.resource().invokeAnnotated(dep, jobRes.getJob(), ComputeJobBeforeFailover.class);
 
-                Throwable e = new ClusterTopologyCheckedException(msg, jobRes.getException());
+                        // Map to a new node.
+                        ClusterNode node = ctx.failover().failover(ses, jobRes, new ArrayList<>(top), affKey, affCache);
 
-                finishTask(null, e);
+                        if (node == null) {
+                            String msg = "Failed to failover a job to another node (failover SPI returned null) [job=" +
+                                jobRes.getJob() + ", node=" + jobRes.getNode() + ']';
 
-                return false;
-            }
+                            if (log.isDebugEnabled())
+                                log.debug(msg);
 
-            if (log.isDebugEnabled())
-                log.debug("Resolved job failover [newNode=" + node + ", oldNode=" + jobRes.getNode() +
-                    ", job=" + jobRes.getJob() + ", resMsg=" + res + ']');
+                            Throwable e = new ClusterTopologyCheckedException(msg, jobRes.getException());
 
-            jobRes.setNode(node);
-            jobRes.resetResponse();
+                            finishTask(null, e);
 
-            if (!resCache) {
-                synchronized (mux) {
-                    // Store result back in map before sending.
-                    this.jobRes.put(res.getJobId(), jobRes);
-                }
-            }
+                            return false;
+                        }
 
-            return true;
-        }
-        // Catch Throwable to protect against bad user code.
-        catch (Throwable e) {
-            String errMsg = "Failed to failover job due to undeclared user exception [job=" +
-                jobRes.getJob() + ", err=" + e + ']';
+                        if (log.isDebugEnabled())
+                            log.debug("Resolved job failover [newNode=" + node + ", oldNode=" + jobRes.getNode() +
+                                ", job=" + jobRes.getJob() + ", resMsg=" + res + ']');
 
-            U.error(log, errMsg, e);
+                        jobRes.setNode(node);
+                        jobRes.resetResponse();
 
-            finishTask(null, new ComputeUserUndeclaredException(errMsg, e));
+                        if (!resCache) // Store result back in map before sending.
+                            GridTaskWorker.this.jobRes.put(res.getJobId(), jobRes);
 
-            if (e instanceof Error)
-                throw (Error)e;
+                        return true;
+                    }
+                    // Catch Throwable to protect against bad user code.
+                    catch (Throwable e) {
+                        String errMsg = "Failed to failover job due to undeclared user exception [job=" +
+                            jobRes.getJob() + ", err=" + e + ']';
 
-            return false;
-        }
+                        U.error(log, errMsg, e);
+
+                        finishTask(null, new ComputeUserUndeclaredException(errMsg, e));
+
+                        if (e instanceof Error)
+                            throw (Error)e;
+
+                        return false;
+                    }
+                }
+            }
+        });
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/1d80a398/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAffinityCallSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAffinityCallSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAffinityCallSelfTest.java
index 8530fbb..e4b6ece 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAffinityCallSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAffinityCallSelfTest.java
@@ -50,9 +50,6 @@ public class CacheAffinityCallSelfTest extends GridCommonAbstractTest {
     private static final String CACHE_NAME = "myCache";
 
     /** */
-    private static final int MAX_FAILOVER_ATTEMPTS = 105;
-
-    /** */
     private static final int SERVERS_COUNT = 4;
 
     /** */
@@ -69,7 +66,6 @@ public class CacheAffinityCallSelfTest extends GridCommonAbstractTest {
         cfg.setDiscoverySpi(spi);
 
         AlwaysFailoverSpi failSpi = new AlwaysFailoverSpi();
-        failSpi.setMaximumFailoverAttempts(MAX_FAILOVER_ATTEMPTS);
         cfg.setFailoverSpi(failSpi);
 
         CacheConfiguration ccfg = defaultCacheConfiguration();