You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by iv...@apache.org on 2015/07/20 17:05:32 UTC
[16/23] incubator-ignite git commit: # ignite-1087
# ignite-1087
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/b2770d55
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/b2770d55
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/b2770d55
Branch: refs/heads/ignite-1087
Commit: b2770d55d33b9b836030e73869875cc834ed100a
Parents: b50067c
Author: sboikov <sb...@gridgain.com>
Authored: Fri Jul 10 10:30:35 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Jul 10 10:30:35 2015 +0300
----------------------------------------------------------------------
.../ignite/compute/ComputeJobResultPolicy.java | 3 +-
.../failover/GridFailoverContextImpl.java | 12 ++--
.../managers/failover/GridFailoverManager.java | 10 ++-
.../processors/closure/AffinityTask.java | 35 ++++++++++
.../closure/GridClosureProcessor.java | 71 ++++++++++++++++----
.../task/GridTaskThreadContextKey.java | 6 --
.../processors/task/GridTaskWorker.java | 20 ++++--
.../ignite/spi/failover/FailoverContext.java | 5 +-
.../spi/failover/always/AlwaysFailoverSpi.java | 4 +-
9 files changed, 127 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2770d55/modules/core/src/main/java/org/apache/ignite/compute/ComputeJobResultPolicy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/compute/ComputeJobResultPolicy.java b/modules/core/src/main/java/org/apache/ignite/compute/ComputeJobResultPolicy.java
index 37aba91..26eb542 100644
--- a/modules/core/src/main/java/org/apache/ignite/compute/ComputeJobResultPolicy.java
+++ b/modules/core/src/main/java/org/apache/ignite/compute/ComputeJobResultPolicy.java
@@ -50,8 +50,7 @@ public enum ComputeJobResultPolicy {
* @param ord Ordinal value.
* @return Enumerated value.
*/
- @Nullable
- public static ComputeJobResultPolicy fromOrdinal(byte ord) {
+ @Nullable public static ComputeJobResultPolicy fromOrdinal(byte ord) {
return ord >= 0 && ord < VALS.length ? VALS[ord] : null;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2770d55/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverContextImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverContextImpl.java
index 0c1e7d8..c2b104e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverContextImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverContextImpl.java
@@ -24,6 +24,7 @@ import org.apache.ignite.internal.managers.loadbalancer.*;
import org.apache.ignite.internal.util.tostring.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.spi.failover.*;
+import org.jetbrains.annotations.*;
import java.util.*;
@@ -56,8 +57,11 @@ public class GridFailoverContextImpl implements FailoverContext {
* @param affKey Affinity key.
* @param affCacheName Affinity cache name.
*/
- public GridFailoverContextImpl(GridTaskSessionImpl taskSes, ComputeJobResult jobRes,
- GridLoadBalancerManager loadMgr, Object affKey, String affCacheName) {
+ public GridFailoverContextImpl(GridTaskSessionImpl taskSes,
+ ComputeJobResult jobRes,
+ GridLoadBalancerManager loadMgr,
+ @Nullable Object affKey,
+ @Nullable String affCacheName) {
assert taskSes != null;
assert jobRes != null;
assert loadMgr != null;
@@ -85,12 +89,12 @@ public class GridFailoverContextImpl implements FailoverContext {
}
/** {@inheritDoc} */
- @Override public Object affinityKey() {
+ @Nullable @Override public Object affinityKey() {
return affKey;
}
/** {@inheritDoc} */
- @Override public String affinityCacheName() {
+ @Nullable @Override public String affinityCacheName() {
return affCacheName;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2770d55/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverManager.java
index 4102514..dffc965 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverManager.java
@@ -23,6 +23,7 @@ import org.apache.ignite.compute.*;
import org.apache.ignite.internal.*;
import org.apache.ignite.internal.managers.*;
import org.apache.ignite.spi.failover.*;
+import org.jetbrains.annotations.*;
import java.util.*;
@@ -56,13 +57,16 @@ public class GridFailoverManager extends GridManagerAdapter<FailoverSpi> {
/**
* @param taskSes Task session.
* @param jobRes Job result.
- * @param top Collection of all top nodes that does not include the failed node.
+ * @param top Collection of all topology nodes.
* @param affKey Affinity key.
* @param affCacheName Affinity cache name.
* @return New node to route this job to.
*/
- public ClusterNode failover(GridTaskSessionImpl taskSes, ComputeJobResult jobRes, List<ClusterNode> top,
- Object affKey, String affCacheName) {
+ public ClusterNode failover(GridTaskSessionImpl taskSes,
+ ComputeJobResult jobRes,
+ List<ClusterNode> top,
+ @Nullable Object affKey,
+ @Nullable String affCacheName) {
return getSpi(taskSes.getFailoverSpi()).failover(new GridFailoverContextImpl(taskSes, jobRes,
ctx.loadBalancing(), affKey, affCacheName), top);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2770d55/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/AffinityTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/AffinityTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/AffinityTask.java
new file mode 100644
index 0000000..1b32444
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/AffinityTask.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.closure;
+
+import org.jetbrains.annotations.*;
+
+/**
+ * Affinity mapped task.
+ */
+public interface AffinityTask {
+ /**
+ * @return Affinity key.
+ */
+ public Object affinityKey();
+
+ /**
+ * @return Affinity cache name.
+ */
+ @Nullable public String affinityCacheName();
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2770d55/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
index 02b14a1..21bfc11 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
@@ -413,13 +413,12 @@ public class GridClosureProcessor extends GridProcessorAdapter {
final ClusterNode node = ctx.affinity().mapKeyToNode(cacheName, affKey0);
- ctx.task().setThreadContext(TC_SUBGRID, nodes);
- ctx.task().setThreadContext(TC_AFFINITY_KEY, affKey0);
+ if (node == null)
+ return ComputeTaskInternalFuture.finishedFuture(ctx, T5.class, U.emptyTopologyException());
- if (cacheName != null)
- ctx.task().setThreadContext(TC_AFFINITY_CACHE, cacheName);
+ ctx.task().setThreadContext(TC_SUBGRID, nodes);
- return ctx.task().execute(new T5(node, job), null, false);
+ return ctx.task().execute(new T5(node, job, affKey0, cacheName), null, false);
}
catch (IgniteCheckedException e) {
return ComputeTaskInternalFuture.finishedFuture(ctx, T5.class, e);
@@ -449,13 +448,12 @@ public class GridClosureProcessor extends GridProcessorAdapter {
final ClusterNode node = ctx.affinity().mapKeyToNode(cacheName, affKey0);
- ctx.task().setThreadContext(TC_SUBGRID, nodes);
- ctx.task().setThreadContext(TC_AFFINITY_KEY, affKey0);
+ if (node == null)
+ return ComputeTaskInternalFuture.finishedFuture(ctx, T4.class, U.emptyTopologyException());
- if (cacheName != null)
- ctx.task().setThreadContext(TC_AFFINITY_CACHE, cacheName);
+ ctx.task().setThreadContext(TC_SUBGRID, nodes);
- return ctx.task().execute(new T4(node, job), null, false);
+ return ctx.task().execute(new T4(node, job, affKey0, cacheName), null, false);
}
catch (IgniteCheckedException e) {
return ComputeTaskInternalFuture.finishedFuture(ctx, T4.class, e);
@@ -1231,7 +1229,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
/**
*/
- private static class T4 extends TaskNoReduceAdapter<Void> implements GridNoImplicitInjection {
+ private static class T4 extends TaskNoReduceAdapter<Void> implements GridNoImplicitInjection, AffinityTask {
/** */
private static final long serialVersionUID = 0L;
@@ -1241,15 +1239,27 @@ public class GridClosureProcessor extends GridProcessorAdapter {
/** */
private Runnable job;
+ /** */
+ private Object affKey;
+
+ /** */
+ private String affCacheName;
+
/**
* @param node Cluster node.
* @param job Job.
+ * @param affKey Affinity key.
+ * @param affCacheName Affinity cache name.
*/
- private T4(ClusterNode node, Runnable job) {
+ private T4(ClusterNode node, Runnable job, Object affKey, String affCacheName) {
super(U.peerDeployAware0(job));
+ assert affKey != null;
+
this.node = node;
this.job = job;
+ this.affKey = affKey;
+ this.affCacheName = affCacheName;
}
/** {@inheritDoc} */
@@ -1258,11 +1268,22 @@ public class GridClosureProcessor extends GridProcessorAdapter {
return Collections.singletonMap(job, node);
}
+
+ /** {@inheritDoc} */
+ @Override public Object affinityKey() {
+ return affKey;
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public String affinityCacheName() {
+ return affCacheName;
+ }
}
/**
*/
- private static class T5<R> extends GridPeerDeployAwareTaskAdapter<Void, R> implements GridNoImplicitInjection {
+ private static class T5<R> extends GridPeerDeployAwareTaskAdapter<Void, R> implements
+ GridNoImplicitInjection, AffinityTask {
/** */
private static final long serialVersionUID = 0L;
@@ -1272,15 +1293,27 @@ public class GridClosureProcessor extends GridProcessorAdapter {
/** */
private Callable<R> job;
+ /** */
+ private Object affKey;
+
+ /** */
+ private String affCacheName;
+
/**
* @param node Cluster node.
* @param job Job.
+ * @param affKey Affinity key.
+ * @param affCacheName Affinity cache name.
*/
- private T5(ClusterNode node, Callable<R> job) {
+ private T5(ClusterNode node, Callable<R> job, Object affKey, String affCacheName) {
super(U.peerDeployAware0(job));
+ assert affKey != null;
+
this.node = node;
this.job = job;
+ this.affKey = affKey;
+ this.affCacheName = affCacheName;
}
/** {@inheritDoc} */
@@ -1299,6 +1332,16 @@ public class GridClosureProcessor extends GridProcessorAdapter {
throw new IgniteException("Failed to find successful job result: " + res);
}
+
+ /** {@inheritDoc} */
+ @Override public Object affinityKey() {
+ return affKey;
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public String affinityCacheName() {
+ return affCacheName;
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2770d55/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskThreadContextKey.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskThreadContextKey.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskThreadContextKey.java
index 0bfac62..df706cf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskThreadContextKey.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskThreadContextKey.java
@@ -27,12 +27,6 @@ public enum GridTaskThreadContextKey {
/** No failover flag. */
TC_NO_FAILOVER,
- /** Affinity key. */
- TC_AFFINITY_KEY,
-
- /** Affinity cache. */
- TC_AFFINITY_CACHE,
-
/** Projection for the task. */
TC_SUBGRID,
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2770d55/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
index ed9958f..ee62df8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
@@ -26,6 +26,7 @@ import org.apache.ignite.internal.*;
import org.apache.ignite.internal.cluster.*;
import org.apache.ignite.internal.compute.*;
import org.apache.ignite.internal.managers.deployment.*;
+import org.apache.ignite.internal.processors.closure.*;
import org.apache.ignite.internal.processors.timeout.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
@@ -252,8 +253,16 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
this.noFailover = noFailover != null ? noFailover : false;
- this.affKey = getThreadContext(TC_AFFINITY_KEY);
- this.affCache = getThreadContext(TC_AFFINITY_CACHE);
+ if (task instanceof AffinityTask) {
+ AffinityTask affTask = (AffinityTask)task;
+
+ affKey = affTask.affinityKey();
+ affCache = affTask.affinityCacheName();
+ }
+ else {
+ affKey = null;
+ affCache = null;
+ }
}
/**
@@ -406,7 +415,9 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
ses.setClassLoader(dep.classLoader());
- final List<ClusterNode> shuffledNodes = getTaskTopology();
+ // Nodes are ignored by affinity tasks.
+ final List<ClusterNode> shuffledNodes =
+ affKey == null ? getTaskTopology() : Collections.<ClusterNode>emptyList();
// Load balancer.
ComputeLoadBalancer balancer = ctx.loadBalancing().getLoadBalancer(ses, shuffledNodes);
@@ -568,9 +579,6 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
Collection<? extends ClusterNode> subgrid = top != null ? ctx.discovery().nodes(top) : ctx.discovery().allNodes();
- if (affKey != null)
- return Collections.singletonList(ctx.affinity().mapKeyToNode(affCache, affKey));
-
int size = subgrid.size();
if (size == 0)
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2770d55/modules/core/src/main/java/org/apache/ignite/spi/failover/FailoverContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/failover/FailoverContext.java b/modules/core/src/main/java/org/apache/ignite/spi/failover/FailoverContext.java
index eddd9ea..865f1a2 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/failover/FailoverContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/failover/FailoverContext.java
@@ -21,6 +21,7 @@ import org.apache.ignite.*;
import org.apache.ignite.cluster.*;
import org.apache.ignite.compute.*;
import org.apache.ignite.lang.*;
+import org.jetbrains.annotations.*;
import java.util.*;
@@ -60,7 +61,7 @@ public interface FailoverContext {
*
* @return Affinity key.
*/
- public Object affinityKey();
+ @Nullable public Object affinityKey();
/**
* Returns affinity cache name {@link IgniteCompute#affinityRun(String, Object, IgniteRunnable)}
@@ -68,5 +69,5 @@ public interface FailoverContext {
*
* @return Cache name.
*/
- public String affinityCacheName();
+ @Nullable public String affinityCacheName();
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2770d55/modules/core/src/main/java/org/apache/ignite/spi/failover/always/AlwaysFailoverSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/failover/always/AlwaysFailoverSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/failover/always/AlwaysFailoverSpi.java
index eeaf18e..e925995 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/failover/always/AlwaysFailoverSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/failover/always/AlwaysFailoverSpi.java
@@ -178,8 +178,6 @@ public class AlwaysFailoverSpi extends IgniteSpiAdapter implements FailoverSpi,
return null;
}
- Collection<UUID> failedNodes = ctx.getJobResult().getJobContext().getAttribute(FAILED_NODE_LIST_ATTR);
-
if (ctx.affinityKey() != null) {
Integer affCallAttempt = ctx.getJobResult().getJobContext().getAttribute(AFFINITY_CALL_ATTEMPT);
@@ -200,6 +198,8 @@ public class AlwaysFailoverSpi extends IgniteSpiAdapter implements FailoverSpi,
}
}
+ Collection<UUID> failedNodes = ctx.getJobResult().getJobContext().getAttribute(FAILED_NODE_LIST_ATTR);
+
if (failedNodes == null)
failedNodes = U.newHashSet(1);