You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by iv...@apache.org on 2015/07/03 13:38:03 UTC

incubator-ignite git commit: #ignite-1087: affinity run runs several times on primary node.

Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-1087 [created] 355e33db0


#ignite-1087: affinity run runs several times on primary node.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/355e33db
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/355e33db
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/355e33db

Branch: refs/heads/ignite-1087
Commit: 355e33db0b0551759a7ce976f6af53bc7ad5a36a
Parents: d8f5b6f
Author: ivasilinets <iv...@gridgain.com>
Authored: Fri Jul 3 14:37:55 2015 +0300
Committer: ivasilinets <iv...@gridgain.com>
Committed: Fri Jul 3 14:37:55 2015 +0300

----------------------------------------------------------------------
 .../closure/GridClosureProcessor.java           |  4 ++
 .../task/GridTaskThreadContextKey.java          |  6 +++
 .../processors/task/GridTaskWorker.java         | 16 ++++++
 .../spi/failover/always/AlwaysFailoverSpi.java  | 37 +++++++++++++
 .../cache/GridCacheAffinityRoutingSelfTest.java | 56 +++++++++++++++++++-
 5 files changed, 118 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/355e33db/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
index 658557e..1eb53ee 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
@@ -414,6 +414,8 @@ public class GridClosureProcessor extends GridProcessorAdapter {
             final ClusterNode node = ctx.affinity().mapKeyToNode(cacheName, affKey0);
 
             ctx.task().setThreadContext(TC_SUBGRID, nodes);
+            ctx.task().setThreadContext(TC_AFFINITY_KEY, affKey0);
+            ctx.task().setThreadContext(TC_AFFINITY_CACHE, cacheName);
 
             return ctx.task().execute(new T5(node, job), null, false);
         }
@@ -446,6 +448,8 @@ public class GridClosureProcessor extends GridProcessorAdapter {
             final ClusterNode node = ctx.affinity().mapKeyToNode(cacheName, affKey0);
 
             ctx.task().setThreadContext(TC_SUBGRID, nodes);
+            ctx.task().setThreadContext(TC_AFFINITY_KEY, affKey0);
+            ctx.task().setThreadContext(TC_AFFINITY_CACHE, cacheName);
 
             return ctx.task().execute(new T4(node, job), null, false);
         }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/355e33db/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskThreadContextKey.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskThreadContextKey.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskThreadContextKey.java
index df706cf..0bfac62 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskThreadContextKey.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskThreadContextKey.java
@@ -27,6 +27,12 @@ public enum GridTaskThreadContextKey {
     /** No failover flag. */
     TC_NO_FAILOVER,
 
+    /** Affinity key. */
+    TC_AFFINITY_KEY,
+
+    /** Affinity cache. */
+    TC_AFFINITY_CACHE,
+
     /** Projection for the task. */
     TC_SUBGRID,
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/355e33db/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
index eb5fa77..61bb62a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
@@ -34,6 +34,7 @@ import org.apache.ignite.internal.visor.util.*;
 import org.apache.ignite.lang.*;
 import org.apache.ignite.marshaller.*;
 import org.apache.ignite.resources.*;
+import org.apache.ignite.spi.failover.always.*;
 import org.jetbrains.annotations.*;
 import org.jsr166.*;
 
@@ -136,6 +137,12 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
     private final boolean noFailover;
 
     /** */
+    private final Object affKey;
+
+    /** */
+    private final String affCache;
+
+    /** */
     private final UUID subjId;
 
     /** Continuous mapper. */
@@ -245,6 +252,9 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
         Boolean noFailover = getThreadContext(TC_NO_FAILOVER);
 
         this.noFailover = noFailover != null ? noFailover : false;
+
+        this.affKey = getThreadContext(TC_AFFINITY_KEY);
+        this.affCache = getThreadContext(TC_AFFINITY_CACHE);
     }
 
     /**
@@ -559,6 +569,9 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
 
         Collection<? extends ClusterNode> subgrid = top != null ? ctx.discovery().nodes(top) : ctx.discovery().allNodes();
 
+        if (affKey != null)
+            return Collections.singletonList(ctx.affinity().mapKeyToNode(affCache, affKey));
+
         int size = subgrid.size();
 
         if (size == 0)
@@ -769,6 +782,9 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
                         }
 
                         case FAILOVER: {
+                            if (jobRes != null && affKey != null)
+                                jobRes.getJobContext().setAttribute(AlwaysFailoverSpi.AFFINITY_CALL_FLAG, true);
+
                             if (!failover(res, jobRes, getTaskTopology()))
                                 plc = null;
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/355e33db/modules/core/src/main/java/org/apache/ignite/spi/failover/always/AlwaysFailoverSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/failover/always/AlwaysFailoverSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/failover/always/AlwaysFailoverSpi.java
index e075d3e..c5c0f18 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/failover/always/AlwaysFailoverSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/failover/always/AlwaysFailoverSpi.java
@@ -92,6 +92,21 @@ public class AlwaysFailoverSpi extends IgniteSpiAdapter implements FailoverSpi,
      */
     public static final String FAILED_NODE_LIST_ATTR = "gg:failover:failednodelist";
 
