You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/06/15 15:04:17 UTC
ignite git commit: ignite-3209 Waiting for affinity topology in case of failover for affinity call
Repository: ignite
Updated Branches:
refs/heads/master b742f5fc8 -> 4ed02fd02
ignite-3209 Waiting for affinity topology in case of failover for affinity call
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4ed02fd0
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4ed02fd0
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4ed02fd0
Branch: refs/heads/master
Commit: 4ed02fd028d152d76d0e59d462ffcca7e2101b77
Parents: b742f5f
Author: agura <ag...@gridgain.com>
Authored: Tue Jun 14 17:32:25 2016 +0300
Committer: agura <ag...@gridgain.com>
Committed: Wed Jun 15 17:48:29 2016 +0300
----------------------------------------------------------------------
.../processors/task/GridTaskWorker.java | 44 ++++++++++++++++++--
.../cache/CacheAffinityCallSelfTest.java | 42 +++++++++++++------
2 files changed, 71 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/4ed02fd0/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
index 05970ed..415d632 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
@@ -61,10 +61,12 @@ import org.apache.ignite.internal.GridJobSiblingImpl;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.GridTaskSessionImpl;
import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
+import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.cluster.ClusterGroupEmptyCheckedException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.compute.ComputeTaskTimeoutCheckedException;
import org.apache.ignite.internal.managers.deployment.GridDeployment;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.closure.AffinityTask;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
import org.apache.ignite.internal.util.typedef.CO;
@@ -76,6 +78,7 @@ import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.internal.visor.util.VisorClusterGroupEmptyException;
+import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.resources.TaskContinuousMapperResource;
@@ -678,6 +681,12 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
// job response was changed in this method apply.
boolean selfOccupied = false;
+ IgniteInternalFuture<?> affFut = null;
+
+ boolean waitForAffTop = false;
+
+ final GridJobExecuteResponse failoverRes = res;
+
try {
synchronized (mux) {
// If task is not waiting for responses,
@@ -847,7 +856,18 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
}
case FAILOVER: {
- if (!failover(res, jobRes, getTaskTopology()))
+ if (affKey != null) {
+ AffinityTopologyVersion topVer = ctx.discovery().topologyVersionEx();
+
+ affFut = ctx.cache().context().exchange().affinityReadyFuture(topVer);
+ }
+
+ if (affFut != null && !affFut.isDone()) {
+ waitForAffTop = true;
+
+ jobRes.resetResponse();
+ }
+ else if (!failover(res, jobRes, getTaskTopology()))
plc = null;
break;
@@ -856,7 +876,7 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
}
// Outside of synchronization.
- if (plc != null) {
+ if (plc != null && !waitForAffTop) {
// Handle failover.
if (plc == FAILOVER)
sendFailoverRequest(jobRes);
@@ -872,6 +892,8 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
U.error(log, "Failed to obtain topology [ses=" + ses + ", err=" + e + ']', e);
finishTask(null, e);
+
+ waitForAffTop = false;
}
finally {
// Open up job for processing responses.
@@ -890,6 +912,18 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
res = delayedRess.poll();
}
}
+
+ if (waitForAffTop && affFut != null) {
+ affFut.listen(new IgniteInClosure<IgniteInternalFuture<?>>() {
+ @Override public void apply(IgniteInternalFuture<?> fut0) {
+ ctx.closure().runLocalSafe(new Runnable() {
+ @Override public void run() {
+ onResponse(failoverRes);
+ }
+ }, false);
+ }
+ });
+ }
}
}
@@ -1039,7 +1073,11 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
* @param top Topology.
* @return {@code True} if fail-over SPI returned a new node.
*/
- private boolean failover(GridJobExecuteResponse res, GridJobResultImpl jobRes, Collection<? extends ClusterNode> top) {
+ private boolean failover(
+ GridJobExecuteResponse res,
+ GridJobResultImpl jobRes,
+ Collection<? extends ClusterNode> top
+ ) {
assert Thread.holdsLock(mux);
try {
http://git-wip-us.apache.org/repos/asf/ignite/blob/4ed02fd0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAffinityCallSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAffinityCallSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAffinityCallSelfTest.java
index a1762cc..92e2b9b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAffinityCallSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAffinityCallSelfTest.java
@@ -50,9 +50,6 @@ public class CacheAffinityCallSelfTest extends GridCommonAbstractTest {
private static final String CACHE_NAME = "myCache";
/** */
- private static final int MAX_FAILOVER_ATTEMPTS = 500;
-
- /** */
private static final int SRVS = 4;
/** */
@@ -69,21 +66,22 @@ public class CacheAffinityCallSelfTest extends GridCommonAbstractTest {
cfg.setDiscoverySpi(spi);
AlwaysFailoverSpi failSpi = new AlwaysFailoverSpi();
- failSpi.setMaximumFailoverAttempts(MAX_FAILOVER_ATTEMPTS);
cfg.setFailoverSpi(failSpi);
- CacheConfiguration ccfg = defaultCacheConfiguration();
- ccfg.setName(CACHE_NAME);
- ccfg.setCacheMode(PARTITIONED);
- ccfg.setBackups(1);
-
- cfg.setCacheConfiguration(ccfg);
-
+ // Do not configure cache on client.
if (gridName.equals(getTestGridName(SRVS))) {
cfg.setClientMode(true);
spi.setForceServerMode(true);
}
+ else {
+ CacheConfiguration ccfg = defaultCacheConfiguration();
+ ccfg.setName(CACHE_NAME);
+ ccfg.setCacheMode(PARTITIONED);
+ ccfg.setBackups(1);
+
+ cfg.setCacheConfiguration(ccfg);
+ }
return cfg;
}
@@ -99,7 +97,27 @@ public class CacheAffinityCallSelfTest extends GridCommonAbstractTest {
public void testAffinityCallRestartNode() throws Exception {
startGridsMultiThreaded(SRVS);
- final int ITERS = 5;
+ affinityCallRestartNode();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testAffinityCallFromClientRestartNode() throws Exception {
+ startGridsMultiThreaded(SRVS + 1);
+
+ Ignite client = grid(SRVS);
+
+ assertTrue(client.configuration().isClientMode());
+
+ affinityCallRestartNode();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ private void affinityCallRestartNode() throws Exception {
+ final int ITERS = 10;
for (int i = 0; i < ITERS; i++) {
log.info("Iteration: " + i);