You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/06/17 12:38:57 UTC

[28/43] ignite git commit: ignite-3209 Review fixes

ignite-3209 Review fixes


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2a3ffc15
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2a3ffc15
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2a3ffc15

Branch: refs/heads/ignite-3335
Commit: 2a3ffc15d661cb9f097a65e2760afb3bcbad957b
Parents: ebe5658
Author: agura <ag...@gridgain.com>
Authored: Wed Jun 15 15:44:58 2016 +0300
Committer: agura <ag...@gridgain.com>
Committed: Wed Jun 15 15:44:58 2016 +0300

----------------------------------------------------------------------
 .../processors/task/GridTaskWorker.java         | 154 ++++++++++---------
 1 file changed, 80 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/2a3ffc15/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
index 0c522ad..fc56893 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
@@ -69,7 +69,6 @@ import org.apache.ignite.internal.managers.deployment.GridDeployment;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.closure.AffinityTask;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
-import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.typedef.CO;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.X;
@@ -79,7 +78,6 @@ import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.internal.util.worker.GridWorker;
 import org.apache.ignite.internal.visor.util.VisorClusterGroupEmptyException;
-import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.marshaller.Marshaller;
@@ -683,6 +681,12 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
             // job response was changed in this method apply.
             boolean selfOccupied = false;
 
+            IgniteInternalFuture<?> affFut = null;
+
+            boolean waitForAffTop = false;
+
+            final GridJobExecuteResponse failoverRes = res;
+
             try {
                 synchronized (mux) {
                     // If task is not waiting for responses,
@@ -852,25 +856,19 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
                         }
 
                         case FAILOVER: {
-                            IgniteInternalFuture<Boolean> fut = failover(res, jobRes, getTaskTopology());
+                            if (affKey != null) {
+                                AffinityTopologyVersion topVer = ctx.discovery().topologyVersionEx();
 
-                            final GridJobResultImpl jobRes0 = jobRes;
-
-                            fut.listen(new IgniteInClosure<IgniteInternalFuture<Boolean>>() {
-                                @Override public void apply(IgniteInternalFuture<Boolean> fut0) {
-                                    try {
-                                        Boolean res = fut0.get();
+                                affFut = ctx.cache().context().exchange().affinityReadyFuture(topVer);
+                            }
 
-                                        if (res)
-                                            sendFailoverRequest(jobRes0);
-                                    }
-                                    catch (IgniteCheckedException e) {
-                                        U.error(log, "Failed to failover task [ses=" + ses + ", err=" + e + ']', e);
+                            if (affFut != null && !affFut.isDone()) {
+                                waitForAffTop = true;
 
-                                        finishTask(null, e);
-                                    }
-                                }
-                            });
+                                jobRes.resetResponse();
+                            }
+                            else if (!failover(res, jobRes, getTaskTopology()))
+                                plc = null;
 
                             break;
                         }
@@ -878,17 +876,24 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
                 }
 
                 // Outside of synchronization.
-                if (plc != null && plc != FAILOVER) {
-                    evtLsnr.onJobFinished(this, jobRes.getSibling());
+                if (plc != null && !waitForAffTop) {
+                    // Handle failover.
+                    if (plc == FAILOVER)
+                        sendFailoverRequest(jobRes);
+                    else {
+                        evtLsnr.onJobFinished(this, jobRes.getSibling());
 
-                    if (plc == ComputeJobResultPolicy.REDUCE)
-                        reduce(results);
+                        if (plc == ComputeJobResultPolicy.REDUCE)
+                            reduce(results);
+                    }
                 }
             }
             catch (IgniteCheckedException e) {
                 U.error(log, "Failed to obtain topology [ses=" + ses + ", err=" + e + ']', e);
 
                 finishTask(null, e);
+
+                waitForAffTop = false;
             }
             finally {
                 // Open up job for processing responses.
@@ -907,6 +912,18 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
                     res = delayedRess.poll();
                 }
             }
+
+            if (waitForAffTop && affFut != null) {
+                affFut.listen(new IgniteInClosure<IgniteInternalFuture<?>>() {
+                    @Override public void apply(IgniteInternalFuture<?> fut0) {
+                        ctx.closure().runLocalSafe(new Runnable() {
+                            @Override public void run() {
+                                onResponse(failoverRes);
+                            }
+                        }, false);
+                    }
+                });
+            }
         }
     }
 
@@ -1056,74 +1073,63 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
      * @param top Topology.
      * @return {@code True} if fail-over SPI returned a new node.
      */