+    /**
+     * Name of job context attribute containing the number of failover attempts made for an affinity call.
+     *
+     * @see org.apache.ignite.compute.ComputeJobContext
+     */
+    public static final String AFFINITY_CALL_ATTEMPT = "ignite:failover:affinitycallattempt";
+
+    /**
+     * Name of job context attribute containing the flag that marks a job as an affinity call.
+     *
+     * @see org.apache.ignite.compute.ComputeJobContext
+     */
+    public static final String AFFINITY_CALL_FLAG = "ignite:failover:affinitycall";
+
+
     /** Maximum attempts attribute key should be the same on all nodes. */
     public static final String MAX_FAILOVER_ATTEMPT_ATTR = "gg:failover:maxattempts";
 
@@ -175,6 +190,28 @@ public class AlwaysFailoverSpi extends IgniteSpiAdapter implements FailoverSpi,
 
         Collection<UUID> failedNodes = ctx.getJobResult().getJobContext().getAttribute(FAILED_NODE_LIST_ATTR);
 
+        Boolean affCall = ctx.getJobResult().getJobContext().getAttribute(AFFINITY_CALL_FLAG);
+
+        if (affCall != null && affCall) {
+            Integer affCallAttempt = ctx.getJobResult().getJobContext().getAttribute(AFFINITY_CALL_ATTEMPT);
+
+            if (affCallAttempt == null)
+                affCallAttempt = 1;
+
+            if (maxFailoverAttempts <= affCallAttempt) {
+                U.warn(log, "Job failover failed because number of maximum failover attempts for affinity call" +
+                    " is exceeded [failedJob=" + ctx.getJobResult().getJob() + ", maxFailoverAttempts=" +
+                    maxFailoverAttempts + ']');
+
+                return null;
+            }
+            else {
+                ctx.getJobResult().getJobContext().setAttribute(AFFINITY_CALL_ATTEMPT, affCallAttempt + 1);
+
+                return top.get(0);
+            }
+        }
+
         if (failedNodes == null)
             failedNodes = U.newHashSet(1);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/355e33db/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityRoutingSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityRoutingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityRoutingSelfTest.java
index 78ecf08..38de9a1 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityRoutingSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityRoutingSelfTest.java
@@ -19,17 +19,20 @@ package org.apache.ignite.internal.processors.cache;
 
 import org.apache.ignite.*;
 import org.apache.ignite.cache.affinity.*;
+import org.apache.ignite.cluster.*;
 import org.apache.ignite.compute.*;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.lang.*;
-import org.apache.ignite.marshaller.optimized.*;
 import org.apache.ignite.resources.*;
 import org.apache.ignite.spi.discovery.tcp.*;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.*;
 import org.apache.ignite.testframework.junits.common.*;
 
+import java.util.concurrent.*;
+
 import static org.apache.ignite.cache.CacheMode.*;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
 
@@ -129,6 +132,18 @@ public class GridCacheAffinityRoutingSelfTest extends GridCommonAbstractTest {
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testAffinityRunRestart() throws Exception {
+        GridTestUtils.assertThrows(log, new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                grid(0).compute().affinityRun(NON_DFLT_CACHE_NAME, "key", new FailedRunnable("key"));
+                return null;
+            }
+        }, ClusterTopologyException.class, "Failed to failover a job to another node");
+    }
+
+    /**
      * JUnit.
      *
      * @throws Exception If failed.
@@ -224,6 +239,45 @@ public class GridCacheAffinityRoutingSelfTest extends GridCommonAbstractTest {
     }
 
     /**
+     * Test closure that asserts it runs on the node its key maps to and then always throws a failover exception.
+     */
+    private static class FailedRunnable extends CAX {
+        /** */
+        @IgniteInstanceResource
+        private Ignite ignite;
+
+        /** */
+        @JobContextResource
+        private ComputeJobContext jobCtx;
+
+        /** Key. */
+        private final Object key;
+
+        /**
+         * @param key Key.
+         */
+        public FailedRunnable(Object key) {
+            this.key = key;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void applyx() throws IgniteCheckedException {
+            Integer attempt = jobCtx.getAttribute("Attempt");
+
+            if (attempt == null)
+                attempt = 0;
+
+            System.out.println("Attempt=" + attempt);
+
+            assertEquals(ignite.affinity(NON_DFLT_CACHE_NAME).mapKeyToNode(key), ignite.cluster().localNode());
+
+            jobCtx.setAttribute("Attempt", attempt + 1);
+
+            throw new ComputeJobFailoverException("Failover exception.");
+        }
+    }
+
+    /**
      * Test callable.
      */
     private static class CheckCallable implements IgniteCallable<Object> {