You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/06/17 08:54:20 UTC
[11/41] ignite git commit: ignite-3209 Waiting for affinity topology in case of failover for affinity call
ignite-3209 Waiting for affinity topology in case of failover for affinity call
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1d80a398
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1d80a398
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1d80a398
Branch: refs/heads/ignite-3331
Commit: 1d80a398a40dd8b4b86bb83499cf0efd1f35b82b
Parents: 54425bf
Author: agura <ag...@gridgain.com>
Authored: Tue Jun 14 17:32:25 2016 +0300
Committer: agura <ag...@gridgain.com>
Committed: Tue Jun 14 18:42:17 2016 +0300
----------------------------------------------------------------------
.../processors/task/GridTaskWorker.java | 130 ++++++++++++-------
.../cache/CacheAffinityCallSelfTest.java | 4 -
2 files changed, 81 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/1d80a398/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
index dc86343..651259d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
@@ -61,12 +61,15 @@ import org.apache.ignite.internal.GridJobSiblingImpl;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.GridTaskSessionImpl;
import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
+import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.cluster.ClusterGroupEmptyCheckedException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.compute.ComputeTaskTimeoutCheckedException;
import org.apache.ignite.internal.managers.deployment.GridDeployment;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.closure.AffinityTask;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.typedef.CO;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
@@ -76,6 +79,8 @@ import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.internal.visor.util.VisorClusterGroupEmptyException;
+import org.apache.ignite.lang.IgniteClosure;
+import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.resources.TaskContinuousMapperResource;
@@ -847,8 +852,25 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
}
case FAILOVER: {
- if (!failover(res, jobRes, getTaskTopology()))
- plc = null;
+ IgniteInternalFuture<Boolean> fut = failover(res, jobRes, getTaskTopology());
+
+ final GridJobResultImpl jobRes0 = jobRes;
+
+ fut.listen(new IgniteInClosure<IgniteInternalFuture<Boolean>>() {
+ @Override public void apply(IgniteInternalFuture<Boolean> fut0) {
+ try {
+ Boolean res = fut0.get();
+
+ if (res)
+ sendFailoverRequest(jobRes0);
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to failover task [ses=" + ses + ", err=" + e + ']', e);
+
+ finishTask(null, e);
+ }
+ }
+ });
break;
}
@@ -856,16 +878,11 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
}
// Outside of synchronization.
- if (plc != null) {
- // Handle failover.
- if (plc == FAILOVER)
- sendFailoverRequest(jobRes);
- else {
- evtLsnr.onJobFinished(this, jobRes.getSibling());
+ if (plc != null && plc != FAILOVER) {
+ evtLsnr.onJobFinished(this, jobRes.getSibling());
- if (plc == ComputeJobResultPolicy.REDUCE)
- reduce(results);
- }
+ if (plc == ComputeJobResultPolicy.REDUCE)
+ reduce(results);
}
}
catch (IgniteCheckedException e) {
@@ -1039,59 +1056,74 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
* @param top Topology.
* @return {@code True} if fail-over SPI returned a new node.
*/
- private boolean failover(GridJobExecuteResponse res, GridJobResultImpl jobRes, Collection<? extends ClusterNode> top) {
- assert Thread.holdsLock(mux);
+ private IgniteInternalFuture<Boolean> failover(
+ final GridJobExecuteResponse res,
+ final GridJobResultImpl jobRes,
+ final Collection<? extends ClusterNode> top
+ ) {
+ IgniteInternalFuture<?> affFut = null;
- try {
- ctx.resource().invokeAnnotated(dep, jobRes.getJob(), ComputeJobBeforeFailover.class);
+ if (affKey != null) {
+ Long topVer = ctx.discovery().topologyVersion();
- // Map to a new node.
- ClusterNode node = ctx.failover().failover(ses, jobRes, new ArrayList<>(top), affKey, affCache);
+ affFut = ctx.cache().context().exchange().affinityReadyFuture(new AffinityTopologyVersion(topVer));
+ }
- if (node == null) {
- String msg = "Failed to failover a job to another node (failover SPI returned null) [job=" +
- jobRes.getJob() + ", node=" + jobRes.getNode() + ']';
+ if (affFut == null)
+ affFut = new GridFinishedFuture();
- if (log.isDebugEnabled())
- log.debug(msg);
+ return affFut.chain(new IgniteClosure<IgniteInternalFuture<?>, Boolean>() {
+ @Override public Boolean apply(IgniteInternalFuture<?> fut0) {
+ synchronized (mux) {
+ try {
+ ctx.resource().invokeAnnotated(dep, jobRes.getJob(), ComputeJobBeforeFailover.class);
- Throwable e = new ClusterTopologyCheckedException(msg, jobRes.getException());
+ // Map to a new node.
+ ClusterNode node = ctx.failover().failover(ses, jobRes, new ArrayList<>(top), affKey, affCache);
- finishTask(null, e);
+ if (node == null) {
+ String msg = "Failed to failover a job to another node (failover SPI returned null) [job=" +
+ jobRes.getJob() + ", node=" + jobRes.getNode() + ']';
- return false;
- }
+ if (log.isDebugEnabled())
+ log.debug(msg);
- if (log.isDebugEnabled())
- log.debug("Resolved job failover [newNode=" + node + ", oldNode=" + jobRes.getNode() +
- ", job=" + jobRes.getJob() + ", resMsg=" + res + ']');
+ Throwable e = new ClusterTopologyCheckedException(msg, jobRes.getException());
- jobRes.setNode(node);
- jobRes.resetResponse();
+ finishTask(null, e);
- if (!resCache) {
- synchronized (mux) {
- // Store result back in map before sending.
- this.jobRes.put(res.getJobId(), jobRes);
- }
- }
+ return false;
+ }
- return true;
- }
- // Catch Throwable to protect against bad user code.
- catch (Throwable e) {
- String errMsg = "Failed to failover job due to undeclared user exception [job=" +
- jobRes.getJob() + ", err=" + e + ']';
+ if (log.isDebugEnabled())
+ log.debug("Resolved job failover [newNode=" + node + ", oldNode=" + jobRes.getNode() +
+ ", job=" + jobRes.getJob() + ", resMsg=" + res + ']');
- U.error(log, errMsg, e);
+ jobRes.setNode(node);
+ jobRes.resetResponse();
- finishTask(null, new ComputeUserUndeclaredException(errMsg, e));
+ if (!resCache) // Store result back in map before sending.
+ GridTaskWorker.this.jobRes.put(res.getJobId(), jobRes);
- if (e instanceof Error)
- throw (Error)e;
+ return true;
+ }
+ // Catch Throwable to protect against bad user code.
+ catch (Throwable e) {
+ String errMsg = "Failed to failover job due to undeclared user exception [job=" +
+ jobRes.getJob() + ", err=" + e + ']';
- return false;
- }
+ U.error(log, errMsg, e);
+
+ finishTask(null, new ComputeUserUndeclaredException(errMsg, e));
+
+ if (e instanceof Error)
+ throw (Error)e;
+
+ return false;
+ }
+ }
+ }
+ });
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/1d80a398/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAffinityCallSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAffinityCallSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAffinityCallSelfTest.java
index 8530fbb..e4b6ece 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAffinityCallSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAffinityCallSelfTest.java
@@ -50,9 +50,6 @@ public class CacheAffinityCallSelfTest extends GridCommonAbstractTest {
private static final String CACHE_NAME = "myCache";
/** */
- private static final int MAX_FAILOVER_ATTEMPTS = 105;
-
- /** */
private static final int SERVERS_COUNT = 4;
/** */
@@ -69,7 +66,6 @@ public class CacheAffinityCallSelfTest extends GridCommonAbstractTest {
cfg.setDiscoverySpi(spi);
AlwaysFailoverSpi failSpi = new AlwaysFailoverSpi();
- failSpi.setMaximumFailoverAttempts(MAX_FAILOVER_ATTEMPTS);
cfg.setFailoverSpi(failSpi);
CacheConfiguration ccfg = defaultCacheConfiguration();