You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by iv...@apache.org on 2015/07/20 17:05:26 UTC

[10/23] incubator-ignite git commit: #ignite-1087: move affinity key and affinity cache to FailoverContext.

#ignite-1087: move affinity key and affinity cache to FailoverContext.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/c9f9460a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/c9f9460a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/c9f9460a

Branch: refs/heads/ignite-1087
Commit: c9f9460a5be54baafa6cc2d8cc4de53831a0cbb6
Parents: 585043d
Author: ivasilinets <iv...@gridgain.com>
Authored: Thu Jul 9 15:07:34 2015 +0300
Committer: ivasilinets <iv...@gridgain.com>
Committed: Thu Jul 9 15:07:34 2015 +0300

----------------------------------------------------------------------
 .../failover/GridFailoverContextImpl.java       | 22 +++++++++++++++++++-
 .../managers/failover/GridFailoverManager.java  |  7 +++++--
 .../processors/task/GridTaskWorker.java         |  9 +-------
 .../ignite/spi/failover/FailoverContext.java    | 17 +++++++++++++++
 .../spi/failover/always/AlwaysFailoverSpi.java  | 14 ++-----------
 .../cache/GridCacheAffinityRoutingSelfTest.java |  2 ++
 .../spi/failover/GridFailoverTestContext.java   | 10 +++++++++
 7 files changed, 58 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f9460a/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverContextImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverContextImpl.java
