You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/08/19 11:48:34 UTC
[17/53] [abbrv] ignite git commit: IGNITE-2310 Lock cache partition
for affinityRun/affinityCall execution
http://git-wip-us.apache.org/repos/asf/ignite/blob/01800101/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index a42eb98..8469a7c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -38,6 +38,7 @@ import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.binary.BinaryMarshaller;
import org.apache.ignite.internal.binary.BinaryObjectEx;
import org.apache.ignite.internal.processors.GridProcessorAdapter;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheEntryImpl;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.CacheObjectContext;
@@ -145,6 +146,9 @@ public class GridQueryProcessor extends GridProcessorAdapter {
/** */
private final GridQueryIndexing idx;
+ /** */
+ private static final ThreadLocal<AffinityTopologyVersion> requestTopVer = new ThreadLocal<>();
+
/**
* @param ctx Kernal context.
*/
@@ -878,7 +882,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
sqlQry,
F.asList(params),
typeDesc,
- idx.backupFilter(null, null, null));
+ idx.backupFilter(null, requestTopVer.get(), null));
sendQueryExecutedEvent(
sqlQry,
@@ -964,7 +968,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
Object[] args = qry.getArgs();
final GridQueryFieldsResult res = idx.queryFields(space, sql, F.asList(args),
- idx.backupFilter(null, null, null));
+ idx.backupFilter(null, requestTopVer.get(), null));
sendQueryExecutedEvent(sql, args);
@@ -1815,6 +1819,20 @@ public class GridQueryProcessor extends GridProcessorAdapter {
}
/**
+ * @param ver Affinity topology version of the current request.
+ */
+ public static void setRequestAffinityTopologyVersion(AffinityTopologyVersion ver) {
+ requestTopVer.set(ver);
+ }
+
+ /**
+ * @return Affinity topology version of the current request.
+ */
+ public static AffinityTopologyVersion getRequestAffinityTopologyVersion() {
+ return requestTopVer.get();
+ }
+
+ /**
* Description of type property.
*/
private static class ClassProperty extends GridQueryProperty {
http://git-wip-us.apache.org/repos/asf/ignite/blob/01800101/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
index 415d632..00ea29e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
@@ -73,6 +73,7 @@ import org.apache.ignite.internal.util.typedef.CO;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -110,6 +111,9 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
/** Split size threshold. */
private static final int SPLIT_WARN_THRESHOLD = 1000;
+ /** Retry delay factor (ms). Retry delay = retryAttempt * RETRY_DELAY_MS */
+ private static final long RETRY_DELAY_MS = 10;
+
/** {@code True} for internal tasks. */
private boolean internal;
@@ -192,7 +196,19 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
private final Object affKey;
/** */
- private final String affCache;
+ private final int affPartId;
+
+ /** */
+ private final String affCacheName;
+
+ /** */
+ private final int[] affCacheIds;
+
+ /** */
+ private AffinityTopologyVersion mapTopVer;
+
+ /** */
+ private int retryAttemptCnt;
/** */
private final UUID subjId;
@@ -308,12 +324,27 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
if (task instanceof AffinityTask) {
AffinityTask affTask = (AffinityTask)task;
+ assert affTask.affinityCacheNames() != null : affTask;
+ assert affTask.partition() >= 0 : affTask;
+
+ affPartId = affTask.partition();
+ affCacheName = F.first(affTask.affinityCacheNames());
affKey = affTask.affinityKey();
- affCache = affTask.affinityCacheName();
+ mapTopVer = affTask.topologyVersion();
+
+ affCacheIds = new int[affTask.affinityCacheNames().size()];
+ int i = 0;
+ for (String cacheName : affTask.affinityCacheNames()) {
+ affCacheIds[i] = CU.cacheId(cacheName);
+ ++i;
+ }
}
else {
+ affPartId = -1;
+ affCacheName = null;
affKey = null;
- affCache = null;
+ mapTopVer = null;
+ affCacheIds = null;
}
}
@@ -469,7 +500,7 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
// Nodes are ignored by affinity tasks.
final List<ClusterNode> shuffledNodes =
- affKey == null ? getTaskTopology() : Collections.<ClusterNode>emptyList();
+ affCacheIds == null ? getTaskTopology() : Collections.<ClusterNode>emptyList();
// Load balancer.
ComputeLoadBalancer balancer = ctx.loadBalancing().getLoadBalancer(ses, shuffledNodes);
@@ -818,6 +849,7 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
return;
}
+ boolean retry = false;
synchronized (mux) {
// If task is not waiting for responses,
// then there is no point to proceed.
@@ -829,54 +861,76 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
return;
}
- switch (plc) {
- // Start reducing all results received so far.
- case REDUCE: {
- state = State.REDUCING;
+ if (res.retry()) {
+ // Retry is used only with affinity call / run.
+ assert affCacheIds != null;
+ retry = true;
- break;
- }
+ mapTopVer = U.max(res.getRetryTopologyVersion(), ctx.discovery().topologyVersionEx());
+ affFut = ctx.cache().context().exchange().affinityReadyFuture(mapTopVer);
- // Keep waiting if there are more responses to come,
- // otherwise, reduce.
- case WAIT: {
- assert results.size() <= this.jobRes.size();
+ if (affFut != null && !affFut.isDone()) {
+ waitForAffTop = true;
- // If there are more results to wait for.
- // If result cache is disabled, then we reduce
- // when both collections are empty.
- if (results.size() == this.jobRes.size()) {
- plc = ComputeJobResultPolicy.REDUCE;
-
- // All results are received, proceed to reduce method.
+ jobRes.resetResponse();
+ }
+ } else {
+ switch (plc) {
+ // Start reducing all results received so far.
+ case REDUCE: {
state = State.REDUCING;
+
+ break;
}
- break;
- }
+ // Keep waiting if there are more responses to come,
+ // otherwise, reduce.
+ case WAIT: {
+ assert results.size() <= this.jobRes.size();
+
+ // If there are more results to wait for.
+ // If result cache is disabled, then we reduce
+ // when both collections are empty.
+ if (results.size() == this.jobRes.size()) {
+ plc = ComputeJobResultPolicy.REDUCE;
- case FAILOVER: {
- if (affKey != null) {
- AffinityTopologyVersion topVer = ctx.discovery().topologyVersionEx();
+ // All results are received, proceed to reduce method.
+ state = State.REDUCING;
+ }
- affFut = ctx.cache().context().exchange().affinityReadyFuture(topVer);
+ break;
}
- if (affFut != null && !affFut.isDone()) {
- waitForAffTop = true;
+ case FAILOVER: {
+ if (affCacheIds != null) {
+ mapTopVer = ctx.discovery().topologyVersionEx();
- jobRes.resetResponse();
- }
- else if (!failover(res, jobRes, getTaskTopology()))
- plc = null;
+ affFut = ctx.cache().context().exchange().affinityReadyFuture(mapTopVer);
+ }
+
+ if (affFut != null && !affFut.isDone()) {
+ waitForAffTop = true;
- break;
+ jobRes.resetResponse();
+ }
+ else if (!failover(res, jobRes, getTaskTopology()))
+ plc = null;
+
+ break;
+ }
}
}
}
// Outside of synchronization.
- if (plc != null && !waitForAffTop) {
+ if (retry && !waitForAffTop) {
+ // Handle retry
+ retryAttemptCnt++;
+
+ final long wait = retryAttemptCnt * RETRY_DELAY_MS;
+ sendRetryRequest(wait, jobRes, res);
+ }
+ else if (plc != null && !waitForAffTop && !retry) {
// Handle failover.
if (plc == FAILOVER)
sendFailoverRequest(jobRes);
@@ -928,6 +982,36 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
}
/**
+ * @param waitms Delay before the retry request is sent, in milliseconds.
+ * @param jRes Job result.
+ * @param resp Job response.
+ */
+ private void sendRetryRequest(final long waitms, final GridJobResultImpl jRes, final GridJobExecuteResponse resp) {
+ ctx.timeout().schedule(new Runnable() {
+ @Override public void run() {
+ ctx.closure().runLocalSafe(new Runnable() {
+ @Override public void run() {
+ try {
+ ClusterNode newNode = ctx.affinity().mapPartitionToNode(affCacheName, affPartId,
+ mapTopVer);
+
+ if(!checkTargetNode(resp, jRes, newNode))
+ return;
+
+ sendRequest(jRes);
+ }
+ catch (Exception e) {
+ U.error(log, "Failed to re-map job or retry request [ses=" + ses + "]", e);
+
+ finishTask(null, e);
+ }
+ }
+ }, false);
+ }
+ }, waitms, -1);
+ }
+
+ /**
* @param jobRes Job result.
* @param results Existing job results.
* @return Job result policy.
@@ -1083,53 +1167,63 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
try {
ctx.resource().invokeAnnotated(dep, jobRes.getJob(), ComputeJobBeforeFailover.class);
- // Map to a new node.
- ClusterNode node = ctx.failover().failover(ses, jobRes, new ArrayList<>(top), affKey, affCache);
+ ClusterNode node = ctx.failover().failover(ses, jobRes, new ArrayList<>(top), affPartId,
+ affKey, affCacheName, mapTopVer);
- if (node == null) {
- String msg = "Failed to failover a job to another node (failover SPI returned null) [job=" +
- jobRes.getJob() + ", node=" + jobRes.getNode() + ']';
+ return checkTargetNode(res, jobRes, node);
+ }
+ // Catch Throwable to protect against bad user code.
+ catch (Throwable e) {
+ String errMsg = "Failed to failover job due to undeclared user exception [job=" +
+ jobRes.getJob() + ", err=" + e + ']';
- if (log.isDebugEnabled())
- log.debug(msg);
+ U.error(log, errMsg, e);
- Throwable e = new ClusterTopologyCheckedException(msg, jobRes.getException());
+ finishTask(null, new ComputeUserUndeclaredException(errMsg, e));
- finishTask(null, e);
+ if (e instanceof Error)
+ throw (Error)e;
- return false;
- }
+ return false;
+ }
+ }
+
+ /**
+ * @param res Execution response.
+ * @param jobRes Job result.
+ * @param node New target node.
+ * @return {@code True} if new target node is not null.
+ */
+ private boolean checkTargetNode(GridJobExecuteResponse res, GridJobResultImpl jobRes, ClusterNode node) {
+ if (node == null) {
+ String msg = "Failed to failover a job to another node (failover SPI returned null) [job=" +
+ jobRes.getJob() + ", node=" + jobRes.getNode() + ']';
if (log.isDebugEnabled())
- log.debug("Resolved job failover [newNode=" + node + ", oldNode=" + jobRes.getNode() +
- ", job=" + jobRes.getJob() + ", resMsg=" + res + ']');
+ log.debug(msg);
+
+ Throwable e = new ClusterTopologyCheckedException(msg, jobRes.getException());
+
+ finishTask(null, e);
+
+ return false;
+ }
+ if (log.isDebugEnabled())
+ log.debug("Resolved job failover [newNode=" + node + ", oldNode=" + jobRes.getNode() +
+ ", job=" + jobRes.getJob() + ", resMsg=" + res + ']');
+
+ synchronized (mux) {
jobRes.setNode(node);
jobRes.resetResponse();
if (!resCache) {
- synchronized (mux) {
// Store result back in map before sending.
this.jobRes.put(res.getJobId(), jobRes);
- }
}
-
- return true;
}
- // Catch Throwable to protect against bad user code.
- catch (Throwable e) {
- String errMsg = "Failed to failover job due to undeclared user exception [job=" +
- jobRes.getJob() + ", err=" + e + ']';
- U.error(log, errMsg, e);
-
- finishTask(null, new ComputeUserUndeclaredException(errMsg, e));
-
- if (e instanceof Error)
- throw (Error)e;
-
- return false;
- }
+ return true;
}
/**
@@ -1227,7 +1321,7 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
ctx.resource().invokeAnnotated(dep, res.getJob(), ComputeJobAfterSend.class);
GridJobExecuteResponse fakeRes = new GridJobExecuteResponse(node.id(), ses.getId(),
- res.getJobContext().getJobId(), null, null, null, null, null, null, false);
+ res.getJobContext().getJobId(), null, null, null, null, null, null, false, null);
fakeRes.setFakeException(new ClusterTopologyException("Failed to send job due to node failure: " + node));
@@ -1272,7 +1366,10 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
forceLocDep,
ses.isFullSupport(),
internal,
- subjId);
+ subjId,
+ affCacheIds,
+ affPartId,
+ mapTopVer);
if (loc)
ctx.job().processJobExecuteRequest(ctx.discovery().localNode(), req);
@@ -1319,7 +1416,7 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
}
GridJobExecuteResponse fakeRes = new GridJobExecuteResponse(node.id(), ses.getId(),
- res.getJobContext().getJobId(), null, null, null, null, null, null, false);
+ res.getJobContext().getJobId(), null, null, null, null, null, null, false, null);
if (fakeErr == null)
fakeErr = U.convertException(e);
@@ -1351,7 +1448,7 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
// Artificial response in case if a job is waiting for a response from
// non-existent node.
GridJobExecuteResponse fakeRes = new GridJobExecuteResponse(nodeId, ses.getId(),
- jr.getJobContext().getJobId(), null, null, null, null, null, null, false);
+ jr.getJobContext().getJobId(), null, null, null, null, null, null, false, null);
fakeRes.setFakeException(new ClusterTopologyException("Node has left grid: " + nodeId));
http://git-wip-us.apache.org/repos/asf/ignite/blob/01800101/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index 269795b..a480b87 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -9602,4 +9602,14 @@ public abstract class IgniteUtils {
return "<failed to find active thread " + threadId + '>';
}
+
+ /**
+ * @param t0 Comparable object.
+ * @param t1 Comparable object.
+ * @param <T> Comparable type.
+ * @return Maximal object of t0 and t1.
+ */
+ public static <T extends Comparable<? super T>> T max(T t0, T t1) {
+ return t0.compareTo(t1) > 0 ? t0 : t1;
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/01800101/modules/core/src/main/java/org/apache/ignite/spi/failover/FailoverContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/failover/FailoverContext.java b/modules/core/src/main/java/org/apache/ignite/spi/failover/FailoverContext.java
index 1108ad1..b126db1 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/failover/FailoverContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/failover/FailoverContext.java
@@ -17,6 +17,7 @@
package org.apache.ignite.spi.failover;
+import java.util.Collection;
import java.util.List;
import org.apache.ignite.IgniteCompute;
import org.apache.ignite.IgniteException;
@@ -58,14 +59,24 @@ public interface FailoverContext {
public ClusterNode getBalancedNode(List<ClusterNode> top) throws IgniteException;
/**
- * Gets affinity key for {@link IgniteCompute#affinityRun(String, Object, IgniteRunnable)}
- * and {@link IgniteCompute#affinityCall(String, Object, IgniteCallable)}.
+ * Gets affinity key for {@link IgniteCompute#affinityRun(String, Object, IgniteRunnable)},
+ * {@link IgniteCompute#affinityRun(Collection, Object, IgniteRunnable)},
+ * {@link IgniteCompute#affinityCall(String, Object, IgniteCallable)}
+ * and {@link IgniteCompute#affinityCall(Collection, Object, IgniteCallable)}.
*
* @return Affinity key.
*/
@Nullable public Object affinityKey();
/**
+ * Gets partition for {@link IgniteCompute#affinityRun(Collection, int, IgniteRunnable)}
+ * and {@link IgniteCompute#affinityCall(Collection, int, IgniteCallable)}.
+ *
+ * @return Partition number.
+ */
+ public int partition();
+
+ /**
* Returns affinity cache name {@link IgniteCompute#affinityRun(String, Object, IgniteRunnable)}
* and {@link IgniteCompute#affinityCall(String, Object, IgniteCallable)}.
*
http://git-wip-us.apache.org/repos/asf/ignite/blob/01800101/modules/core/src/main/java/org/apache/ignite/spi/failover/always/AlwaysFailoverSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/failover/always/AlwaysFailoverSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/failover/always/AlwaysFailoverSpi.java
index 77b3745..63c990e 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/failover/always/AlwaysFailoverSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/failover/always/AlwaysFailoverSpi.java
@@ -23,9 +23,12 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.managers.failover.GridFailoverContextImpl;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -189,7 +192,7 @@ public class AlwaysFailoverSpi extends IgniteSpiAdapter implements FailoverSpi,
return null;
}
- if (ctx.affinityKey() != null) {
+ if (ctx.partition() >= 0) {
Integer affCallAttempt = ctx.getJobResult().getJobContext().getAttribute(AFFINITY_CALL_ATTEMPT);
if (affCallAttempt == null)
@@ -205,7 +208,15 @@ public class AlwaysFailoverSpi extends IgniteSpiAdapter implements FailoverSpi,
else {
ctx.getJobResult().getJobContext().setAttribute(AFFINITY_CALL_ATTEMPT, affCallAttempt + 1);
- return ignite.affinity(ctx.affinityCacheName()).mapKeyToNode(ctx.affinityKey());
+ try {
+ return ((IgniteEx)ignite).context().affinity().mapPartitionToNode(ctx.affinityCacheName(), ctx.partition(),
+ ((GridFailoverContextImpl)ctx).affinityTopologyVersion());
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to get map job to node on failover: " + ctx, e);
+
+ return null;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/01800101/modules/core/src/test/java/org/apache/ignite/internal/GridJobMasterLeaveAwareSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridJobMasterLeaveAwareSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridJobMasterLeaveAwareSelfTest.java
index a7cab3f..a484ec3 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/GridJobMasterLeaveAwareSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/GridJobMasterLeaveAwareSelfTest.java
@@ -464,7 +464,7 @@ public class GridJobMasterLeaveAwareSelfTest extends GridCommonAbstractTest {
ClusterNode node = F.first(prj.nodes());
- comp.affinityRun(null, keyForNode(aff, node), new TestRunnable());
+ comp.affinityRun((String)null, keyForNode(aff, node), new TestRunnable());
return comp.future();
}
@@ -483,7 +483,7 @@ public class GridJobMasterLeaveAwareSelfTest extends GridCommonAbstractTest {
ClusterNode node = F.first(prj.nodes());
- comp.affinityCall(null, keyForNode(aff, node), new TestCallable());
+ comp.affinityCall((String)null, keyForNode(aff, node), new TestCallable());
return comp.future();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/01800101/modules/core/src/test/java/org/apache/ignite/internal/GridProjectionLocalJobMultipleArgumentsSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridProjectionLocalJobMultipleArgumentsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridProjectionLocalJobMultipleArgumentsSelfTest.java
index 356e002..fc94663 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/GridProjectionLocalJobMultipleArgumentsSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/GridProjectionLocalJobMultipleArgumentsSelfTest.java
@@ -88,7 +88,7 @@ public class GridProjectionLocalJobMultipleArgumentsSelfTest extends GridCommonA
Collection<Integer> res = new ArrayList<>();
for (int i : F.asList(1, 2, 3)) {
- res.add(grid().compute().affinityCall(null, i, new IgniteCallable<Integer>() {
+ res.add(grid().compute().affinityCall((String)null, i, new IgniteCallable<Integer>() {
@Override public Integer call() {
ids.add(this);
@@ -106,7 +106,7 @@ public class GridProjectionLocalJobMultipleArgumentsSelfTest extends GridCommonA
*/
public void testAffinityRun() throws Exception {
for (int i : F.asList(1, 2, 3)) {
- grid().compute().affinityRun(null, i, new IgniteRunnable() {
+ grid().compute().affinityRun((String)null, i, new IgniteRunnable() {
@Override public void run() {
ids.add(this);
http://git-wip-us.apache.org/repos/asf/ignite/blob/01800101/modules/core/src/test/java/org/apache/ignite/internal/GridTaskFailoverAffinityRunTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridTaskFailoverAffinityRunTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridTaskFailoverAffinityRunTest.java
index f804cb3..7997560 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/GridTaskFailoverAffinityRunTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/GridTaskFailoverAffinityRunTest.java
@@ -137,7 +137,7 @@ public class GridTaskFailoverAffinityRunTest extends GridCommonAbstractTest {
Collection<IgniteFuture<?>> futs = new ArrayList<>(1000);
for (int i = 0; i < 1000; i++) {
- comp.affinityCall(null, i, new TestJob());
+ comp.affinityCall((String)null, i, new TestJob());
IgniteFuture<?> fut0 = comp.future();
http://git-wip-us.apache.org/repos/asf/ignite/blob/01800101/modules/core/src/test/java/org/apache/ignite/internal/IgniteComputeEmptyClusterGroupTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteComputeEmptyClusterGroupTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteComputeEmptyClusterGroupTest.java
index 48039a5..b595fee 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteComputeEmptyClusterGroupTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteComputeEmptyClusterGroupTest.java
@@ -82,7 +82,7 @@ public class IgniteComputeEmptyClusterGroupTest extends GridCommonAbstractTest {
IgniteCompute comp = ignite(0).compute(empty).withAsync();
- comp.affinityRun(null, 1, new FailRunnable());
+ comp.affinityRun((String)null, 1, new FailRunnable());
checkFutureFails(comp);
@@ -90,7 +90,7 @@ public class IgniteComputeEmptyClusterGroupTest extends GridCommonAbstractTest {
checkFutureFails(comp);
- comp.affinityCall(null, 1, new FailCallable());
+ comp.affinityCall((String)null, 1, new FailCallable());
checkFutureFails(comp);
@@ -112,7 +112,7 @@ public class IgniteComputeEmptyClusterGroupTest extends GridCommonAbstractTest {
GridTestUtils.assertThrows(log, new Callable<Void>() {
@Override
public Void call() throws Exception {
- comp.affinityRun(null, 1, new FailRunnable());
+ comp.affinityRun((String)null, 1, new FailRunnable());
return null;
}
@@ -129,7 +129,7 @@ public class IgniteComputeEmptyClusterGroupTest extends GridCommonAbstractTest {
GridTestUtils.assertThrows(log, new Callable<Void>() {
@Override
public Void call() throws Exception {
- comp.affinityCall(null, 1, new FailCallable());
+ comp.affinityCall((String)null, 1, new FailCallable());
return null;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/01800101/modules/core/src/test/java/org/apache/ignite/internal/binary/GridBinaryAffinityKeySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/binary/GridBinaryAffinityKeySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/binary/GridBinaryAffinityKeySelfTest.java
index 2b54f6b..d5f084d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/binary/GridBinaryAffinityKeySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/binary/GridBinaryAffinityKeySelfTest.java
@@ -178,7 +178,7 @@ public class GridBinaryAffinityKeySelfTest extends GridCommonAbstractTest {
for (int i = 0; i < 1000; i++) {
nodeId.set(null);
- grid(0).compute().affinityRun(null, new TestObject(i), new IgniteRunnable() {
+ grid(0).compute().affinityRun((String)null, new TestObject(i), new IgniteRunnable() {
@IgniteInstanceResource
private Ignite ignite;
@@ -189,7 +189,7 @@ public class GridBinaryAffinityKeySelfTest extends GridCommonAbstractTest {
assertEquals(aff.mapKeyToNode(i).id(), nodeId.get());
- grid(0).compute().affinityRun(null, new AffinityKey(0, i), new IgniteRunnable() {
+ grid(0).compute().affinityRun((String)null, new AffinityKey(0, i), new IgniteRunnable() {
@IgniteInstanceResource
private Ignite ignite;
@@ -211,7 +211,7 @@ public class GridBinaryAffinityKeySelfTest extends GridCommonAbstractTest {
for (int i = 0; i < 1000; i++) {
nodeId.set(null);
- grid(0).compute().affinityCall(null, new TestObject(i), new IgniteCallable<Object>() {
+ grid(0).compute().affinityCall((String)null, new TestObject(i), new IgniteCallable<Object>() {
@IgniteInstanceResource
private Ignite ignite;
http://git-wip-us.apache.org/repos/asf/ignite/blob/01800101/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractUsersAffinityMapperSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractUsersAffinityMapperSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractUsersAffinityMapperSelfTest.java
index c84a2d0..706d8aa 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractUsersAffinityMapperSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractUsersAffinityMapperSelfTest.java
@@ -121,7 +121,7 @@ public abstract class GridCacheAbstractUsersAffinityMapperSelfTest extends GridC
startGrid(1);
for (int i = 0; i < KEY_CNT; i++)
- grid(i % 2).compute().affinityRun(null, new TestAffinityKey(1, "1"), new NoopClosure());
+ grid(i % 2).compute().affinityRun((String)null, new TestAffinityKey(1, "1"), new NoopClosure());
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/01800101/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartStopConcurrentTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartStopConcurrentTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartStopConcurrentTest.java
index 084be02..f953c47 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartStopConcurrentTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartStopConcurrentTest.java
@@ -88,7 +88,7 @@ public class IgniteDynamicCacheStartStopConcurrentTest extends GridCommonAbstrac
checkTopologyVersion(new AffinityTopologyVersion(NODES, minorVer));
- ignite(0).compute().affinityRun(null, 1, new IgniteRunnable() {
+ ignite(0).compute().affinityRun((String)null, 1, new IgniteRunnable() {
@Override public void run() {
// No-op.
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/01800101/modules/core/src/test/java/org/apache/ignite/spi/failover/GridFailoverTestContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/failover/GridFailoverTestContext.java b/modules/core/src/test/java/org/apache/ignite/spi/failover/GridFailoverTestContext.java
index a8a2edf..97a3e0b 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/failover/GridFailoverTestContext.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/failover/GridFailoverTestContext.java
@@ -22,6 +22,7 @@ import java.util.Random;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.compute.ComputeJobResult;
import org.apache.ignite.compute.ComputeTaskSession;
+import org.jetbrains.annotations.Nullable;
/**
* Failover test context.
@@ -74,6 +75,11 @@ public class GridFailoverTestContext implements FailoverContext {
}
/** {@inheritDoc} */
+ @Nullable @Override public int partition() {
+ return -1;
+ }
+
+ /** {@inheritDoc} */
@Override public String affinityCacheName() {
return null;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/01800101/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheLockPartitionOnAffinityRunAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheLockPartitionOnAffinityRunAbstractTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheLockPartitionOnAffinityRunAbstractTest.java
new file mode 100644
index 0000000..28d297d
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheLockPartitionOnAffinityRunAbstractTest.java
@@ -0,0 +1,412 @@
+package org.apache.ignite.internal.processors.cache;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.affinity.AffinityFunctionContext;
+import org.apache.ignite.cache.affinity.AffinityKeyMapped;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.binary.BinaryMarshaller;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.spi.failover.always.AlwaysFailoverSpi;
+import org.apache.ignite.testframework.GridTestUtils;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+
+/**
+ */
+public class IgniteCacheLockPartitionOnAffinityRunAbstractTest extends GridCacheAbstractSelfTest {
+ /** Count of affinity run threads. */
+ protected static final int AFFINITY_THREADS_CNT = 10;
+
+ /** Count of collocated objects. */
+ protected static final int PERS_AT_ORG_CNT = 10_000;
+
+ /** Name of the cache with a special affinity function (all partitions are placed on the first node). */
+ protected static final String OTHER_CACHE_NAME = "otherCache";
+
+ /** Grid count. */
+ protected static final int GRID_CNT = 4;
+
+ /** Count of restarted nodes. */
+ protected static final int RESTARTED_NODE_CNT = 2;
+
+ /** Count of objects. */
+ protected static final int ORGS_COUNT_PER_NODE = 2;
+
+ /** Test duration. */
+ protected static final long TEST_DURATION = 5 * 60_000;
+
+ /** Test timeout. */
+ protected static final long TEST_TIMEOUT = TEST_DURATION + 2 * 60_000;
+
+ /** Timeout between restart of a node. */
+ protected static final long RESTART_TIMEOUT = 3_000;
+
+ /** Max failover attempts. */
+ protected static final int MAX_FAILOVER_ATTEMPTS = 100;
+
+ /** Organization ids. */
+ protected static List<Integer> orgIds;
+
+ /** Test end time. */
+ protected static long endTime;
+
+ /** Node restart thread future. */
+ protected static IgniteInternalFuture<?> nodeRestartFut;
+
+ /** Flag that tells the node restart thread to stop. */
+ protected final AtomicBoolean stopRestartThread = new AtomicBoolean();
+
+ /** {@inheritDoc} */
+ @Override protected long getTestTimeout() {
+ return TEST_TIMEOUT;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1);
+
+ cfg.setMarshaller(new BinaryMarshaller());
+
+ AlwaysFailoverSpi failSpi = new AlwaysFailoverSpi();
+ failSpi.setMaximumFailoverAttempts(MAX_FAILOVER_ATTEMPTS);
+ cfg.setFailoverSpi(failSpi);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected Class<?>[] indexedTypes() {
+ return new Class<?>[] {
+ Integer.class, Organization.class,
+ Person.Key.class, Person.class,
+ Integer.class, Integer.class
+ };
+ }
+
+ /** {@inheritDoc} */
+ @Override protected CacheAtomicityMode atomicityMode() {
+ return ATOMIC;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected int gridCount() {
+ return GRID_CNT;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected CacheMode cacheMode() {
+ return PARTITIONED;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ info("Fill caches begin...");
+
+ fillCaches();
+
+ info("Caches are filled");
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ grid(0).destroyCache(Organization.class.getSimpleName());
+ grid(0).destroyCache(Person.class.getSimpleName());
+ grid(0).destroyCache(OTHER_CACHE_NAME);
+ super.afterTestsStopped();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopRestartThread.set(true);
+ if (nodeRestartFut != null) {
+ nodeRestartFut.get();
+ nodeRestartFut = null;
+ }
+
+ Thread.sleep(3_000);
+
+ awaitPartitionMapExchange();
+
+ super.afterTest();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ endTime = System.currentTimeMillis() + TEST_DURATION;
+
+ super.beforeTest();
+ }
+
+ /**
+ * @param cacheName Cache name.
+ * @throws Exception If failed.
+ */
+ private void createCacheWithAffinity(String cacheName) throws Exception {
+ CacheConfiguration cfg = cacheConfiguration(grid(0).name());
+
+ cfg.setName(cacheName); // Only the name and the affinity function are overridden.
+ cfg.setAffinity(new DummyAffinity());
+
+ grid(0).createCache(cfg);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ private void fillCaches() throws Exception {
+ grid(0).createCache(Organization.class.getSimpleName());
+ grid(0).createCache(Person.class.getSimpleName());
+
+ createCacheWithAffinity(OTHER_CACHE_NAME);
+
+ awaitPartitionMapExchange();
+
+ orgIds = new ArrayList<>(ORGS_COUNT_PER_NODE * RESTARTED_NODE_CNT);
+
+ for (int i = GRID_CNT - RESTARTED_NODE_CNT; i < GRID_CNT; ++i)
+ orgIds.addAll(primaryKeys(grid(i).cache(Organization.class.getSimpleName()), ORGS_COUNT_PER_NODE));
+
+ try (
+ IgniteDataStreamer<Integer, Organization> orgStreamer =
+ grid(0).dataStreamer(Organization.class.getSimpleName());
+ IgniteDataStreamer<Person.Key, Person> persStreamer =
+ grid(0).dataStreamer(Person.class.getSimpleName())) {
+
+ int persId = 0;
+ for (int orgId : orgIds) {
+ Organization org = new Organization(orgId);
+ orgStreamer.addData(orgId, org);
+
+ for (int persCnt = 0; persCnt < PERS_AT_ORG_CNT; ++persCnt, ++persId) {
+ Person pers = new Person(persId, orgId);
+ persStreamer.addData(pers.createKey(), pers);
+ }
+ }
+ }
+
+ awaitPartitionMapExchange();
+ }
+
+ /**
+ * Starts a background thread that restarts the last {@code RESTARTED_NODE_CNT} grids one at a time, round-robin.
+ */
+ protected void beginNodesRestart() {
+ stopRestartThread.set(false);
+ nodeRestartFut = GridTestUtils.runAsync(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ int restartGrid = GRID_CNT - RESTARTED_NODE_CNT;
+ while (!stopRestartThread.get() && System.currentTimeMillis() < endTime) {
+ log.info("Restart grid: " + restartGrid);
+ stopGrid(restartGrid);
+ Thread.sleep(500);
+ startGrid(restartGrid);
+
+ GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ return stopRestartThread.get(); // Pause up to RESTART_TIMEOUT, but wake early on a stop request.
+ }
+ }, RESTART_TIMEOUT);
+
+ restartGrid++;
+ if (restartGrid >= GRID_CNT)
+ restartGrid = GRID_CNT - RESTARTED_NODE_CNT;
+ awaitPartitionMapExchange();
+ }
+ return null;
+ }
+ }, "restart-node");
+ }
+
+ /**
+ * @param ignite Ignite.
+ * @param orgId Org id.
+ * @param expReservations Expected reservations.
+ * @throws Exception If failed.
+ */
+ protected static void checkPartitionsReservations(final IgniteEx ignite, int orgId,
+ final int expReservations) throws Exception {
+ String orgCache = Organization.class.getSimpleName();
+ String persCache = Person.class.getSimpleName();
+
+ // The partition is resolved through the organization cache and looked up in both caches.
+ int part = ignite.affinity(orgCache).partition(orgId);
+
+ final GridDhtLocalPartition pPers = ignite.context().cache().internalCache(persCache)
+ .context().topology().localPartition(part, AffinityTopologyVersion.NONE, false);
+ assertNotNull(pPers);
+
+ final GridDhtLocalPartition pOrgs = ignite.context().cache().internalCache(orgCache)
+ .context().topology().localPartition(part, AffinityTopologyVersion.NONE, false);
+ assertNotNull(pOrgs);
+
+ GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ return pOrgs.reservations() == expReservations && pPers.reservations() == expReservations;
+ }
+ }, 1000L);
+ assertEquals("Unexpected reservations count", expReservations, pOrgs.reservations());
+ assertEquals("Unexpected reservations count", expReservations, pPers.reservations());
+ }
+
+ /** Affinity function that puts every partition on the first node of the topology snapshot, without backups. */
+ private static class DummyAffinity extends RendezvousAffinityFunction {
+ /**
+ * Default constructor.
+ */
+ public DummyAffinity() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public List<List<ClusterNode>> assignPartitions(AffinityFunctionContext affCtx) {
+ int parts = partitions();
+ List<ClusterNode> top = affCtx.currentTopologySnapshot();
+
+ List<List<ClusterNode>> res = new ArrayList<>(parts);
+
+ for (int p = 0; p < parts; p++)
+ res.add(Collections.singletonList(top.get(0)));
+ return res;
+ }
+ }
+
+
+ /**
+ * Test class Organization.
+ */
+ public static class Organization implements Serializable {
+ /** */
+ @QuerySqlField(index = true)
+ private final int id;
+
+ /**
+ * @param id ID.
+ */
+ Organization(int id) {
+ this.id = id;
+ }
+
+ /**
+ * @return id.
+ */
+ int getId() {
+ return id;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(Organization.class, this);
+ }
+ }
+
+ /**
+ * Test class Person.
+ */
+ public static class Person implements Serializable {
+ /** */
+ @QuerySqlField
+ private final int id;
+
+ /** */
+ @QuerySqlField(index = true)
+ private final int orgId;
+
+ /**
+ * @param id ID.
+ * @param orgId Organization ID.
+ */
+ Person(int id, int orgId) {
+ this.id = id;
+ this.orgId = orgId;
+ }
+
+ /**
+ * @return id.
+ */
+ int getId() {
+ return id;
+ }
+
+ /**
+ * @return organization id.
+ */
+ int getOrgId() {
+ return orgId;
+ }
+
+ /**
+ * @return Affinity key.
+ */
+ public Person.Key createKey() {
+ return new Person.Key(id, orgId);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(Person.class, this);
+ }
+
+ /**
+ *
+ */
+ static class Key implements Serializable {
+ /** Id. */
+ private final int id;
+
+ /** Org id. */
+ @AffinityKeyMapped
+ protected final int orgId;
+
+ /**
+ * @param id Id.
+ * @param orgId Org id.
+ */
+ private Key(int id, int orgId) {
+ this.id = id;
+ this.orgId = orgId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ Person.Key key = (Person.Key)o;
+
+ return id == key.id && orgId == key.orgId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ int res = id;
+ res = 31 * res + orgId;
+ return res;
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/01800101/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheLockPartitionOnAffinityRunAtomicCacheOpTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheLockPartitionOnAffinityRunAtomicCacheOpTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheLockPartitionOnAffinityRunAtomicCacheOpTest.java
new file mode 100644
index 0000000..fb90c7e
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheLockPartitionOnAffinityRunAtomicCacheOpTest.java
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.lang.IgniteRunnable;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.resources.LoggerResource;
+import org.apache.ignite.testframework.GridTestUtils;
+
+/**
+ * Test to validate https://issues.apache.org/jira/browse/IGNITE-2310
+ */
+public class IgniteCacheLockPartitionOnAffinityRunAtomicCacheOpTest extends IgniteCacheLockPartitionOnAffinityRunAbstractTest {
+ /** Name of the atomic cache. */
+ private static final String ATOMIC_CACHE = "atomic";
+ /** Name of the transactional cache. */
+ private static final String TRANSACT_CACHE = "transact";
+ /** Test timeout. */
+ private static final long TEST_TIMEOUT = 10 * 60_000;
+ /** Count of keys put by a single job. */
+ private static final int KEYS_CNT = 100;
+ /** Partitions count of the caches created by this test. */
+ private static final int PARTS_CNT = 16;
+ /** Counter that gives every job its own key range. */
+ private static final AtomicInteger key = new AtomicInteger(0);
+
+ /** {@inheritDoc} */
+ @Override protected long getTestTimeout() {
+ return TEST_TIMEOUT;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beginNodesRestart() {
+ stopRestartThread.set(false);
+ nodeRestartFut = GridTestUtils.runAsync(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ while (!stopRestartThread.get() && System.currentTimeMillis() < endTime) {
+ log.info("Restart nodes");
+ for (int i = GRID_CNT - RESTARTED_NODE_CNT; i < GRID_CNT; ++i)
+ stopGrid(i); // All restartable nodes are stopped before any of them is started again.
+ Thread.sleep(500);
+ for (int i = GRID_CNT - RESTARTED_NODE_CNT; i < GRID_CNT; ++i)
+ startGrid(i);
+
+ GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ return stopRestartThread.get(); // Pause up to RESTART_TIMEOUT, but wake early on a stop request.
+ }
+ }, RESTART_TIMEOUT);
+ }
+ return null;
+ }
+ }, "restart-node");
+ }
+
+ /** {@inheritDoc} */
+ @Override protected CacheConfiguration cacheConfiguration(String gridName) throws Exception {
+ CacheConfiguration ccfg = super.cacheConfiguration(gridName);
+ ccfg.setBackups(0);
+
+ return ccfg;
+ }
+
+ /**
+ * @param cacheName Cache name.
+ * @param mode Atomicity mode.
+ * @throws Exception If failed.
+ */
+ private void createCache(String cacheName, CacheAtomicityMode mode) throws Exception {
+ CacheConfiguration cfg = cacheConfiguration(grid(0).name());
+
+ // Name, atomicity mode and a PARTS_CNT-partition rendezvous affinity are the only overrides.
+ cfg.setName(cacheName);
+ cfg.setAtomicityMode(mode);
+ cfg.setAffinity(new RendezvousAffinityFunction(false, PARTS_CNT));
+
+ grid(0).createCache(cfg);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ key.set(0);
+ createCache(ATOMIC_CACHE, CacheAtomicityMode.ATOMIC);
+ createCache(TRANSACT_CACHE, CacheAtomicityMode.TRANSACTIONAL);
+
+ awaitPartitionMapExchange();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ grid(0).destroyCache(ATOMIC_CACHE);
+ grid(0).destroyCache(TRANSACT_CACHE);
+
+ super.afterTest();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testNotReservedAtomicCacheOp() throws Exception {
+ notReservedCacheOp(ATOMIC_CACHE);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testNotReservedTxCacheOp() throws Exception {
+ notReservedCacheOp(TRANSACT_CACHE);
+ }
+
+ /**
+ * @param cacheName Cache name.
+ * @throws Exception If failed.
+ */
+ private void notReservedCacheOp(final String cacheName) throws Exception {
+ // Workaround for initial update job metadata.
+ grid(0).compute().affinityRun(
+ Arrays.asList(Person.class.getSimpleName(), Organization.class.getSimpleName()),
+ orgIds.get(0), // Already an Integer; re-wrapping it with new Integer(...) was redundant.
+ new NotReservedCacheOpAffinityRun(0, 0, cacheName));
+
+ // Run restart threads: start re-balancing
+ beginNodesRestart();
+
+ grid(0).cache(cacheName).clear();
+
+ IgniteInternalFuture<Long> affFut = null;
+ try {
+ affFut = GridTestUtils.runMultiThreadedAsync(new Runnable() {
+ @Override public void run() {
+ for (int i = 0; i < PARTS_CNT; ++i) {
+ grid(0).compute().affinityRun(
+ Arrays.asList(Organization.class.getSimpleName(), Person.class.getSimpleName()),
+ Integer.valueOf(i), // Kept boxed; NOTE(review): presumably selects the affinity-key overload - confirm.
+ new NotReservedCacheOpAffinityRun(i, key.getAndIncrement() * KEYS_CNT, cacheName));
+ }
+ }
+ }, AFFINITY_THREADS_CNT, "affinity-run");
+ }
+ finally {
+ if (affFut != null)
+ affFut.get();
+
+ stopRestartThread.set(true);
+ nodeRestartFut.get();
+
+ Thread.sleep(5000);
+
+ log.info("Final await. Timed out if failed");
+ awaitPartitionMapExchange();
+
+ // Drop whatever the jobs have put.
+ grid(0).cache(cacheName).clear();
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testReservedPartitionCacheOp() throws Exception {
+ // Workaround for initial update job metadata.
+ grid(0).cache(Person.class.getSimpleName()).clear();
+ grid(0).compute().affinityRun(
+ Arrays.asList(Person.class.getSimpleName(), Organization.class.getSimpleName()),
+ 0,
+ new ReservedPartitionCacheOpAffinityRun(0, 0));
+
+ // Run restart threads: start re-balancing
+ beginNodesRestart();
+
+ IgniteInternalFuture<Long> affFut = null;
+ try {
+ affFut = GridTestUtils.runMultiThreadedAsync(new Runnable() {
+ @Override public void run() {
+ for (int i = 0; i < PARTS_CNT; ++i) {
+ if (System.currentTimeMillis() >= endTime)
+ break;
+
+ grid(0).compute().affinityRun(
+ Arrays.asList(Organization.class.getSimpleName(), Person.class.getSimpleName()),
+ Integer.valueOf(i), // Kept boxed; NOTE(review): presumably selects the affinity-key overload - confirm.
+ new ReservedPartitionCacheOpAffinityRun(i, key.getAndIncrement() * KEYS_CNT));
+ }
+ }
+ }, AFFINITY_THREADS_CNT, "affinity-run");
+ }
+ finally {
+ if (affFut != null)
+ affFut.get();
+
+ stopRestartThread.set(true);
+ nodeRestartFut.get();
+
+ Thread.sleep(5000);
+
+ log.info("Final await. Timed out if failed");
+ awaitPartitionMapExchange();
+
+ // Drop the persons put by the jobs.
+ grid(0).cache(Person.class.getSimpleName()).clear();
+ }
+ }
+
+ /** */
+ private static class NotReservedCacheOpAffinityRun implements IgniteRunnable {
+ /** Org id. */
+ int orgId;
+
+ /** Begin of key. */
+ int keyBegin;
+
+ /** Cache name. */
+ private String cacheName;
+
+ /** */
+ @IgniteInstanceResource
+ private IgniteEx ignite;
+
+ /** */
+ @LoggerResource
+ private IgniteLogger log;
+
+ /** */
+ public NotReservedCacheOpAffinityRun() {
+ // No-op.
+ }
+
+ /**
+ * @param orgId Organization.
+ * @param keyBegin Begin key value.
+ * @param cacheName Cache name.
+ */
+ public NotReservedCacheOpAffinityRun(int orgId, int keyBegin, String cacheName) {
+ this.orgId = orgId;
+ this.keyBegin = keyBegin;
+ this.cacheName = cacheName;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void run() {
+ log.info("Begin run " + keyBegin);
+
+ IgniteCache<Integer, Integer> cache = ignite.cache(cacheName);
+
+ // Put KEYS_CNT consecutive keys starting at keyBegin,
+ // one entry at a time, each key mapped to itself.
+ for (int i = 0; i < KEYS_CNT; ++i)
+ cache.put(i + keyBegin, i + keyBegin);
+
+ log.info("End run " + keyBegin);
+ }
+ }
+
+ /** */
+ private static class ReservedPartitionCacheOpAffinityRun implements IgniteRunnable {
+ /** Org id. */
+ int orgId;
+
+ /** Begin of key. */
+ int keyBegin;
+
+ /** */
+ @IgniteInstanceResource
+ private IgniteEx ignite;
+
+ /** */
+ @LoggerResource
+ private IgniteLogger log;
+
+ /** */
+ public ReservedPartitionCacheOpAffinityRun() {
+ // No-op.
+ }
+
+ /**
+ * @param orgId Organization Id.
+ * @param keyBegin Begin key value.
+ */
+ public ReservedPartitionCacheOpAffinityRun(int orgId, int keyBegin) {
+ this.orgId = orgId;
+ this.keyBegin = keyBegin;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void run() {
+ log.info("Begin run " + keyBegin);
+
+ IgniteCache<Person.Key, Person> cache = ignite.cache(Person.class.getSimpleName());
+
+ // Put KEYS_CNT persons of organization orgId one entry at a time;
+ // person ids are consecutive starting at keyBegin.
+ for (int i = 0; i < KEYS_CNT; ++i) {
+ Person p = new Person(i + keyBegin, orgId);
+
+ cache.put(p.createKey(), p);
+ }
+ }
+ }
+}
\ No newline at end of file