-    private IgniteInternalFuture<Boolean> failover(
-        final GridJobExecuteResponse res,
-        final GridJobResultImpl jobRes,
-        final Collection<? extends ClusterNode> top
+    private boolean failover(
+        GridJobExecuteResponse res,
+        GridJobResultImpl jobRes,
+        Collection<? extends ClusterNode> top
     ) {
-        IgniteInternalFuture<?> affFut = null;
-
-        if (affKey != null) {
-            AffinityTopologyVersion topVer = ctx.discovery().topologyVersionEx();
-
-            affFut = ctx.cache().context().exchange().affinityReadyFuture(topVer);
-        }
-
-        if (affFut == null)
-            affFut = new GridFinishedFuture();
+        assert Thread.holdsLock(mux);
 
-        return affFut.chain(new IgniteClosure<IgniteInternalFuture<?>, Boolean>() {
-            @Override public Boolean apply(IgniteInternalFuture<?> fut0) {
-                synchronized (mux) {
-                    try {
-                        ctx.resource().invokeAnnotated(dep, jobRes.getJob(), ComputeJobBeforeFailover.class);
+        try {
+            ctx.resource().invokeAnnotated(dep, jobRes.getJob(), ComputeJobBeforeFailover.class);
 
-                        // Map to a new node.
-                        ClusterNode node = ctx.failover().failover(ses, jobRes, new ArrayList<>(top), affKey, affCache);
+            // Map to a new node.
+            ClusterNode node = ctx.failover().failover(ses, jobRes, new ArrayList<>(top), affKey, affCache);
 
-                        if (node == null) {
-                            String msg = "Failed to failover a job to another node (failover SPI returned null) [job=" +
-                                jobRes.getJob() + ", node=" + jobRes.getNode() + ']';
+            if (node == null) {
+                String msg = "Failed to failover a job to another node (failover SPI returned null) [job=" +
+                    jobRes.getJob() + ", node=" + jobRes.getNode() + ']';
 
-                            if (log.isDebugEnabled())
-                                log.debug(msg);
+                if (log.isDebugEnabled())
+                    log.debug(msg);
 
-                            Throwable e = new ClusterTopologyCheckedException(msg, jobRes.getException());
+                Throwable e = new ClusterTopologyCheckedException(msg, jobRes.getException());
 
-                            finishTask(null, e);
+                finishTask(null, e);
 
-                            return false;
-                        }
+                return false;
+            }
 
-                        if (log.isDebugEnabled())
-                            log.debug("Resolved job failover [newNode=" + node + ", oldNode=" + jobRes.getNode() +
-                                ", job=" + jobRes.getJob() + ", resMsg=" + res + ']');
+            if (log.isDebugEnabled())
+                log.debug("Resolved job failover [newNode=" + node + ", oldNode=" + jobRes.getNode() +
+                    ", job=" + jobRes.getJob() + ", resMsg=" + res + ']');
 
-                        jobRes.setNode(node);
-                        jobRes.resetResponse();
+            jobRes.setNode(node);
+            jobRes.resetResponse();
 
-                        if (!resCache) // Store result back in map before sending.
-                            GridTaskWorker.this.jobRes.put(res.getJobId(), jobRes);
+            if (!resCache) {
+                synchronized (mux) {
+                    // Store result back in map before sending.
+                    this.jobRes.put(res.getJobId(), jobRes);
+                }
+            }
 
-                        return true;
-                    }
-                    // Catch Throwable to protect against bad user code.
-                    catch (Throwable e) {
-                        String errMsg = "Failed to failover job due to undeclared user exception [job=" +
-                            jobRes.getJob() + ", err=" + e + ']';
+            return true;
+        }
+        // Catch Throwable to protect against bad user code.
+        catch (Throwable e) {
+            String errMsg = "Failed to failover job due to undeclared user exception [job=" +
+                jobRes.getJob() + ", err=" + e + ']';
 
-                        U.error(log, errMsg, e);
+            U.error(log, errMsg, e);
 
-                        finishTask(null, new ComputeUserUndeclaredException(errMsg, e));
+            finishTask(null, new ComputeUserUndeclaredException(errMsg, e));
 
-                        if (e instanceof Error)
-                            throw (Error)e;
+            if (e instanceof Error)
+                throw (Error)e;
 
-                        return false;
-                    }
-                }
-            }
-        });
+            return false;
+        }
     }
 
     /**