You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this plain-text copy.
Posted to commits@ignite.apache.org by iv...@apache.org on 2015/07/09 14:50:38 UTC
incubator-ignite git commit: #ignite-1087: add test.
Repository: incubator-ignite
Updated Branches:
refs/heads/ignite-1087 c9f9460a5 -> d52186b86
#ignite-1087: add test.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/d52186b8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/d52186b8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/d52186b8
Branch: refs/heads/ignite-1087
Commit: d52186b8663c2e8d30aa06ad26aefad5be9a16a9
Parents: c9f9460
Author: ivasilinets <iv...@gridgain.com>
Authored: Thu Jul 9 15:50:31 2015 +0300
Committer: ivasilinets <iv...@gridgain.com>
Committed: Thu Jul 9 15:50:31 2015 +0300
----------------------------------------------------------------------
.../processors/task/GridTaskWorker.java | 3 +
.../cache/GridCacheAffinityRoutingSelfTest.java | 74 ++++++++++++++++----
2 files changed, 63 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d52186b8/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
index c725a42..ed9958f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
@@ -568,6 +568,9 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
Collection<? extends ClusterNode> subgrid = top != null ? ctx.discovery().nodes(top) : ctx.discovery().allNodes();
+ if (affKey != null)
+ return Collections.singletonList(ctx.affinity().mapKeyToNode(affCache, affKey));
+
int size = subgrid.size();
if (size == 0)
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d52186b8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityRoutingSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityRoutingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityRoutingSelfTest.java
index 80e558b..bb99406 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityRoutingSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityRoutingSelfTest.java
@@ -22,12 +22,15 @@ import org.apache.ignite.cache.affinity.*;
import org.apache.ignite.cluster.*;
import org.apache.ignite.compute.*;
import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.lang.*;
import org.apache.ignite.resources.*;
import org.apache.ignite.spi.discovery.tcp.*;
import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.spi.failover.always.*;
import org.apache.ignite.testframework.*;
import org.apache.ignite.testframework.junits.common.*;
@@ -50,6 +53,9 @@ public class GridCacheAffinityRoutingSelfTest extends GridCommonAbstractTest {
private static final int KEY_CNT = 50;
/** */
+ private static final int MAX_FAILOVER_ATTEMPTS = 5;
+
+ /** */
private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
/**
@@ -69,6 +75,10 @@ public class GridCacheAffinityRoutingSelfTest extends GridCommonAbstractTest {
cfg.setDiscoverySpi(spi);
+ AlwaysFailoverSpi failSpi = new AlwaysFailoverSpi();
+ failSpi.setMaximumFailoverAttempts(MAX_FAILOVER_ATTEMPTS);
+ cfg.setFailoverSpi(failSpi);
+
if (!gridName.equals(getTestGridName(GRID_CNT))) {
// Default cache configuration.
CacheConfiguration dfltCacheCfg = defaultCacheConfiguration();
@@ -134,16 +144,45 @@ public class GridCacheAffinityRoutingSelfTest extends GridCommonAbstractTest {
/**
* @throws Exception If failed.
*/
- public void testAffinityRunRestart() throws Exception {
+ public void testAffinityCallRestartFails() throws Exception {
GridTestUtils.assertThrows(log, new Callable<Object>() {
@Override public Object call() throws Exception {
- grid(0).compute().affinityRun(NON_DFLT_CACHE_NAME, "key", new FailedRunnable("key"));
+ grid(0).compute().affinityCall(NON_DFLT_CACHE_NAME, "key",
+ new FailedCallable("key", MAX_FAILOVER_ATTEMPTS + 1));
return null;
}
}, ClusterTopologyException.class, "Failed to failover a job to another node");
}
/**
+ * @throws Exception If failed.
+ */
+ public void testAffinityCallRestart() throws Exception {
+ assertEquals(MAX_FAILOVER_ATTEMPTS,
+ grid(0).compute().affinityCall(NON_DFLT_CACHE_NAME, "key",
+ new FailedCallable("key", MAX_FAILOVER_ATTEMPTS)));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testAffinityCallRestartNode() throws Exception {
+ Integer key = primaryKey(grid(0).cache(NON_DFLT_CACHE_NAME));
+
+ IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ U.sleep(500);
+ stopGrid(0);
+
+ return null;
+ }
+ });
+
+ while (!fut.isDone())
+ grid(1).compute().affinityCall(NON_DFLT_CACHE_NAME, key, new CheckCallable(key, key));
+ }
+
+ /**
* JUnit.
*
* @throws Exception If failed.
@@ -241,7 +280,10 @@ public class GridCacheAffinityRoutingSelfTest extends GridCommonAbstractTest {
/**
* Test runnable.
*/
- private static class FailedRunnable extends CAX {
+ private static class FailedCallable implements IgniteCallable<Object> {
+ /** */
+ private static final String ATTR_ATTEMPT = "Attempt";
+
/** */
@IgniteInstanceResource
private Ignite ignite;
@@ -253,29 +295,33 @@ public class GridCacheAffinityRoutingSelfTest extends GridCommonAbstractTest {
/** Key. */
private final Object key;
+ /** Call attempts. */
+ private final Integer callAttempt;
+
/**
* @param key Key.
+ * @param callAttempt Call attempts.
*/
- public FailedRunnable(Object key) {
+ public FailedCallable(Object key, Integer callAttempt) {
this.key = key;
+ this.callAttempt = callAttempt;
}
/** {@inheritDoc} */
- @Override public void applyx() throws IgniteCheckedException {
- Integer attempt = jobCtx.getAttribute("Attempt");
+ @Override public Object call() throws IgniteCheckedException {
+ Integer attempt = jobCtx.getAttribute(ATTR_ATTEMPT);
if (attempt == null)
- attempt = 0;
-
- System.out.println("Attempt=" + attempt);
-
- System.out.println("RUN ON NODE: " + ignite.cluster().localNode().id());
+ attempt = 1;
assertEquals(ignite.affinity(NON_DFLT_CACHE_NAME).mapKeyToNode(key), ignite.cluster().localNode());
- jobCtx.setAttribute("Attempt", attempt + 1);
+ jobCtx.setAttribute(ATTR_ATTEMPT, attempt + 1);
- throw new ComputeJobFailoverException("Failover exception.");
+ if (attempt < callAttempt)
+ throw new ComputeJobFailoverException("Failover exception.");
+ else
+ return attempt;
}
}
@@ -310,7 +356,7 @@ public class GridCacheAffinityRoutingSelfTest extends GridCommonAbstractTest {
@Override public Object call() throws IgniteCheckedException {
assert ignite.cluster().localNode().id().equals(ignite.cluster().mapKeyToNode(null, affKey).id());
assert ignite.cluster().localNode().id().equals(ignite.cluster().mapKeyToNode(null, key).id());
-
+ System.out.println("CALL ON NODE=" + ignite.name());
return null;
}
}