index a3f8e44..0c1e7d8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverContextImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverContextImpl.java
@@ -41,15 +41,23 @@ public class GridFailoverContextImpl implements FailoverContext {
     @GridToStringExclude
     private final GridLoadBalancerManager loadMgr;
 
+    /** Affinity key for affinityCall. */
+    private final Object affKey;
+
+    /** Affinity cache name for affinityCall. */
+    private final String affCacheName;
+
     /**
      * Initializes failover context.
      *
      * @param taskSes Grid task session.
      * @param jobRes Failed job result.
      * @param loadMgr Load manager.
+     * @param affKey Affinity key.
+     * @param affCacheName Affinity cache name.
      */
     public GridFailoverContextImpl(GridTaskSessionImpl taskSes, ComputeJobResult jobRes,
-        GridLoadBalancerManager loadMgr) {
+        GridLoadBalancerManager loadMgr, Object affKey, String affCacheName) {
         assert taskSes != null;
         assert jobRes != null;
         assert loadMgr != null;
@@ -57,6 +65,8 @@ public class GridFailoverContextImpl implements FailoverContext {
         this.taskSes = taskSes;
         this.jobRes = jobRes;
         this.loadMgr = loadMgr;
+        this.affKey = affKey;
+        this.affCacheName = affCacheName;
     }
 
     /** {@inheritDoc} */
@@ -75,6 +85,16 @@ public class GridFailoverContextImpl implements FailoverContext {
     }
 
     /** {@inheritDoc} */
+    @Override public Object affinityKey() {
+        return affKey;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String affinityCacheName() {
+        return affCacheName;
+    }
+
+    /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(GridFailoverContextImpl.class, this);
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f9460a/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverManager.java
index 714cccb..4102514 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverManager.java
@@ -57,10 +57,13 @@ public class GridFailoverManager extends GridManagerAdapter<FailoverSpi> {
      * @param taskSes Task session.
      * @param jobRes Job result.
      * @param top Collection of all top nodes that does not include the failed node.
+     * @param affKey Affinity key.
+     * @param affCacheName Affinity cache name.
      * @return New node to route this job to.
      */
-    public ClusterNode failover(GridTaskSessionImpl taskSes, ComputeJobResult jobRes, List<ClusterNode> top) {
+    public ClusterNode failover(GridTaskSessionImpl taskSes, ComputeJobResult jobRes, List<ClusterNode> top,
+        Object affKey, String affCacheName) {
         return getSpi(taskSes.getFailoverSpi()).failover(new GridFailoverContextImpl(taskSes, jobRes,
-            ctx.loadBalancing()), top);
+            ctx.loadBalancing(), affKey, affCacheName), top);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f9460a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
index 61bb62a..c725a42 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
@@ -34,7 +34,6 @@ import org.apache.ignite.internal.visor.util.*;
 import org.apache.ignite.lang.*;
 import org.apache.ignite.marshaller.*;
 import org.apache.ignite.resources.*;
-import org.apache.ignite.spi.failover.always.*;
 import org.jetbrains.annotations.*;
 import org.jsr166.*;
 
@@ -569,9 +568,6 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
 
         Collection<? extends ClusterNode> subgrid = top != null ? ctx.discovery().nodes(top) : ctx.discovery().allNodes();
 
-        if (affKey != null)
-            return Collections.singletonList(ctx.affinity().mapKeyToNode(affCache, affKey));
-
         int size = subgrid.size();
 
         if (size == 0)
@@ -782,9 +778,6 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
                         }
 
                         case FAILOVER: {
-                            if (jobRes != null && affKey != null)
-                                jobRes.getJobContext().setAttribute(AlwaysFailoverSpi.AFFINITY_CALL_FLAG, true);
-
                             if (!failover(res, jobRes, getTaskTopology()))
                                 plc = null;
 
@@ -984,7 +977,7 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
             ctx.resource().invokeAnnotated(dep, jobRes.getJob(), ComputeJobBeforeFailover.class);
 
             // Map to a new node.
-            ClusterNode node = ctx.failover().failover(ses, jobRes, new ArrayList<>(top));
+            ClusterNode node = ctx.failover().failover(ses, jobRes, new ArrayList<>(top), affKey, affCache);
 
             if (node == null) {
                 String msg = "Failed to failover a job to another node (failover SPI returned null) [job=" +

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f9460a/modules/core/src/main/java/org/apache/ignite/spi/failover/FailoverContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/failover/FailoverContext.java b/modules/core/src/main/java/org/apache/ignite/spi/failover/FailoverContext.java
index b0cae92..eddd9ea 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/failover/FailoverContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/failover/FailoverContext.java
@@ -20,6 +20,7 @@ package org.apache.ignite.spi.failover;
 import org.apache.ignite.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.compute.*;
+import org.apache.ignite.lang.*;
 
 import java.util.*;
 
@@ -52,4 +53,20 @@ public interface FailoverContext {
      * @throws IgniteException If anything failed.
      */
     public ClusterNode getBalancedNode(List<ClusterNode> top) throws IgniteException;
+
+    /**
+     * Gets affinity key for {@link IgniteCompute#affinityRun(String, Object, IgniteRunnable)}
+     * and {@link IgniteCompute#affinityCall(String, Object, IgniteCallable)}.
+     *
+     * @return Affinity key.
+     */
+    public Object affinityKey();
+
+    /**
+     * Returns affinity cache name {@link IgniteCompute#affinityRun(String, Object, IgniteRunnable)}
+     * and {@link IgniteCompute#affinityCall(String, Object, IgniteCallable)}.
+     *
+     * @return Cache name.
+     */
+    public String affinityCacheName();
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f9460a/modules/core/src/main/java/org/apache/ignite/spi/failover/always/AlwaysFailoverSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/failover/always/AlwaysFailoverSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/failover/always/AlwaysFailoverSpi.java
index c5c0f18..ec07abb 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/failover/always/AlwaysFailoverSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/failover/always/AlwaysFailoverSpi.java
@@ -99,14 +99,6 @@ public class AlwaysFailoverSpi extends IgniteSpiAdapter implements FailoverSpi,
      */
     public static final String AFFINITY_CALL_ATTEMPT = "ignite:failover:affinitycallattempt";
 
-    /**
-     * Name of job context attribute containing affinity call flag for affinity call.
-     *
-     * @see org.apache.ignite.compute.ComputeJobContext
-     */
-    public static final String AFFINITY_CALL_FLAG = "ignite:failover:affinitycall";
-
-
     /** Maximum attempts attribute key should be the same on all nodes. */
     public static final String MAX_FAILOVER_ATTEMPT_ATTR = "gg:failover:maxattempts";
 
@@ -190,9 +182,7 @@ public class AlwaysFailoverSpi extends IgniteSpiAdapter implements FailoverSpi,
 
         Collection<UUID> failedNodes = ctx.getJobResult().getJobContext().getAttribute(FAILED_NODE_LIST_ATTR);
 
-        Boolean affCall = ctx.getJobResult().getJobContext().getAttribute(AFFINITY_CALL_FLAG);
-
-        if (affCall != null && affCall) {
+        if (ctx.affinityKey() != null) {
             Integer affCallAttempt = ctx.getJobResult().getJobContext().getAttribute(AFFINITY_CALL_ATTEMPT);
 
             if (affCallAttempt == null)
@@ -208,7 +198,7 @@ public class AlwaysFailoverSpi extends IgniteSpiAdapter implements FailoverSpi,
             else {
                 ctx.getJobResult().getJobContext().setAttribute(AFFINITY_CALL_ATTEMPT, affCallAttempt + 1);
 
-                return top.get(0);
+                return ignite.affinity(ctx.affinityCacheName()).mapKeyToNode(ctx.affinityKey());
             }
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f9460a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityRoutingSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityRoutingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityRoutingSelfTest.java
index 38de9a1..80e558b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityRoutingSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityRoutingSelfTest.java
@@ -269,6 +269,8 @@ public class GridCacheAffinityRoutingSelfTest extends GridCommonAbstractTest {
 
             System.out.println("Attempt=" + attempt);
 
+            System.out.println("RUN ON NODE: " + ignite.cluster().localNode().id());
+
             assertEquals(ignite.affinity(NON_DFLT_CACHE_NAME).mapKeyToNode(key), ignite.cluster().localNode());
 
             jobCtx.setAttribute("Attempt", attempt + 1);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f9460a/modules/core/src/test/java/org/apache/ignite/spi/failover/GridFailoverTestContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/failover/GridFailoverTestContext.java b/modules/core/src/test/java/org/apache/ignite/spi/failover/GridFailoverTestContext.java
index db64475..bfca83d 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/failover/GridFailoverTestContext.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/failover/GridFailoverTestContext.java
@@ -66,4 +66,14 @@ public class GridFailoverTestContext implements FailoverContext {
     @Override public ClusterNode getBalancedNode(List<ClusterNode> grid) {
         return grid.get(RAND.nextInt(grid.size()));
     }
+
+    /** {@inheritDoc} */
+    @Override public Object affinityKey() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String affinityCacheName() {
+        return null;
+    }
 }