You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/07/10 09:30:46 UTC

incubator-ignite git commit: # ignite-1087

Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-1087 b50067ca4 -> b2770d55d


# ignite-1087


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/b2770d55
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/b2770d55
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/b2770d55

Branch: refs/heads/ignite-1087
Commit: b2770d55d33b9b836030e73869875cc834ed100a
Parents: b50067c
Author: sboikov <sb...@gridgain.com>
Authored: Fri Jul 10 10:30:35 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Jul 10 10:30:35 2015 +0300

----------------------------------------------------------------------
 .../ignite/compute/ComputeJobResultPolicy.java  |  3 +-
 .../failover/GridFailoverContextImpl.java       | 12 ++--
 .../managers/failover/GridFailoverManager.java  | 10 ++-
 .../processors/closure/AffinityTask.java        | 35 ++++++++++
 .../closure/GridClosureProcessor.java           | 71 ++++++++++++++++----
 .../task/GridTaskThreadContextKey.java          |  6 --
 .../processors/task/GridTaskWorker.java         | 20 ++++--
 .../ignite/spi/failover/FailoverContext.java    |  5 +-
 .../spi/failover/always/AlwaysFailoverSpi.java  |  4 +-
 9 files changed, 127 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2770d55/modules/core/src/main/java/org/apache/ignite/compute/ComputeJobResultPolicy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/compute/ComputeJobResultPolicy.java b/modules/core/src/main/java/org/apache/ignite/compute/ComputeJobResultPolicy.java
