You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by iv...@apache.org on 2015/07/03 13:38:03 UTC
incubator-ignite git commit: #ignite-1087: affinity run runs several
times on primary node.
Repository: incubator-ignite
Updated Branches:
refs/heads/ignite-1087 [created] 355e33db0
#ignite-1087: affinity run runs several times on primary node.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/355e33db
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/355e33db
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/355e33db
Branch: refs/heads/ignite-1087
Commit: 355e33db0b0551759a7ce976f6af53bc7ad5a36a
Parents: d8f5b6f
Author: ivasilinets <iv...@gridgain.com>
Authored: Fri Jul 3 14:37:55 2015 +0300
Committer: ivasilinets <iv...@gridgain.com>
Committed: Fri Jul 3 14:37:55 2015 +0300
----------------------------------------------------------------------
.../closure/GridClosureProcessor.java | 4 ++
.../task/GridTaskThreadContextKey.java | 6 +++
.../processors/task/GridTaskWorker.java | 16 ++++++
.../spi/failover/always/AlwaysFailoverSpi.java | 37 +++++++++++++
.../cache/GridCacheAffinityRoutingSelfTest.java | 56 +++++++++++++++++++-
5 files changed, 118 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/355e33db/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
index 658557e..1eb53ee 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
@@ -414,6 +414,8 @@ public class GridClosureProcessor extends GridProcessorAdapter {
final ClusterNode node = ctx.affinity().mapKeyToNode(cacheName, affKey0);
ctx.task().setThreadContext(TC_SUBGRID, nodes);
+ ctx.task().setThreadContext(TC_AFFINITY_KEY, affKey0);
+ ctx.task().setThreadContext(TC_AFFINITY_CACHE, cacheName);
return ctx.task().execute(new T5(node, job), null, false);
}
@@ -446,6 +448,8 @@ public class GridClosureProcessor extends GridProcessorAdapter {
final ClusterNode node = ctx.affinity().mapKeyToNode(cacheName, affKey0);
ctx.task().setThreadContext(TC_SUBGRID, nodes);
+ ctx.task().setThreadContext(TC_AFFINITY_KEY, affKey0);
+ ctx.task().setThreadContext(TC_AFFINITY_CACHE, cacheName);
return ctx.task().execute(new T4(node, job), null, false);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/355e33db/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskThreadContextKey.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskThreadContextKey.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskThreadContextKey.java
index df706cf..0bfac62 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskThreadContextKey.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskThreadContextKey.java
@@ -27,6 +27,12 @@ public enum GridTaskThreadContextKey {
/** No failover flag. */
TC_NO_FAILOVER,
+ /** Affinity key. */
+ TC_AFFINITY_KEY,
+
+ /** Affinity cache. */
+ TC_AFFINITY_CACHE,
+
/** Projection for the task. */
TC_SUBGRID,
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/355e33db/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
index eb5fa77..61bb62a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
@@ -34,6 +34,7 @@ import org.apache.ignite.internal.visor.util.*;
import org.apache.ignite.lang.*;
import org.apache.ignite.marshaller.*;
import org.apache.ignite.resources.*;
+import org.apache.ignite.spi.failover.always.*;
import org.jetbrains.annotations.*;
import org.jsr166.*;
@@ -136,6 +137,12 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
private final boolean noFailover;
/** */
+ private final Object affKey;
+
+ /** Affinity cache name. */
+ private final String affCache;
+
+ /** */
private final UUID subjId;
/** Continuous mapper. */
@@ -245,6 +252,9 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
Boolean noFailover = getThreadContext(TC_NO_FAILOVER);
this.noFailover = noFailover != null ? noFailover : false;
+
+ this.affKey = getThreadContext(TC_AFFINITY_KEY);
+ this.affCache = getThreadContext(TC_AFFINITY_CACHE);
}
/**
@@ -559,6 +569,9 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
Collection<? extends ClusterNode> subgrid = top != null ? ctx.discovery().nodes(top) : ctx.discovery().allNodes();
+ if (affKey != null)
+ return Collections.singletonList(ctx.affinity().mapKeyToNode(affCache, affKey));
+
int size = subgrid.size();
if (size == 0)
@@ -769,6 +782,9 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
}
case FAILOVER: {
+ if (jobRes != null && affKey != null)
+ jobRes.getJobContext().setAttribute(AlwaysFailoverSpi.AFFINITY_CALL_FLAG, true);
+
if (!failover(res, jobRes, getTaskTopology()))
plc = null;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/355e33db/modules/core/src/main/java/org/apache/ignite/spi/failover/always/AlwaysFailoverSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/failover/always/AlwaysFailoverSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/failover/always/AlwaysFailoverSpi.java
index e075d3e..c5c0f18 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/failover/always/AlwaysFailoverSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/failover/always/AlwaysFailoverSpi.java
@@ -92,6 +92,21 @@ public class AlwaysFailoverSpi extends IgniteSpiAdapter implements FailoverSpi,
*/
public static final String FAILED_NODE_LIST_ATTR = "gg:failover:failednodelist";
+ /**
+ * Name of job context attribute containing the number of failover attempts made for an affinity call.
+ *
+ * @see org.apache.ignite.compute.ComputeJobContext
+ */
+ public static final String AFFINITY_CALL_ATTEMPT = "ignite:failover:affinitycallattempt";
+
+ /**
+ * Name of job context attribute containing the flag that marks the job as an affinity call.
+ *
+ * @see org.apache.ignite.compute.ComputeJobContext
+ */
+ public static final String AFFINITY_CALL_FLAG = "ignite:failover:affinitycall";
+
+
/** Maximum attempts attribute key should be the same on all nodes. */
public static final String MAX_FAILOVER_ATTEMPT_ATTR = "gg:failover:maxattempts";
@@ -175,6 +190,28 @@ public class AlwaysFailoverSpi extends IgniteSpiAdapter implements FailoverSpi,
Collection<UUID> failedNodes = ctx.getJobResult().getJobContext().getAttribute(FAILED_NODE_LIST_ATTR);
+ Boolean affCall = ctx.getJobResult().getJobContext().getAttribute(AFFINITY_CALL_FLAG);
+
+ if (affCall != null && affCall) {
+ Integer affCallAttempt = ctx.getJobResult().getJobContext().getAttribute(AFFINITY_CALL_ATTEMPT);
+
+ if (affCallAttempt == null)
+ affCallAttempt = 1;
+
+ if (maxFailoverAttempts <= affCallAttempt) {
+ U.warn(log, "Job failover failed because number of maximum failover attempts for affinity call" +
+ " is exceeded [failedJob=" + ctx.getJobResult().getJob() + ", maxFailoverAttempts=" +
+ maxFailoverAttempts + ']');
+
+ return null;
+ }
+ else {
+ ctx.getJobResult().getJobContext().setAttribute(AFFINITY_CALL_ATTEMPT, affCallAttempt + 1);
+
+ return top.get(0);
+ }
+ }
+
if (failedNodes == null)
failedNodes = U.newHashSet(1);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/355e33db/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityRoutingSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityRoutingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityRoutingSelfTest.java
index 78ecf08..38de9a1 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityRoutingSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityRoutingSelfTest.java
@@ -19,17 +19,20 @@ package org.apache.ignite.internal.processors.cache;
import org.apache.ignite.*;
import org.apache.ignite.cache.affinity.*;
+import org.apache.ignite.cluster.*;
import org.apache.ignite.compute.*;
import org.apache.ignite.configuration.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.lang.*;
-import org.apache.ignite.marshaller.optimized.*;
import org.apache.ignite.resources.*;
import org.apache.ignite.spi.discovery.tcp.*;
import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.*;
import org.apache.ignite.testframework.junits.common.*;
+import java.util.concurrent.*;
+
import static org.apache.ignite.cache.CacheMode.*;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
@@ -129,6 +132,18 @@ public class GridCacheAffinityRoutingSelfTest extends GridCommonAbstractTest {
}
/**
+ * @throws Exception If failed.
+ */
+ public void testAffinityRunRestart() throws Exception {
+ GridTestUtils.assertThrows(log, new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ grid(0).compute().affinityRun(NON_DFLT_CACHE_NAME, "key", new FailedRunnable("key"));
+ return null;
+ }
+ }, ClusterTopologyException.class, "Failed to failover a job to another node");
+ }
+
+ /**
* JUnit.
*
* @throws Exception If failed.
@@ -224,6 +239,45 @@ public class GridCacheAffinityRoutingSelfTest extends GridCommonAbstractTest {
}
/**
+ * Test runnable that checks it runs on the node the key maps to and then always throws a failover exception.
+ */
+ private static class FailedRunnable extends CAX {
+ /** */
+ @IgniteInstanceResource
+ private Ignite ignite;
+
+ /** */
+ @JobContextResource
+ private ComputeJobContext jobCtx;
+
+ /** Key. */
+ private final Object key;
+
+ /**
+ * @param key Key.
+ */
+ public FailedRunnable(Object key) {
+ this.key = key;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void applyx() throws IgniteCheckedException {
+ Integer attempt = jobCtx.getAttribute("Attempt");
+
+ if (attempt == null)
+ attempt = 0;
+
+ System.out.println("Attempt=" + attempt);
+
+ assertEquals(ignite.affinity(NON_DFLT_CACHE_NAME).mapKeyToNode(key), ignite.cluster().localNode());
+
+ jobCtx.setAttribute("Attempt", attempt + 1);
+
+ throw new ComputeJobFailoverException("Failover exception.");
+ }
+ }
+
+ /**
* Test callable.
*/
private static class CheckCallable implements IgniteCallable<Object> {