You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/06/17 12:38:57 UTC
[28/43] ignite git commit: ignite-3209 Review fixes
ignite-3209 Review fixes
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2a3ffc15
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2a3ffc15
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2a3ffc15
Branch: refs/heads/ignite-3335
Commit: 2a3ffc15d661cb9f097a65e2760afb3bcbad957b
Parents: ebe5658
Author: agura <ag...@gridgain.com>
Authored: Wed Jun 15 15:44:58 2016 +0300
Committer: agura <ag...@gridgain.com>
Committed: Wed Jun 15 15:44:58 2016 +0300
----------------------------------------------------------------------
.../processors/task/GridTaskWorker.java | 154 ++++++++++---------
1 file changed, 80 insertions(+), 74 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/2a3ffc15/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
index 0c522ad..fc56893 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
@@ -69,7 +69,6 @@ import org.apache.ignite.internal.managers.deployment.GridDeployment;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.closure.AffinityTask;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
-import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.typedef.CO;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
@@ -79,7 +78,6 @@ import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.internal.visor.util.VisorClusterGroupEmptyException;
-import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.marshaller.Marshaller;
@@ -683,6 +681,12 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
// job response was changed in this method apply.
boolean selfOccupied = false;
+ IgniteInternalFuture<?> affFut = null;
+
+ boolean waitForAffTop = false;
+
+ final GridJobExecuteResponse failoverRes = res;
+
try {
synchronized (mux) {
// If task is not waiting for responses,
@@ -852,25 +856,19 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
}
case FAILOVER: {
- IgniteInternalFuture<Boolean> fut = failover(res, jobRes, getTaskTopology());
+ if (affKey != null) {
+ AffinityTopologyVersion topVer = ctx.discovery().topologyVersionEx();
- final GridJobResultImpl jobRes0 = jobRes;
-
- fut.listen(new IgniteInClosure<IgniteInternalFuture<Boolean>>() {
- @Override public void apply(IgniteInternalFuture<Boolean> fut0) {
- try {
- Boolean res = fut0.get();
+ affFut = ctx.cache().context().exchange().affinityReadyFuture(topVer);
+ }
- if (res)
- sendFailoverRequest(jobRes0);
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to failover task [ses=" + ses + ", err=" + e + ']', e);
+ if (affFut != null && !affFut.isDone()) {
+ waitForAffTop = true;
- finishTask(null, e);
- }
- }
- });
+ jobRes.resetResponse();
+ }
+ else if (!failover(res, jobRes, getTaskTopology()))
+ plc = null;
break;
}
@@ -878,17 +876,24 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
}
// Outside of synchronization.
- if (plc != null && plc != FAILOVER) {
- evtLsnr.onJobFinished(this, jobRes.getSibling());
+ if (plc != null && !waitForAffTop) {
+ // Handle failover.
+ if (plc == FAILOVER)
+ sendFailoverRequest(jobRes);
+ else {
+ evtLsnr.onJobFinished(this, jobRes.getSibling());
- if (plc == ComputeJobResultPolicy.REDUCE)
- reduce(results);
+ if (plc == ComputeJobResultPolicy.REDUCE)
+ reduce(results);
+ }
}
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to obtain topology [ses=" + ses + ", err=" + e + ']', e);
finishTask(null, e);
+
+ waitForAffTop = false;
}
finally {
// Open up job for processing responses.
@@ -907,6 +912,18 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
res = delayedRess.poll();
}
}
+
+ if (waitForAffTop && affFut != null) {
+ affFut.listen(new IgniteInClosure<IgniteInternalFuture<?>>() {
+ @Override public void apply(IgniteInternalFuture<?> fut0) {
+ ctx.closure().runLocalSafe(new Runnable() {
+ @Override public void run() {
+ onResponse(failoverRes);
+ }
+ }, false);
+ }
+ });
+ }
}
}
@@ -1056,74 +1073,63 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
* @param top Topology.
* @return {@code True} if fail-over SPI returned a new node.
*/
- private IgniteInternalFuture<Boolean> failover(
- final GridJobExecuteResponse res,
- final GridJobResultImpl jobRes,
- final Collection<? extends ClusterNode> top
+ private boolean failover(
+ GridJobExecuteResponse res,
+ GridJobResultImpl jobRes,
+ Collection<? extends ClusterNode> top
) {
- IgniteInternalFuture<?> affFut = null;
-
- if (affKey != null) {
- AffinityTopologyVersion topVer = ctx.discovery().topologyVersionEx();
-
- affFut = ctx.cache().context().exchange().affinityReadyFuture(topVer);
- }
-
- if (affFut == null)
- affFut = new GridFinishedFuture();
+ assert Thread.holdsLock(mux);
- return affFut.chain(new IgniteClosure<IgniteInternalFuture<?>, Boolean>() {
- @Override public Boolean apply(IgniteInternalFuture<?> fut0) {
- synchronized (mux) {
- try {
- ctx.resource().invokeAnnotated(dep, jobRes.getJob(), ComputeJobBeforeFailover.class);
+ try {
+ ctx.resource().invokeAnnotated(dep, jobRes.getJob(), ComputeJobBeforeFailover.class);
- // Map to a new node.
- ClusterNode node = ctx.failover().failover(ses, jobRes, new ArrayList<>(top), affKey, affCache);
+ // Map to a new node.
+ ClusterNode node = ctx.failover().failover(ses, jobRes, new ArrayList<>(top), affKey, affCache);
- if (node == null) {
- String msg = "Failed to failover a job to another node (failover SPI returned null) [job=" +
- jobRes.getJob() + ", node=" + jobRes.getNode() + ']';
+ if (node == null) {
+ String msg = "Failed to failover a job to another node (failover SPI returned null) [job=" +
+ jobRes.getJob() + ", node=" + jobRes.getNode() + ']';
- if (log.isDebugEnabled())
- log.debug(msg);
+ if (log.isDebugEnabled())
+ log.debug(msg);
- Throwable e = new ClusterTopologyCheckedException(msg, jobRes.getException());
+ Throwable e = new ClusterTopologyCheckedException(msg, jobRes.getException());
- finishTask(null, e);
+ finishTask(null, e);
- return false;
- }
+ return false;
+ }
- if (log.isDebugEnabled())
- log.debug("Resolved job failover [newNode=" + node + ", oldNode=" + jobRes.getNode() +
- ", job=" + jobRes.getJob() + ", resMsg=" + res + ']');
+ if (log.isDebugEnabled())
+ log.debug("Resolved job failover [newNode=" + node + ", oldNode=" + jobRes.getNode() +
+ ", job=" + jobRes.getJob() + ", resMsg=" + res + ']');
- jobRes.setNode(node);
- jobRes.resetResponse();
+ jobRes.setNode(node);
+ jobRes.resetResponse();
- if (!resCache) // Store result back in map before sending.
- GridTaskWorker.this.jobRes.put(res.getJobId(), jobRes);
+ if (!resCache) {
+ synchronized (mux) {
+ // Store result back in map before sending.
+ this.jobRes.put(res.getJobId(), jobRes);
+ }
+ }
- return true;
- }
- // Catch Throwable to protect against bad user code.
- catch (Throwable e) {
- String errMsg = "Failed to failover job due to undeclared user exception [job=" +
- jobRes.getJob() + ", err=" + e + ']';
+ return true;
+ }
+ // Catch Throwable to protect against bad user code.
+ catch (Throwable e) {
+ String errMsg = "Failed to failover job due to undeclared user exception [job=" +
+ jobRes.getJob() + ", err=" + e + ']';
- U.error(log, errMsg, e);
+ U.error(log, errMsg, e);
- finishTask(null, new ComputeUserUndeclaredException(errMsg, e));
+ finishTask(null, new ComputeUserUndeclaredException(errMsg, e));
- if (e instanceof Error)
- throw (Error)e;
+ if (e instanceof Error)
+ throw (Error)e;
- return false;
- }
- }
- }
- });
+ return false;
+ }
}
/**