index 37aba91..26eb542 100644
--- a/modules/core/src/main/java/org/apache/ignite/compute/ComputeJobResultPolicy.java
+++ b/modules/core/src/main/java/org/apache/ignite/compute/ComputeJobResultPolicy.java
@@ -50,8 +50,7 @@ public enum ComputeJobResultPolicy {
      * @param ord Ordinal value.
      * @return Enumerated value.
      */
-    @Nullable
-    public static ComputeJobResultPolicy fromOrdinal(byte ord) {
+    @Nullable public static ComputeJobResultPolicy fromOrdinal(byte ord) {
         return ord >= 0 && ord < VALS.length ? VALS[ord] : null;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2770d55/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverContextImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverContextImpl.java
index 0c1e7d8..c2b104e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverContextImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverContextImpl.java
@@ -24,6 +24,7 @@ import org.apache.ignite.internal.managers.loadbalancer.*;
 import org.apache.ignite.internal.util.tostring.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.spi.failover.*;
+import org.jetbrains.annotations.*;
 
 import java.util.*;
 
@@ -56,8 +57,11 @@ public class GridFailoverContextImpl implements FailoverContext {
      * @param affKey Affinity key.
      * @param affCacheName Affinity cache name.
      */
-    public GridFailoverContextImpl(GridTaskSessionImpl taskSes, ComputeJobResult jobRes,
-        GridLoadBalancerManager loadMgr, Object affKey, String affCacheName) {
+    public GridFailoverContextImpl(GridTaskSessionImpl taskSes,
+        ComputeJobResult jobRes,
+        GridLoadBalancerManager loadMgr,
+        @Nullable Object affKey,
+        @Nullable String affCacheName) {
         assert taskSes != null;
         assert jobRes != null;
         assert loadMgr != null;
@@ -85,12 +89,12 @@ public class GridFailoverContextImpl implements FailoverContext {
     }
 
     /** {@inheritDoc} */
-    @Override public Object affinityKey() {
+    @Nullable @Override public Object affinityKey() {
         return affKey;
     }
 
     /** {@inheritDoc} */
-    @Override public String affinityCacheName() {
+    @Nullable @Override public String affinityCacheName() {
         return affCacheName;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2770d55/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverManager.java
index 4102514..dffc965 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverManager.java
@@ -23,6 +23,7 @@ import org.apache.ignite.compute.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.managers.*;
 import org.apache.ignite.spi.failover.*;
+import org.jetbrains.annotations.*;
 
 import java.util.*;
 
@@ -56,13 +57,16 @@ public class GridFailoverManager extends GridManagerAdapter<FailoverSpi> {
     /**
      * @param taskSes Task session.
      * @param jobRes Job result.
-     * @param top Collection of all top nodes that does not include the failed node.
+     * @param top Collection of all topology nodes.
      * @param affKey Affinity key.
      * @param affCacheName Affinity cache name.
      * @return New node to route this job to.
      */
-    public ClusterNode failover(GridTaskSessionImpl taskSes, ComputeJobResult jobRes, List<ClusterNode> top,
-        Object affKey, String affCacheName) {
+    public ClusterNode failover(GridTaskSessionImpl taskSes,
+        ComputeJobResult jobRes,
+        List<ClusterNode> top,
+        @Nullable Object affKey,
+        @Nullable String affCacheName) {
         return getSpi(taskSes.getFailoverSpi()).failover(new GridFailoverContextImpl(taskSes, jobRes,
             ctx.loadBalancing(), affKey, affCacheName), top);
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2770d55/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/AffinityTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/AffinityTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/AffinityTask.java
new file mode 100644
index 0000000..1b32444
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/AffinityTask.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.closure;
+
+import org.jetbrains.annotations.*;
+
+/**
+ * Affinity mapped task.
+ */
+public interface AffinityTask {
+    /**
+     * @return Affinity key.
+     */
+    public Object affinityKey();
+
+    /**
+     * @return Affinity cache name.
+     */
+    @Nullable public String affinityCacheName();
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2770d55/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
index 02b14a1..21bfc11 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
@@ -413,13 +413,12 @@ public class GridClosureProcessor extends GridProcessorAdapter {
 
             final ClusterNode node = ctx.affinity().mapKeyToNode(cacheName, affKey0);
 
-            ctx.task().setThreadContext(TC_SUBGRID, nodes);
-            ctx.task().setThreadContext(TC_AFFINITY_KEY, affKey0);
+            if (node == null)
+                return ComputeTaskInternalFuture.finishedFuture(ctx, T5.class, U.emptyTopologyException());
 
-            if (cacheName != null)
-                ctx.task().setThreadContext(TC_AFFINITY_CACHE, cacheName);
+            ctx.task().setThreadContext(TC_SUBGRID, nodes);
 
-            return ctx.task().execute(new T5(node, job), null, false);
+            return ctx.task().execute(new T5(node, job, affKey0, cacheName), null, false);
         }
         catch (IgniteCheckedException e) {
             return ComputeTaskInternalFuture.finishedFuture(ctx, T5.class, e);
@@ -449,13 +448,12 @@ public class GridClosureProcessor extends GridProcessorAdapter {
 
             final ClusterNode node = ctx.affinity().mapKeyToNode(cacheName, affKey0);
 
-            ctx.task().setThreadContext(TC_SUBGRID, nodes);
-            ctx.task().setThreadContext(TC_AFFINITY_KEY, affKey0);
+            if (node == null)
+                return ComputeTaskInternalFuture.finishedFuture(ctx, T4.class, U.emptyTopologyException());
 
-            if (cacheName != null)
-                ctx.task().setThreadContext(TC_AFFINITY_CACHE, cacheName);
+            ctx.task().setThreadContext(TC_SUBGRID, nodes);
 
-            return ctx.task().execute(new T4(node, job), null, false);
+            return ctx.task().execute(new T4(node, job, affKey0, cacheName), null, false);
         }
         catch (IgniteCheckedException e) {
             return ComputeTaskInternalFuture.finishedFuture(ctx, T4.class, e);
@@ -1231,7 +1229,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
 
     /**
      */
-    private static class T4 extends TaskNoReduceAdapter<Void> implements GridNoImplicitInjection {
+    private static class T4 extends TaskNoReduceAdapter<Void> implements GridNoImplicitInjection, AffinityTask {
         /** */
         private static final long serialVersionUID = 0L;
 
@@ -1241,15 +1239,27 @@ public class GridClosureProcessor extends GridProcessorAdapter {
         /** */
         private Runnable job;
 
+        /** */
+        private Object affKey;
+
+        /** */
+        private String affCacheName;
+
         /**
          * @param node Cluster node.
          * @param job Job.
+         * @param affKey Affinity key.
+         * @param affCacheName Affinity cache name.
          */
-        private T4(ClusterNode node, Runnable job) {
+        private T4(ClusterNode node, Runnable job, Object affKey, String affCacheName) {
             super(U.peerDeployAware0(job));
 
+            assert affKey != null;
+
             this.node = node;
             this.job = job;
+            this.affKey = affKey;
+            this.affCacheName = affCacheName;
         }
 
         /** {@inheritDoc} */
@@ -1258,11 +1268,22 @@ public class GridClosureProcessor extends GridProcessorAdapter {
 
             return Collections.singletonMap(job, node);
         }
+
+        /** {@inheritDoc} */
+        @Override public Object affinityKey() {
+            return affKey;
+        }
+
+        /** {@inheritDoc} */
+        @Nullable @Override public String affinityCacheName() {
+            return affCacheName;
+        }
     }
 
     /**
      */
-    private static class T5<R> extends GridPeerDeployAwareTaskAdapter<Void, R> implements GridNoImplicitInjection {
+    private static class T5<R> extends GridPeerDeployAwareTaskAdapter<Void, R> implements
+        GridNoImplicitInjection, AffinityTask {
         /** */
         private static final long serialVersionUID = 0L;
 
@@ -1272,15 +1293,27 @@ public class GridClosureProcessor extends GridProcessorAdapter {
         /** */
         private Callable<R> job;
 
+        /** */
+        private Object affKey;
+
+        /** */
+        private String affCacheName;
+
         /**
          * @param node Cluster node.
          * @param job Job.
+         * @param affKey Affinity key.
+         * @param affCacheName Affinity cache name.
          */
-        private T5(ClusterNode node, Callable<R> job) {
+        private T5(ClusterNode node, Callable<R> job, Object affKey, String affCacheName) {
             super(U.peerDeployAware0(job));
 
+            assert affKey != null;
+
             this.node = node;
             this.job = job;
+            this.affKey = affKey;
+            this.affCacheName = affCacheName;
         }
 
         /** {@inheritDoc} */
@@ -1299,6 +1332,16 @@ public class GridClosureProcessor extends GridProcessorAdapter {
 
             throw new IgniteException("Failed to find successful job result: " + res);
         }
+
+        /** {@inheritDoc} */
+        @Override public Object affinityKey() {
+            return affKey;
+        }
+
+        /** {@inheritDoc} */
+        @Nullable @Override public String affinityCacheName() {
+            return affCacheName;
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2770d55/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskThreadContextKey.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskThreadContextKey.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskThreadContextKey.java
index 0bfac62..df706cf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskThreadContextKey.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskThreadContextKey.java
@@ -27,12 +27,6 @@ public enum GridTaskThreadContextKey {
     /** No failover flag. */
     TC_NO_FAILOVER,
 
-    /** Affinity key. */
-    TC_AFFINITY_KEY,
-
-    /** Affinity cache. */
-    TC_AFFINITY_CACHE,
-
     /** Projection for the task. */
     TC_SUBGRID,
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2770d55/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
index ed9958f..ee62df8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
@@ -26,6 +26,7 @@ import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.cluster.*;
 import org.apache.ignite.internal.compute.*;
 import org.apache.ignite.internal.managers.deployment.*;
+import org.apache.ignite.internal.processors.closure.*;
 import org.apache.ignite.internal.processors.timeout.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
@@ -252,8 +253,16 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
 
         this.noFailover = noFailover != null ? noFailover : false;
 
-        this.affKey = getThreadContext(TC_AFFINITY_KEY);
-        this.affCache = getThreadContext(TC_AFFINITY_CACHE);
+        if (task instanceof AffinityTask) {
+            AffinityTask affTask = (AffinityTask)task;
+
+            affKey = affTask.affinityKey();
+            affCache = affTask.affinityCacheName();
+        }
+        else {
+            affKey = null;
+            affCache = null;
+        }
     }
 
     /**
@@ -406,7 +415,9 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
 
             ses.setClassLoader(dep.classLoader());
 
-            final List<ClusterNode> shuffledNodes = getTaskTopology();
+            // Nodes are ignored by affinity tasks.
+            final List<ClusterNode> shuffledNodes =
+                affKey == null ? getTaskTopology() : Collections.<ClusterNode>emptyList();
 
             // Load balancer.
             ComputeLoadBalancer balancer = ctx.loadBalancing().getLoadBalancer(ses, shuffledNodes);
@@ -568,9 +579,6 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
 
         Collection<? extends ClusterNode> subgrid = top != null ? ctx.discovery().nodes(top) : ctx.discovery().allNodes();
 
-        if (affKey != null)
-            return Collections.singletonList(ctx.affinity().mapKeyToNode(affCache, affKey));
-
         int size = subgrid.size();
 
         if (size == 0)

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2770d55/modules/core/src/main/java/org/apache/ignite/spi/failover/FailoverContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/failover/FailoverContext.java b/modules/core/src/main/java/org/apache/ignite/spi/failover/FailoverContext.java
index eddd9ea..865f1a2 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/failover/FailoverContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/failover/FailoverContext.java
@@ -21,6 +21,7 @@ import org.apache.ignite.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.compute.*;
 import org.apache.ignite.lang.*;
+import org.jetbrains.annotations.*;
 
 import java.util.*;
 
@@ -60,7 +61,7 @@ public interface FailoverContext {
      *
      * @return Affinity key.
      */
-    public Object affinityKey();
+    @Nullable public Object affinityKey();
 
     /**
      * Returns affinity cache name {@link IgniteCompute#affinityRun(String, Object, IgniteRunnable)}
@@ -68,5 +69,5 @@ public interface FailoverContext {
      *
      * @return Cache name.
      */
-    public String affinityCacheName();
+    @Nullable public String affinityCacheName();
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2770d55/modules/core/src/main/java/org/apache/ignite/spi/failover/always/AlwaysFailoverSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/failover/always/AlwaysFailoverSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/failover/always/AlwaysFailoverSpi.java
index eeaf18e..e925995 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/failover/always/AlwaysFailoverSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/failover/always/AlwaysFailoverSpi.java
@@ -178,8 +178,6 @@ public class AlwaysFailoverSpi extends IgniteSpiAdapter implements FailoverSpi,
             return null;
         }
 
-        Collection<UUID> failedNodes = ctx.getJobResult().getJobContext().getAttribute(FAILED_NODE_LIST_ATTR);
-
         if (ctx.affinityKey() != null) {
             Integer affCallAttempt = ctx.getJobResult().getJobContext().getAttribute(AFFINITY_CALL_ATTEMPT);
 
@@ -200,6 +198,8 @@ public class AlwaysFailoverSpi extends IgniteSpiAdapter implements FailoverSpi,
             }
         }
 
+        Collection<UUID> failedNodes = ctx.getJobResult().getJobContext().getAttribute(FAILED_NODE_LIST_ATTR);
+
         if (failedNodes == null)
             failedNodes = U.newHashSet(1);