You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by iv...@apache.org on 2015/07/20 17:05:26 UTC
[10/23] incubator-ignite git commit: #ignite-1087: move affinity key
and affinity cache to FailoverContext.
#ignite-1087: move affinity key and affinity cache to FailoverContext.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/c9f9460a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/c9f9460a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/c9f9460a
Branch: refs/heads/ignite-1087
Commit: c9f9460a5be54baafa6cc2d8cc4de53831a0cbb6
Parents: 585043d
Author: ivasilinets <iv...@gridgain.com>
Authored: Thu Jul 9 15:07:34 2015 +0300
Committer: ivasilinets <iv...@gridgain.com>
Committed: Thu Jul 9 15:07:34 2015 +0300
----------------------------------------------------------------------
.../failover/GridFailoverContextImpl.java | 22 +++++++++++++++++++-
.../managers/failover/GridFailoverManager.java | 7 +++++--
.../processors/task/GridTaskWorker.java | 9 +-------
.../ignite/spi/failover/FailoverContext.java | 17 +++++++++++++++
.../spi/failover/always/AlwaysFailoverSpi.java | 14 ++-----------
.../cache/GridCacheAffinityRoutingSelfTest.java | 2 ++
.../spi/failover/GridFailoverTestContext.java | 10 +++++++++
7 files changed, 58 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f9460a/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverContextImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverContextImpl.java
index a3f8e44..0c1e7d8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverContextImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverContextImpl.java
@@ -41,15 +41,23 @@ public class GridFailoverContextImpl implements FailoverContext {
@GridToStringExclude
private final GridLoadBalancerManager loadMgr;
+ /** Affinity key for affinityCall. */
+ private final Object affKey;
+
+ /** Affinity cache name for affinityCall. */
+ private final String affCacheName;
+
/**
* Initializes failover context.
*
* @param taskSes Grid task session.
* @param jobRes Failed job result.
* @param loadMgr Load manager.
+ * @param affKey Affinity key.
+ * @param affCacheName Affinity cache name.
*/
public GridFailoverContextImpl(GridTaskSessionImpl taskSes, ComputeJobResult jobRes,
- GridLoadBalancerManager loadMgr) {
+ GridLoadBalancerManager loadMgr, Object affKey, String affCacheName) {
assert taskSes != null;
assert jobRes != null;
assert loadMgr != null;
@@ -57,6 +65,8 @@ public class GridFailoverContextImpl implements FailoverContext {
this.taskSes = taskSes;
this.jobRes = jobRes;
this.loadMgr = loadMgr;
+ this.affKey = affKey;
+ this.affCacheName = affCacheName;
}
/** {@inheritDoc} */
@@ -75,6 +85,16 @@ public class GridFailoverContextImpl implements FailoverContext {
}
/** {@inheritDoc} */
+ @Override public Object affinityKey() {
+ return affKey;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String affinityCacheName() {
+ return affCacheName;
+ }
+
+ /** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridFailoverContextImpl.class, this);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f9460a/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverManager.java
index 714cccb..4102514 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverManager.java
@@ -57,10 +57,13 @@ public class GridFailoverManager extends GridManagerAdapter<FailoverSpi> {
* @param taskSes Task session.
* @param jobRes Job result.
* @param top Collection of all top nodes that does not include the failed node.
+ * @param affKey Affinity key.
+ * @param affCacheName Affinity cache name.
* @return New node to route this job to.
*/
- public ClusterNode failover(GridTaskSessionImpl taskSes, ComputeJobResult jobRes, List<ClusterNode> top) {
+ public ClusterNode failover(GridTaskSessionImpl taskSes, ComputeJobResult jobRes, List<ClusterNode> top,
+ Object affKey, String affCacheName) {
return getSpi(taskSes.getFailoverSpi()).failover(new GridFailoverContextImpl(taskSes, jobRes,
- ctx.loadBalancing()), top);
+ ctx.loadBalancing(), affKey, affCacheName), top);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f9460a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
index 61bb62a..c725a42 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
@@ -34,7 +34,6 @@ import org.apache.ignite.internal.visor.util.*;
import org.apache.ignite.lang.*;
import org.apache.ignite.marshaller.*;
import org.apache.ignite.resources.*;
-import org.apache.ignite.spi.failover.always.*;
import org.jetbrains.annotations.*;
import org.jsr166.*;
@@ -569,9 +568,6 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
Collection<? extends ClusterNode> subgrid = top != null ? ctx.discovery().nodes(top) : ctx.discovery().allNodes();
- if (affKey != null)
- return Collections.singletonList(ctx.affinity().mapKeyToNode(affCache, affKey));
-
int size = subgrid.size();
if (size == 0)
@@ -782,9 +778,6 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
}
case FAILOVER: {
- if (jobRes != null && affKey != null)
- jobRes.getJobContext().setAttribute(AlwaysFailoverSpi.AFFINITY_CALL_FLAG, true);
-
if (!failover(res, jobRes, getTaskTopology()))
plc = null;
@@ -984,7 +977,7 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
ctx.resource().invokeAnnotated(dep, jobRes.getJob(), ComputeJobBeforeFailover.class);
// Map to a new node.
- ClusterNode node = ctx.failover().failover(ses, jobRes, new ArrayList<>(top));
+ ClusterNode node = ctx.failover().failover(ses, jobRes, new ArrayList<>(top), affKey, affCache);
if (node == null) {
String msg = "Failed to failover a job to another node (failover SPI returned null) [job=" +
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f9460a/modules/core/src/main/java/org/apache/ignite/spi/failover/FailoverContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/failover/FailoverContext.java b/modules/core/src/main/java/org/apache/ignite/spi/failover/FailoverContext.java
index b0cae92..eddd9ea 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/failover/FailoverContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/failover/FailoverContext.java
@@ -20,6 +20,7 @@ package org.apache.ignite.spi.failover;
import org.apache.ignite.*;
import org.apache.ignite.cluster.*;
import org.apache.ignite.compute.*;
+import org.apache.ignite.lang.*;
import java.util.*;
@@ -52,4 +53,20 @@ public interface FailoverContext {
* @throws IgniteException If anything failed.
*/
public ClusterNode getBalancedNode(List<ClusterNode> top) throws IgniteException;
+
+ /**
+ * Gets affinity key for {@link IgniteCompute#affinityRun(String, Object, IgniteRunnable)}
+ * and {@link IgniteCompute#affinityCall(String, Object, IgniteCallable)}.
+ *
+ * @return Affinity key, or {@code null} if the job was not started via an affinity run/call.
+ */
+ public Object affinityKey();
+
+ /**
+ * Returns affinity cache name for {@link IgniteCompute#affinityRun(String, Object, IgniteRunnable)}
+ * and {@link IgniteCompute#affinityCall(String, Object, IgniteCallable)}.
+ *
+ * @return Cache name.
+ */
+ public String affinityCacheName();
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f9460a/modules/core/src/main/java/org/apache/ignite/spi/failover/always/AlwaysFailoverSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/failover/always/AlwaysFailoverSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/failover/always/AlwaysFailoverSpi.java
index c5c0f18..ec07abb 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/failover/always/AlwaysFailoverSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/failover/always/AlwaysFailoverSpi.java
@@ -99,14 +99,6 @@ public class AlwaysFailoverSpi extends IgniteSpiAdapter implements FailoverSpi,
*/
public static final String AFFINITY_CALL_ATTEMPT = "ignite:failover:affinitycallattempt";
- /**
- * Name of job context attribute containing affinity call flag for affinity call.
- *
- * @see org.apache.ignite.compute.ComputeJobContext
- */
- public static final String AFFINITY_CALL_FLAG = "ignite:failover:affinitycall";
-
-
/** Maximum attempts attribute key should be the same on all nodes. */
public static final String MAX_FAILOVER_ATTEMPT_ATTR = "gg:failover:maxattempts";
@@ -190,9 +182,7 @@ public class AlwaysFailoverSpi extends IgniteSpiAdapter implements FailoverSpi,
Collection<UUID> failedNodes = ctx.getJobResult().getJobContext().getAttribute(FAILED_NODE_LIST_ATTR);
- Boolean affCall = ctx.getJobResult().getJobContext().getAttribute(AFFINITY_CALL_FLAG);
-
- if (affCall != null && affCall) {
+ if (ctx.affinityKey() != null) {
Integer affCallAttempt = ctx.getJobResult().getJobContext().getAttribute(AFFINITY_CALL_ATTEMPT);
if (affCallAttempt == null)
@@ -208,7 +198,7 @@ public class AlwaysFailoverSpi extends IgniteSpiAdapter implements FailoverSpi,
else {
ctx.getJobResult().getJobContext().setAttribute(AFFINITY_CALL_ATTEMPT, affCallAttempt + 1);
- return top.get(0);
+ return ignite.affinity(ctx.affinityCacheName()).mapKeyToNode(ctx.affinityKey());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f9460a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityRoutingSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityRoutingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityRoutingSelfTest.java
index 38de9a1..80e558b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityRoutingSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityRoutingSelfTest.java
@@ -269,6 +269,8 @@ public class GridCacheAffinityRoutingSelfTest extends GridCommonAbstractTest {
System.out.println("Attempt=" + attempt);
+ System.out.println("RUN ON NODE: " + ignite.cluster().localNode().id());
+
assertEquals(ignite.affinity(NON_DFLT_CACHE_NAME).mapKeyToNode(key), ignite.cluster().localNode());
jobCtx.setAttribute("Attempt", attempt + 1);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f9460a/modules/core/src/test/java/org/apache/ignite/spi/failover/GridFailoverTestContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/failover/GridFailoverTestContext.java b/modules/core/src/test/java/org/apache/ignite/spi/failover/GridFailoverTestContext.java
index db64475..bfca83d 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/failover/GridFailoverTestContext.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/failover/GridFailoverTestContext.java
@@ -66,4 +66,14 @@ public class GridFailoverTestContext implements FailoverContext {
@Override public ClusterNode getBalancedNode(List<ClusterNode> grid) {
return grid.get(RAND.nextInt(grid.size()));
}
+
+ /** {@inheritDoc} */
+ @Override public Object affinityKey() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String affinityCacheName() {
+ return null;
+ }
}