You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/08/09 12:36:41 UTC
[3/6] ignite git commit: IGNITE-2310 Lock cache partition for
affinityRun/affinityCall execution
IGNITE-2310 Lock cache partition for affinityRun/affinityCall execution
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/01800101
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/01800101
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/01800101
Branch: refs/heads/ignite-3661
Commit: 018001011daff723d120834da7b4f57bad7f8f71
Parents: 00f47d7
Author: tledkov-gridgain <tl...@gridgain.com>
Authored: Fri May 27 15:16:27 2016 +0300
Committer: tledkov-gridgain <tl...@gridgain.com>
Committed: Tue Aug 9 12:50:23 2016 +0300
----------------------------------------------------------------------
.../ignite/codegen/MessageCodeGenerator.java | 6 +-
.../java/org/apache/ignite/IgniteCompute.java | 69 +-
.../ignite/internal/GridJobExecuteRequest.java | 148 +++-
.../ignite/internal/GridJobExecuteResponse.java | 42 +-
.../ignite/internal/IgniteComputeImpl.java | 120 ++-
.../failover/GridFailoverContextImpl.java | 27 +-
.../managers/failover/GridFailoverManager.java | 17 +-
.../affinity/GridAffinityProcessor.java | 18 +
.../processors/cache/GridCacheSwapManager.java | 22 +-
.../distributed/dht/GridDhtCacheAdapter.java | 19 +-
.../distributed/dht/GridDhtLocalPartition.java | 33 +-
.../cache/distributed/dht/GridReservable.java | 5 +-
.../cache/query/GridCacheQueryManager.java | 10 +-
.../processors/closure/AffinityTask.java | 17 +-
.../closure/GridClosureProcessor.java | 142 +++-
.../processors/job/GridJobProcessor.java | 139 ++-
.../internal/processors/job/GridJobWorker.java | 203 +++--
.../processors/query/GridQueryProcessor.java | 22 +-
.../processors/task/GridTaskWorker.java | 235 +++--
.../ignite/internal/util/IgniteUtils.java | 10 +
.../ignite/spi/failover/FailoverContext.java | 15 +-
.../spi/failover/always/AlwaysFailoverSpi.java | 15 +-
.../GridJobMasterLeaveAwareSelfTest.java | 4 +-
...ectionLocalJobMultipleArgumentsSelfTest.java | 4 +-
.../GridTaskFailoverAffinityRunTest.java | 2 +-
.../IgniteComputeEmptyClusterGroupTest.java | 8 +-
.../binary/GridBinaryAffinityKeySelfTest.java | 6 +-
...acheAbstractUsersAffinityMapperSelfTest.java | 2 +-
...niteDynamicCacheStartStopConcurrentTest.java | 2 +-
.../spi/failover/GridFailoverTestContext.java | 6 +
...eLockPartitionOnAffinityRunAbstractTest.java | 412 +++++++++
...PartitionOnAffinityRunAtomicCacheOpTest.java | 329 +++++++
...niteCacheLockPartitionOnAffinityRunTest.java | 852 +++++++++++++++++++
...LockPartitionOnAffinityRunTxCacheOpTest.java | 33 +
...titionOnAffinityRunWithCollisionSpiTest.java | 204 +++++
.../IgniteCacheAffinityRunTestSuite.java | 45 +
36 files changed, 2961 insertions(+), 282 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/01800101/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
----------------------------------------------------------------------
diff --git a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
index a6ae0da..5cfd9c5 100644
--- a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
+++ b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
@@ -744,14 +744,14 @@ public class MessageCodeGenerator {
else if (type.isArray()) {
Class<?> compType = type.getComponentType();
- returnFalseIfReadFailed(name, "reader.readObjectArray", field, setExpr,
+ returnFalseIfReadFailed(name, "reader.readObjectArray", setExpr, field,
"MessageCollectionItemType." + typeEnum(compType),
compType.getSimpleName() + ".class");
}
else if (Collection.class.isAssignableFrom(type) && !Set.class.isAssignableFrom(type)) {
assert colItemType != null;
- returnFalseIfReadFailed(name, "reader.readCollection", field, setExpr,
+ returnFalseIfReadFailed(name, "reader.readCollection", setExpr, field,
"MessageCollectionItemType." + typeEnum(colItemType));
}
else if (Map.class.isAssignableFrom(type)) {
@@ -760,7 +760,7 @@ public class MessageCodeGenerator {
boolean linked = type.equals(LinkedHashMap.class);
- returnFalseIfReadFailed(name, "reader.readMap", field, setExpr,
+ returnFalseIfReadFailed(name, "reader.readMap", setExpr, field,
"MessageCollectionItemType." + typeEnum(mapKeyType),
"MessageCollectionItemType." + typeEnum(mapValType),
linked ? "true" : "false");
http://git-wip-us.apache.org/repos/asf/ignite/blob/01800101/modules/core/src/main/java/org/apache/ignite/IgniteCompute.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteCompute.java b/modules/core/src/main/java/org/apache/ignite/IgniteCompute.java
index f7d4bc5..212849a 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteCompute.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteCompute.java
@@ -38,6 +38,7 @@ import org.apache.ignite.resources.SpringResource;
import org.apache.ignite.resources.TaskSessionResource;
import org.apache.ignite.spi.failover.FailoverSpi;
import org.apache.ignite.spi.loadbalancing.LoadBalancingSpi;
+import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
/**
@@ -122,7 +123,8 @@ public interface IgniteCompute extends IgniteAsyncSupport {
/**
* Executes given job on the node where data for provided affinity key is located
- * (a.k.a. affinity co-location).
+ * (a.k.a. affinity co-location). The data of the partition where affKey is stored
+ * will not be migrated from the target node while the job is executed.
*
* @param cacheName Name of the cache to use for affinity co-location.
* @param affKey Affinity key.
@@ -134,7 +136,38 @@ public interface IgniteCompute extends IgniteAsyncSupport {
/**
* Executes given job on the node where data for provided affinity key is located
- * (a.k.a. affinity co-location).
+ * (a.k.a. affinity co-location). The data of the partition where affKey is stored
+ * will not be migrated from the target node while the job is executed. The data
+ * of the extra caches' partitions with the same partition number also will not be migrated.
+ *
+ * @param cacheNames Names of the caches to reserve the partition. The first cache is used for affinity co-location.
+ * @param affKey Affinity key.
+ * @param job Job which will be co-located on the node with given affinity key.
+ * @throws IgniteException If job failed.
+ */
+ @IgniteAsyncSupported
+ public void affinityRun(@NotNull Collection<String> cacheNames, Object affKey, IgniteRunnable job)
+ throws IgniteException;
+
+ /**
+ * Executes given job on the node where partition is located (the partition is primary on the node).
+ * The data of the partition will not be migrated from the target node
+ * while the job is executed. The data of the extra caches' partitions with the same partition number
+ * also will not be migrated.
+ *
+ * @param cacheNames Names of the caches to reserve the partition. The first cache is used for affinity co-location.
+ * @param partId Partition number.
+ * @param job Job which will be co-located on the node with given partition.
+ * @throws IgniteException If job failed.
+ */
+ @IgniteAsyncSupported
+ public void affinityRun(@NotNull Collection<String> cacheNames, int partId, IgniteRunnable job)
+ throws IgniteException;
+
+ /**
+ * Executes given job on the node where data for provided affinity key is located
+ * (a.k.a. affinity co-location). The data of the partition where affKey is stored
+ * will not be migrated from the target node while the job is executed.
*
* @param cacheName Name of the cache to use for affinity co-location.
* @param affKey Affinity key.
@@ -146,6 +179,38 @@ public interface IgniteCompute extends IgniteAsyncSupport {
public <R> R affinityCall(@Nullable String cacheName, Object affKey, IgniteCallable<R> job) throws IgniteException;
/**
+ * Executes given job on the node where data for provided affinity key is located
+ * (a.k.a. affinity co-location). The data of the partition where affKey is stored
+ * will not be migrated from the target node while the job is executed. The data
+ * of the extra caches' partitions with the same partition number also will not be migrated.
+ *
+ * @param cacheNames Names of the caches to reserve the partition. The first cache is used for affinity co-location.
+ * @param affKey Affinity key.
+ * @param job Job which will be co-located on the node with given affinity key.
+ * @return Job result.
+ * @throws IgniteException If job failed.
+ */
+ @IgniteAsyncSupported
+ public <R> R affinityCall(@NotNull Collection<String> cacheNames, Object affKey, IgniteCallable<R> job)
+ throws IgniteException;
+
+ /**
+ * Executes given job on the node where partition is located (the partition is primary on the node).
+ * The data of the partition will not be migrated from the target node
+ * while the job is executed. The data of the extra caches' partitions with the same partition number
+ * also will not be migrated.
+ *
+ * @param cacheNames Names of the caches to reserve the partition. The first cache is used for affinity co-location.
+ * @param partId Partition to reserve.
+ * @param job Job which will be co-located on the node with given partition.
+ * @return Job result.
+ * @throws IgniteException If job failed.
+ */
+ @IgniteAsyncSupported
+ public <R> R affinityCall(@NotNull Collection<String> cacheNames, int partId, IgniteCallable<R> job)
+ throws IgniteException;
+
+ /**
* Executes given task on within the cluster group. For step-by-step explanation of task execution process
* refer to {@link ComputeTask} documentation.
*
http://git-wip-us.apache.org/repos/asf/ignite/blob/01800101/modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteRequest.java
index 28b4094..ed431d8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteRequest.java
@@ -26,6 +26,7 @@ import java.util.UUID;
import org.apache.ignite.compute.ComputeJob;
import org.apache.ignite.compute.ComputeJobSibling;
import org.apache.ignite.configuration.DeploymentMode;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.S;
@@ -136,6 +137,15 @@ public class GridJobExecuteRequest implements Message {
@GridDirectCollection(UUID.class)
private Collection<UUID> top;
+ /** */
+ private int[] idsOfCaches;
+
+ /** */
+ private int part;
+
+ /** */
+ private AffinityTopologyVersion topVer;
+
/**
* No-op constructor to support {@link Externalizable} interface.
*/
@@ -169,6 +179,9 @@ public class GridJobExecuteRequest implements Message {
* @param sesFullSup {@code True} if session attributes are disabled.
* @param internal {@code True} if internal job.
* @param subjId Subject ID.
+ * @param cacheIds Caches' identifiers to reserve partition.
+ * @param part Partition to lock.
+ * @param topVer Affinity topology version of job mapping.
*/
public GridJobExecuteRequest(
IgniteUuid sesId,
@@ -195,7 +208,10 @@ public class GridJobExecuteRequest implements Message {
boolean forceLocDep,
boolean sesFullSup,
boolean internal,
- UUID subjId) {
+ UUID subjId,
+ @Nullable int[] cacheIds,
+ int part,
+ @Nullable AffinityTopologyVersion topVer) {
this.top = top;
assert sesId != null;
assert jobId != null;
@@ -232,6 +248,9 @@ public class GridJobExecuteRequest implements Message {
this.sesFullSup = sesFullSup;
this.internal = internal;
this.subjId = subjId;
+ this.idsOfCaches = cacheIds;
+ this.part = part;
+ this.topVer = topVer;
this.cpSpi = cpSpi == null || cpSpi.isEmpty() ? null : cpSpi;
}
@@ -421,6 +440,27 @@ public class GridJobExecuteRequest implements Message {
return subjId;
}
+ /**
+ * @return Caches' identifiers to reserve specified partition for job execution.
+ */
+ public int[] getCacheIds() {
+ return idsOfCaches;
+ }
+
+ /**
+ * @return Partition to lock for job execution.
+ */
+ public int getPartition() {
+ return part;
+ }
+
+ /**
+ * @return Affinity topology version which was used to map job.
+ */
+ public AffinityTopologyVersion getTopVer() {
+ return topVer;
+ }
+
/** {@inheritDoc} */
@Override public void onAckReceived() {
// No-op.
@@ -469,96 +509,114 @@ public class GridJobExecuteRequest implements Message {
writer.incrementState();
case 5:
- if (!writer.writeBoolean("internal", internal))
+ if (!writer.writeIntArray("idsOfCaches", idsOfCaches))
return false;
writer.incrementState();
case 6:
- if (!writer.writeByteArray("jobAttrsBytes", jobAttrsBytes))
+ if (!writer.writeBoolean("internal", internal))
return false;
writer.incrementState();
case 7:
- if (!writer.writeByteArray("jobBytes", jobBytes))
+ if (!writer.writeByteArray("jobAttrsBytes", jobAttrsBytes))
return false;
writer.incrementState();
case 8:
- if (!writer.writeIgniteUuid("jobId", jobId))
+ if (!writer.writeByteArray("jobBytes", jobBytes))
return false;
writer.incrementState();
case 9:
- if (!writer.writeMap("ldrParticipants", ldrParticipants, MessageCollectionItemType.UUID, MessageCollectionItemType.IGNITE_UUID))
+ if (!writer.writeIgniteUuid("jobId", jobId))
return false;
writer.incrementState();
case 10:
- if (!writer.writeByteArray("sesAttrsBytes", sesAttrsBytes))
+ if (!writer.writeMap("ldrParticipants", ldrParticipants, MessageCollectionItemType.UUID, MessageCollectionItemType.IGNITE_UUID))
return false;
writer.incrementState();
case 11:
- if (!writer.writeBoolean("sesFullSup", sesFullSup))
+ if (!writer.writeInt("part", part))
return false;
writer.incrementState();
case 12:
- if (!writer.writeIgniteUuid("sesId", sesId))
+ if (!writer.writeByteArray("sesAttrsBytes", sesAttrsBytes))
return false;
writer.incrementState();
case 13:
- if (!writer.writeByteArray("siblingsBytes", siblingsBytes))
+ if (!writer.writeBoolean("sesFullSup", sesFullSup))
return false;
writer.incrementState();
case 14:
- if (!writer.writeLong("startTaskTime", startTaskTime))
+ if (!writer.writeIgniteUuid("sesId", sesId))
return false;
writer.incrementState();
case 15:
- if (!writer.writeUuid("subjId", subjId))
+ if (!writer.writeByteArray("siblingsBytes", siblingsBytes))
return false;
writer.incrementState();
case 16:
- if (!writer.writeString("taskClsName", taskClsName))
+ if (!writer.writeLong("startTaskTime", startTaskTime))
return false;
writer.incrementState();
case 17:
- if (!writer.writeString("taskName", taskName))
+ if (!writer.writeUuid("subjId", subjId))
return false;
writer.incrementState();
case 18:
- if (!writer.writeLong("timeout", timeout))
+ if (!writer.writeString("taskClsName", taskClsName))
return false;
writer.incrementState();
case 19:
- if (!writer.writeCollection("top", top, MessageCollectionItemType.UUID))
+ if (!writer.writeString("taskName", taskName))
return false;
writer.incrementState();
case 20:
+ if (!writer.writeLong("timeout", timeout))
+ return false;
+
+ writer.incrementState();
+
+ case 21:
+ if (!writer.writeCollection("top", top, MessageCollectionItemType.UUID))
+ return false;
+
+ writer.incrementState();
+
+ case 22:
+ if (!writer.writeMessage("topVer", topVer))
+ return false;
+
+ writer.incrementState();
+
+ case 23:
if (!writer.writeString("userVer", userVer))
return false;
@@ -622,7 +680,7 @@ public class GridJobExecuteRequest implements Message {
reader.incrementState();
case 5:
- internal = reader.readBoolean("internal");
+ idsOfCaches = reader.readIntArray("idsOfCaches");
if (!reader.isLastRead())
return false;
@@ -630,7 +688,7 @@ public class GridJobExecuteRequest implements Message {
reader.incrementState();
case 6:
- jobAttrsBytes = reader.readByteArray("jobAttrsBytes");
+ internal = reader.readBoolean("internal");
if (!reader.isLastRead())
return false;
@@ -638,7 +696,7 @@ public class GridJobExecuteRequest implements Message {
reader.incrementState();
case 7:
- jobBytes = reader.readByteArray("jobBytes");
+ jobAttrsBytes = reader.readByteArray("jobAttrsBytes");
if (!reader.isLastRead())
return false;
@@ -646,7 +704,7 @@ public class GridJobExecuteRequest implements Message {
reader.incrementState();
case 8:
- jobId = reader.readIgniteUuid("jobId");
+ jobBytes = reader.readByteArray("jobBytes");
if (!reader.isLastRead())
return false;
@@ -654,7 +712,7 @@ public class GridJobExecuteRequest implements Message {
reader.incrementState();
case 9:
- ldrParticipants = reader.readMap("ldrParticipants", MessageCollectionItemType.UUID, MessageCollectionItemType.IGNITE_UUID, false);
+ jobId = reader.readIgniteUuid("jobId");
if (!reader.isLastRead())
return false;
@@ -662,7 +720,7 @@ public class GridJobExecuteRequest implements Message {
reader.incrementState();
case 10:
- sesAttrsBytes = reader.readByteArray("sesAttrsBytes");
+ ldrParticipants = reader.readMap("ldrParticipants", MessageCollectionItemType.UUID, MessageCollectionItemType.IGNITE_UUID, false);
if (!reader.isLastRead())
return false;
@@ -670,7 +728,7 @@ public class GridJobExecuteRequest implements Message {
reader.incrementState();
case 11:
- sesFullSup = reader.readBoolean("sesFullSup");
+ part = reader.readInt("part");
if (!reader.isLastRead())
return false;
@@ -678,7 +736,7 @@ public class GridJobExecuteRequest implements Message {
reader.incrementState();
case 12:
- sesId = reader.readIgniteUuid("sesId");
+ sesAttrsBytes = reader.readByteArray("sesAttrsBytes");
if (!reader.isLastRead())
return false;
@@ -686,7 +744,7 @@ public class GridJobExecuteRequest implements Message {
reader.incrementState();
case 13:
- siblingsBytes = reader.readByteArray("siblingsBytes");
+ sesFullSup = reader.readBoolean("sesFullSup");
if (!reader.isLastRead())
return false;
@@ -694,7 +752,7 @@ public class GridJobExecuteRequest implements Message {
reader.incrementState();
case 14:
- startTaskTime = reader.readLong("startTaskTime");
+ sesId = reader.readIgniteUuid("sesId");
if (!reader.isLastRead())
return false;
@@ -702,7 +760,7 @@ public class GridJobExecuteRequest implements Message {
reader.incrementState();
case 15:
- subjId = reader.readUuid("subjId");
+ siblingsBytes = reader.readByteArray("siblingsBytes");
if (!reader.isLastRead())
return false;
@@ -710,7 +768,7 @@ public class GridJobExecuteRequest implements Message {
reader.incrementState();
case 16:
- taskClsName = reader.readString("taskClsName");
+ startTaskTime = reader.readLong("startTaskTime");
if (!reader.isLastRead())
return false;
@@ -718,7 +776,7 @@ public class GridJobExecuteRequest implements Message {
reader.incrementState();
case 17:
- taskName = reader.readString("taskName");
+ subjId = reader.readUuid("subjId");
if (!reader.isLastRead())
return false;
@@ -726,7 +784,7 @@ public class GridJobExecuteRequest implements Message {
reader.incrementState();
case 18:
- timeout = reader.readLong("timeout");
+ taskClsName = reader.readString("taskClsName");
if (!reader.isLastRead())
return false;
@@ -734,7 +792,7 @@ public class GridJobExecuteRequest implements Message {
reader.incrementState();
case 19:
- top = reader.readCollection("top", MessageCollectionItemType.UUID);
+ taskName = reader.readString("taskName");
if (!reader.isLastRead())
return false;
@@ -742,6 +800,30 @@ public class GridJobExecuteRequest implements Message {
reader.incrementState();
case 20:
+ timeout = reader.readLong("timeout");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 21:
+ top = reader.readCollection("top", MessageCollectionItemType.UUID);
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 22:
+ topVer = reader.readMessage("topVer");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 23:
userVer = reader.readString("userVer");
if (!reader.isLastRead())
@@ -761,11 +843,11 @@ public class GridJobExecuteRequest implements Message {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 21;
+ return 24;
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridJobExecuteRequest.class, this);
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/01800101/modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteResponse.java
index bfbd859..9724bc0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteResponse.java
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
import java.util.Map;
import java.util.UUID;
import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.lang.IgniteUuid;
@@ -75,6 +76,9 @@ public class GridJobExecuteResponse implements Message {
@GridDirectTransient
private IgniteException fakeEx;
+ /** */
+ private AffinityTopologyVersion retry;
+
/**
* No-op constructor to support {@link Externalizable} interface. This
* constructor is not meant to be used for other purposes.
@@ -94,6 +98,7 @@ public class GridJobExecuteResponse implements Message {
* @param jobAttrsBytes Serialized job attributes.
* @param jobAttrs Job attributes.
* @param isCancelled Whether job was cancelled or not.
+ * @param retry Topology version for which partitions haven't been reserved on the affinity node.
*/
public GridJobExecuteResponse(UUID nodeId,
IgniteUuid sesId,
@@ -104,7 +109,8 @@ public class GridJobExecuteResponse implements Message {
Object res,
byte[] jobAttrsBytes,
Map<Object, Object> jobAttrs,
- boolean isCancelled)
+ boolean isCancelled,
+ AffinityTopologyVersion retry)
{
assert nodeId != null;
assert sesId != null;
@@ -120,6 +126,7 @@ public class GridJobExecuteResponse implements Message {
this.jobAttrsBytes = jobAttrsBytes;
this.jobAttrs = jobAttrs;
this.isCancelled = isCancelled;
+ this.retry = retry;
}
/**
@@ -206,6 +213,21 @@ public class GridJobExecuteResponse implements Message {
this.fakeEx = fakeEx;
}
+ /**
+ * @return {@code True} if the job needs to be retried.
+ */
+ public boolean retry() {
+ return retry != null;
+ }
+
+ /**
+ * @return Topology version for which specified partitions haven't been reserved
+ * on the affinity node.
+ */
+ public AffinityTopologyVersion getRetryTopologyVersion() {
+ return retry != null ? retry : AffinityTopologyVersion.NONE;
+ }
+
/** {@inheritDoc} */
@Override public void onAckReceived() {
// No-op.
@@ -260,6 +282,12 @@ public class GridJobExecuteResponse implements Message {
writer.incrementState();
case 6:
+ if (!writer.writeMessage("retry", retry))
+ return false;
+
+ writer.incrementState();
+
+ case 7:
if (!writer.writeIgniteUuid("sesId", sesId))
return false;
@@ -327,6 +355,14 @@ public class GridJobExecuteResponse implements Message {
reader.incrementState();
case 6:
+ retry = reader.readMessage("retry");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 7:
sesId = reader.readIgniteUuid("sesId");
if (!reader.isLastRead())
@@ -346,11 +382,11 @@ public class GridJobExecuteResponse implements Message {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 7;
+ return 8;
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridJobExecuteResponse.class, this);
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/01800101/modules/core/src/main/java/org/apache/ignite/internal/IgniteComputeImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteComputeImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteComputeImpl.java
index 15ad15f..26c6797 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteComputeImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteComputeImpl.java
@@ -35,6 +35,7 @@ import org.apache.ignite.compute.ComputeTask;
import org.apache.ignite.compute.ComputeTaskFuture;
import org.apache.ignite.internal.cluster.ClusterGroupAdapter;
import org.apache.ignite.internal.managers.deployment.GridDeployment;
+import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteCallable;
@@ -43,6 +44,7 @@ import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteReducer;
import org.apache.ignite.lang.IgniteRunnable;
import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.internal.GridClosureCallMode.BALANCE;
@@ -109,7 +111,64 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute>
guard();
try {
- saveOrGet(ctx.closure().affinityRun(cacheName, affKey, job, prj.nodes()));
+ // In case cache key is passed instead of affinity key.
+ final Object affKey0 = ctx.affinity().affinityKey(cacheName, affKey);
+ int partId = ctx.affinity().partition(cacheName, affKey0);
+
+ if (partId < 0)
+ throw new IgniteCheckedException("Failed map key to partition: [cache=" + cacheName + " key="
+ + affKey + ']');
+
+ saveOrGet(ctx.closure().affinityRun(Collections.singletonList(cacheName), partId, affKey,
+ job, prj.nodes()));
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+ finally {
+ unguard();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void affinityRun(@NotNull Collection<String> cacheNames, Object affKey, IgniteRunnable job) {
+ A.notNull(affKey, "affKey");
+ A.notNull(job, "job");
+ A.ensure(!cacheNames.isEmpty(), "cachesNames mustn't be empty");
+
+ guard();
+
+ try {
+ final String cacheName = F.first(cacheNames);
+
+ // In case cache key is passed instead of affinity key.
+ final Object affKey0 = ctx.affinity().affinityKey(cacheName, affKey);
+ int partId = ctx.affinity().partition(cacheName, affKey0);
+
+ if (partId < 0)
+ throw new IgniteCheckedException("Failed map key to partition: [cache=" + cacheName + " key="
+ + affKey + ']');
+
+ saveOrGet(ctx.closure().affinityRun(cacheNames, partId, affKey, job, prj.nodes()));
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+ finally {
+ unguard();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void affinityRun(@NotNull Collection<String> cacheNames, int partId, IgniteRunnable job) {
+ A.ensure(partId >= 0, "partId = " + partId);
+ A.notNull(job, "job");
+ A.ensure(!cacheNames.isEmpty(), "cachesNames mustn't be empty");
+
+ guard();
+
+ try {
+ saveOrGet(ctx.closure().affinityRun(cacheNames, partId, null, job, prj.nodes()));
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
@@ -127,7 +186,64 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute>
guard();
try {
- return saveOrGet(ctx.closure().affinityCall(cacheName, affKey, job, prj.nodes()));
+ // In case cache key is passed instead of affinity key.
+ final Object affKey0 = ctx.affinity().affinityKey(cacheName, affKey);
+ int partId = ctx.affinity().partition(cacheName, affKey0);
+
+ if (partId < 0)
+ throw new IgniteCheckedException("Failed map key to partition: [cache=" + cacheName + " key="
+ + affKey + ']');
+
+ return saveOrGet(ctx.closure().affinityCall(Collections.singletonList(cacheName), partId, affKey, job,
+ prj.nodes()));
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+ finally {
+ unguard();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public <R> R affinityCall(@NotNull Collection<String> cacheNames, Object affKey, IgniteCallable<R> job) {
+ A.notNull(affKey, "affKey");
+ A.notNull(job, "job");
+ A.ensure(!cacheNames.isEmpty(), "cachesNames mustn't be empty");
+
+ guard();
+
+ try {
+ final String cacheName = F.first(cacheNames);
+
+ // In case cache key is passed instead of affinity key.
+ final Object affKey0 = ctx.affinity().affinityKey(cacheName, affKey);
+ int partId = ctx.affinity().partition(cacheName, affKey0);
+
+ if (partId < 0)
+ throw new IgniteCheckedException("Failed map key to partition: [cache=" + cacheName + " key="
+ + affKey + ']');
+
+ return saveOrGet(ctx.closure().affinityCall(cacheNames, partId, affKey, job, prj.nodes()));
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+ finally {
+ unguard();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public <R> R affinityCall(@NotNull Collection<String> cacheNames, int partId, IgniteCallable<R> job) {
+ A.ensure(partId >= 0, "partId = " + partId);
+ A.notNull(job, "job");
+ A.ensure(!cacheNames.isEmpty(), "cachesNames mustn't be empty");
+
+ guard();
+
+ try {
+ return saveOrGet(ctx.closure().affinityCall(cacheNames, partId, null, job, prj.nodes()));
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
http://git-wip-us.apache.org/repos/asf/ignite/blob/01800101/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverContextImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverContextImpl.java
index 3985df7..ad77271 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverContextImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverContextImpl.java
@@ -23,6 +23,7 @@ import org.apache.ignite.compute.ComputeJobResult;
import org.apache.ignite.compute.ComputeTaskSession;
import org.apache.ignite.internal.GridTaskSessionImpl;
import org.apache.ignite.internal.managers.loadbalancer.GridLoadBalancerManager;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.spi.failover.FailoverContext;
@@ -42,26 +43,36 @@ public class GridFailoverContextImpl implements FailoverContext {
@GridToStringExclude
private final GridLoadBalancerManager loadMgr;
+ /** Partition number for affinityCall. */
+ private final int partId;
+
/** Affinity key for affinityCall. */
private final Object affKey;
/** Affinity cache name for affinityCall. */
private final String affCacheName;
+ /** Affinity topology version. */
+ private final AffinityTopologyVersion topVer;
+
/**
* Initializes failover context.
*
* @param taskSes Grid task session.
* @param jobRes Failed job result.
* @param loadMgr Load manager.
+ * @param partId Partition.
* @param affKey Affinity key.
* @param affCacheName Affinity cache name.
+ * @param topVer Affinity topology version.
*/
public GridFailoverContextImpl(GridTaskSessionImpl taskSes,
ComputeJobResult jobRes,
GridLoadBalancerManager loadMgr,
+ int partId,
@Nullable Object affKey,
- @Nullable String affCacheName) {
+ @Nullable String affCacheName,
+ @Nullable AffinityTopologyVersion topVer) {
assert taskSes != null;
assert jobRes != null;
assert loadMgr != null;
@@ -69,8 +80,10 @@ public class GridFailoverContextImpl implements FailoverContext {
this.taskSes = taskSes;
this.jobRes = jobRes;
this.loadMgr = loadMgr;
+ this.partId = partId;
this.affKey = affKey;
this.affCacheName = affCacheName;
+ this.topVer = topVer;
}
/** {@inheritDoc} */
@@ -99,6 +112,18 @@ public class GridFailoverContextImpl implements FailoverContext {
}
/** {@inheritDoc} */
+ public int partition() {
+ return partId;
+ }
+
+ /**
+ * @return Affinity topology version.
+ */
+ @Nullable public AffinityTopologyVersion affinityTopologyVersion() {
+ return topVer;
+ }
+
+ /** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridFailoverContextImpl.class, this);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/01800101/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverManager.java
index fa22b62..52edd1d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverManager.java
@@ -24,6 +24,7 @@ import org.apache.ignite.compute.ComputeJobResult;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.GridTaskSessionImpl;
import org.apache.ignite.internal.managers.GridManagerAdapter;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.spi.failover.FailoverSpi;
import org.jetbrains.annotations.Nullable;
@@ -58,16 +59,26 @@ public class GridFailoverManager extends GridManagerAdapter<FailoverSpi> {
* @param taskSes Task session.
* @param jobRes Job result.
* @param top Collection of all topology nodes.
+ * @param affPartId Partition number.
* @param affKey Affinity key.
* @param affCacheName Affinity cache name.
+ * @param topVer Affinity topology version.
* @return New node to route this job to.
*/
public ClusterNode failover(GridTaskSessionImpl taskSes,
ComputeJobResult jobRes,
List<ClusterNode> top,
+ int affPartId,
@Nullable Object affKey,
- @Nullable String affCacheName) {
- return getSpi(taskSes.getFailoverSpi()).failover(new GridFailoverContextImpl(taskSes, jobRes,
- ctx.loadBalancing(), affKey, affCacheName), top);
+ @Nullable String affCacheName,
+ @Nullable AffinityTopologyVersion topVer) {
+ return getSpi(taskSes.getFailoverSpi()).failover(new GridFailoverContextImpl(taskSes,
+ jobRes,
+ ctx.loadBalancing(),
+ affPartId,
+ affKey,
+ affCacheName,
+ topVer),
+ top);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/01800101/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
index 19e0842..1726d02 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
@@ -195,6 +195,24 @@ public class GridAffinityProcessor extends GridProcessorAdapter {
}
/**
+ * Maps partition to a node by taking the first node from the partition's
+ * affinity assignment on the given topology version.
+ *
+ * @param cacheName Cache name.
+ * @param partId Partition ID; looked up in the affinity assignment.
+ * @param topVer Affinity topology version.
+ * @return First node assigned to the partition, or {@code null} if no affinity
+ * info was obtained for the cache.
+ * @throws IgniteCheckedException If failed.
+ */
+ @Nullable public ClusterNode mapPartitionToNode(@Nullable String cacheName, int partId,
+ AffinityTopologyVersion topVer)
+ throws IgniteCheckedException {
+ AffinityInfo affInfo = affinityCache(cacheName, topVer);
+
+ // NOTE(review): the first node of an assignment is presumably the partition's primary - confirm.
+ return affInfo != null ? F.first(affInfo.assignment().get(partId)) : null;
+ }
+
+
+ /**
* Maps keys to nodes for given cache.
*
* @param cacheName Cache name.
http://git-wip-us.apache.org/repos/asf/ignite/blob/01800101/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
index cc3261c..fd0b471 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
@@ -1997,6 +1997,22 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
public GridCloseableIterator<Map.Entry<byte[], byte[]>> rawSwapIterator(boolean primary, boolean backup)
throws IgniteCheckedException
{
+ return rawSwapIterator(primary, backup, cctx.affinity().affinityTopologyVersion());
+ }
+
+ /**
+ * @return Raw off-heap iterator.
+ * @param primary Include primaries.
+ * @param backup Include backups.
+ * @param topVer Affinity topology version.
+ * @throws IgniteCheckedException If failed.
+ */
+ public GridCloseableIterator<Map.Entry<byte[], byte[]>> rawSwapIterator(boolean primary, boolean backup,
+ AffinityTopologyVersion topVer)
+ throws IgniteCheckedException
+ {
+ assert topVer != null;
+
if (!swapEnabled || (!primary && !backup))
return new GridEmptyCloseableIterator<>();
@@ -2005,10 +2021,8 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
if (primary && backup)
return swapMgr.rawIterator(spaceName);
- AffinityTopologyVersion ver = cctx.affinity().affinityTopologyVersion();
-
- Set<Integer> parts = primary ? cctx.affinity().primaryPartitions(cctx.localNodeId(), ver) :
- cctx.affinity().backupPartitions(cctx.localNodeId(), ver);
+ Set<Integer> parts = primary ? cctx.affinity().primaryPartitions(cctx.localNodeId(), topVer) :
+ cctx.affinity().backupPartitions(cctx.localNodeId(), topVer);
return new CloseablePartitionsIterator<Map.Entry<byte[], byte[]>, Map.Entry<byte[], byte[]>>(parts) {
@Override protected GridCloseableIterator<Map.Entry<byte[], byte[]>> partitionIterator(int part)
http://git-wip-us.apache.org/repos/asf/ignite/blob/01800101/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index 14468eb..35e6267 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -1229,13 +1229,28 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
public Iterator<Cache.Entry<K, V>> localEntriesIterator(final boolean primary,
final boolean backup,
final boolean keepBinary) {
+ return localEntriesIterator(primary,
+ backup,
+ keepBinary,
+ ctx.affinity().affinityTopologyVersion());
+ }
+
+ /**
+ * @param primary If {@code true} includes primary entries.
+ * @param backup If {@code true} includes backup entries.
+ * @param keepBinary Keep binary flag.
+ * @param topVer Specified affinity topology version.
+ * @return Local entries iterator.
+ */
+ public Iterator<Cache.Entry<K, V>> localEntriesIterator(final boolean primary,
+ final boolean backup,
+ final boolean keepBinary,
+ final AffinityTopologyVersion topVer) {
assert primary || backup;
if (primary && backup)
return iterator(entries().iterator(), !keepBinary);
else {
- final AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion();
-
final Iterator<GridDhtLocalPartition> partIt = topology().currentLocalPartitions().iterator();
Iterator<GridCacheMapEntry> it = new Iterator<GridCacheMapEntry>() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/01800101/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
index 5061136..39a3e08 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
@@ -117,6 +117,10 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
/** Update counter. */
private final AtomicLong cntr = new AtomicLong();
+ /** Set if failed to move partition to RENTING state due to reservations, to be checked when
+ * reservation is released. */
+ private volatile boolean shouldBeRenting;
+
/**
* @param cctx Context.
* @param id Partition ID.
@@ -411,6 +415,9 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
// Decrement reservations.
if (state.compareAndSet(reservations, --reservations)) {
+ if ((reservations & 0xFFFF) == 0 && shouldBeRenting)
+ rent(true);
+
tryEvict();
break;
@@ -461,24 +468,24 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
* @return Future to signal that this node is no longer an owner or backup.
*/
IgniteInternalFuture<?> rent(boolean updateSeq) {
- while (true) {
- long reservations = state.get();
+ long reservations = state.get();
- int ord = (int)(reservations >> 32);
+ int ord = (int)(reservations >> 32);
- if (ord == RENTING.ordinal() || ord == EVICTED.ordinal())
- return rent;
+ if (ord == RENTING.ordinal() || ord == EVICTED.ordinal())
+ return rent;
- if (casState(reservations, RENTING)) {
- if (log.isDebugEnabled())
- log.debug("Moved partition to RENTING state: " + this);
+ shouldBeRenting = true;
- // Evict asynchronously, as the 'rent' method may be called
- // from within write locks on local partition.
- tryEvictAsync(updateSeq);
+ if ((reservations & 0xFFFF) == 0 && casState(reservations, RENTING)) {
+ shouldBeRenting = false;
- break;
- }
+ if (log.isDebugEnabled())
+ log.debug("Moved partition to RENTING state: " + this);
+
+ // Evict asynchronously, as the 'rent' method may be called
+ // from within write locks on local partition.
+ tryEvictAsync(updateSeq);
}
return rent;
http://git-wip-us.apache.org/repos/asf/ignite/blob/01800101/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridReservable.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridReservable.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridReservable.java
index 51f22bc..068c68d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridReservable.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridReservable.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.processors.cache.distributed.dht;
+import org.apache.ignite.IgniteCheckedException;
+
/**
* Reservations support.
*/
@@ -25,8 +27,9 @@ public interface GridReservable {
* Reserves.
*
* @return {@code true} If reserved successfully.
+ * @throws IgniteCheckedException If failed.
*/
- public boolean reserve();
+ public boolean reserve() throws IgniteCheckedException;
/**
* Releases.
http://git-wip-us.apache.org/repos/asf/ignite/blob/01800101/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index 6729d41..163bac5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -846,7 +846,10 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
final ExpiryPolicy plc = cctx.expiry();
- final AffinityTopologyVersion topVer = cctx.affinity().affinityTopologyVersion();
+ AffinityTopologyVersion topVer = GridQueryProcessor.getRequestAffinityTopologyVersion();
+
+ if (topVer == null)
+ topVer = cctx.affinity().affinityTopologyVersion();
final boolean backups = qry.includeBackups() || cctx.isReplicated();
@@ -935,7 +938,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
Integer part = qry.partition();
- Iterator<Map.Entry<byte[], byte[]>> it = part == null ? cctx.swap().rawSwapIterator(true, backups) :
+ Iterator<Map.Entry<byte[], byte[]>> it = part == null ? cctx.swap().rawSwapIterator(true, backups, topVer) :
cctx.swap().rawSwapIterator(part);
if (expPlc != null)
@@ -978,8 +981,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
GridDhtCacheAdapter<K, V> cache = cctx.isNear() ? cctx.near().dht() : cctx.dht();
final Iterator<Cache.Entry<K, V>> iter = cache.localEntriesIterator(true,
- backups,
- cache.context().keepBinary());
+ backups, cache.context().keepBinary(), topVer);
return new GridIteratorAdapter<IgniteBiTuple<K, V>>() {
/** */
http://git-wip-us.apache.org/repos/asf/ignite/blob/01800101/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/AffinityTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/AffinityTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/AffinityTask.java
index da00f01..9007c8b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/AffinityTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/AffinityTask.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.processors.closure;
+import java.util.Collection;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.jetbrains.annotations.Nullable;
/**
@@ -26,10 +28,21 @@ public interface AffinityTask {
/**
* @return Affinity key.
*/
- public Object affinityKey();
+ @Deprecated
+ @Nullable public Object affinityKey();
+
+ /**
+ * @return Partition.
+ */
+ public int partition();
/**
* @return Affinity cache name.
*/
- @Nullable public String affinityCacheName();
+ @Nullable public Collection<String> affinityCacheNames();
+
+ /**
+ * @return Affinity topology version.
+ */
+ @Nullable public AffinityTopologyVersion topologyVersion();
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/01800101/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
index f9b74c4..6f878ce 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
@@ -51,6 +51,7 @@ import org.apache.ignite.internal.GridInternalWrapper;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.GridProcessorAdapter;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.resource.GridNoImplicitInjection;
import org.apache.ignite.internal.util.GridSpinReadWriteLock;
import org.apache.ignite.internal.util.IgniteUtils;
@@ -439,34 +440,38 @@ public class GridClosureProcessor extends GridProcessorAdapter {
}
/**
- * @param cacheName Cache name.
+ * @param cacheNames Cache names.
+ * @param partId Partition.
* @param affKey Affinity key.
- * @param job Job.
+ * @param job Closure to execute.
* @param nodes Grid nodes.
- * @return Job future.
+ * @return Future for the closure result.
+ * @throws IgniteCheckedException If failed.
*/
- public <R> ComputeTaskInternalFuture<R> affinityCall(@Nullable String cacheName, Object affKey, Callable<R> job,
- @Nullable Collection<ClusterNode> nodes) {
+ public <R> ComputeTaskInternalFuture<R> affinityCall(@NotNull Collection<String> cacheNames,
+ int partId,
+ @Nullable Object affKey,
+ Callable<R> job,
+ @Nullable Collection<ClusterNode> nodes) throws IgniteCheckedException {
+ assert partId >= 0 : partId;
+
busyLock.readLock();
try {
if (F.isEmpty(nodes))
return ComputeTaskInternalFuture.finishedFuture(ctx, T5.class, U.emptyTopologyException());
- // In case cache key is passed instead of affinity key.
- final Object affKey0 = ctx.affinity().affinityKey(cacheName, affKey);
+ final String cacheName = F.first(cacheNames);
- final ClusterNode node = ctx.affinity().mapKeyToNode(cacheName, affKey0);
+ final AffinityTopologyVersion mapTopVer = ctx.discovery().topologyVersionEx();
+ final ClusterNode node = ctx.affinity().mapPartitionToNode(cacheName, partId, mapTopVer);
if (node == null)
return ComputeTaskInternalFuture.finishedFuture(ctx, T5.class, U.emptyTopologyException());
ctx.task().setThreadContext(TC_SUBGRID, nodes);
- return ctx.task().execute(new T5(node, job, affKey0, cacheName), null, false);
- }
- catch (IgniteCheckedException e) {
- return ComputeTaskInternalFuture.finishedFuture(ctx, T5.class, e);
+ return ctx.task().execute(new T5(node, job, cacheNames, partId, affKey, mapTopVer), null, false);
}
finally {
busyLock.readUnlock();
@@ -474,34 +479,38 @@ public class GridClosureProcessor extends GridProcessorAdapter {
}
/**
- * @param cacheName Cache name.
+ * @param cacheNames Cache names.
+ * @param partId Partition.
* @param affKey Affinity key.
* @param job Job.
* @param nodes Grid nodes.
* @return Job future.
+ * @throws IgniteCheckedException If failed.
*/
- public ComputeTaskInternalFuture<?> affinityRun(@Nullable String cacheName, Object affKey, Runnable job,
- @Nullable Collection<ClusterNode> nodes) {
+ public ComputeTaskInternalFuture<?> affinityRun(@NotNull Collection<String> cacheNames,
+ int partId,
+ @Nullable Object affKey,
+ Runnable job,
+ @Nullable Collection<ClusterNode> nodes) throws IgniteCheckedException {
+ assert partId >= 0 : partId;
+
busyLock.readLock();
try {
if (F.isEmpty(nodes))
return ComputeTaskInternalFuture.finishedFuture(ctx, T4.class, U.emptyTopologyException());
- // In case cache key is passed instead of affinity key.
- final Object affKey0 = ctx.affinity().affinityKey(cacheName, affKey);
+ final String cacheName = F.first(cacheNames);
- final ClusterNode node = ctx.affinity().mapKeyToNode(cacheName, affKey0);
+ final AffinityTopologyVersion mapTopVer = ctx.discovery().topologyVersionEx();
+ final ClusterNode node = ctx.affinity().mapPartitionToNode(cacheName, partId, mapTopVer);
if (node == null)
return ComputeTaskInternalFuture.finishedFuture(ctx, T4.class, U.emptyTopologyException());
ctx.task().setThreadContext(TC_SUBGRID, nodes);
- return ctx.task().execute(new T4(node, job, affKey0, cacheName), null, false);
- }
- catch (IgniteCheckedException e) {
- return ComputeTaskInternalFuture.finishedFuture(ctx, T4.class, e);
+ return ctx.task().execute(new T4(node, job, cacheNames, partId, affKey, mapTopVer), null, false);
}
finally {
busyLock.readUnlock();
@@ -1183,7 +1192,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
}
/**
- *
+ * @return Map.
*/
public Map<ComputeJob, ClusterNode> map() {
return map;
@@ -1346,23 +1355,35 @@ public class GridClosureProcessor extends GridProcessorAdapter {
private Object affKey;
/** */
- private String affCacheName;
+ private int partId;
+
+ /** */
+ private AffinityTopologyVersion topVer;
+
+ /** */
+ private Collection<String> affCacheNames;
+
/**
* @param node Cluster node.
- * @param job Job.
+ * @param job Job.
+ * @param affCacheNames Affinity caches.
+ * @param partId Partition.
* @param affKey Affinity key.
- * @param affCacheName Affinity cache name.
+ * @param topVer Affinity topology version.
*/
- private T4(ClusterNode node, Runnable job, Object affKey, String affCacheName) {
+ private T4(ClusterNode node, Runnable job, Collection<String> affCacheNames, int partId, Object affKey,
+ AffinityTopologyVersion topVer) {
super(U.peerDeployAware0(job));
- assert affKey != null;
+ assert partId >= 0;
this.node = node;
this.job = job;
+ this.affCacheNames = affCacheNames;
+ this.partId = partId;
this.affKey = affKey;
- this.affCacheName = affCacheName;
+ this.topVer = topVer;
}
/** {@inheritDoc} */
@@ -1371,13 +1392,23 @@ public class GridClosureProcessor extends GridProcessorAdapter {
}
/** {@inheritDoc} */
- @Override public Object affinityKey() {
- return affKey;
+ @Override public int partition() {
+ return partId;
}
/** {@inheritDoc} */
- @Nullable @Override public String affinityCacheName() {
- return affCacheName;
+ @Nullable @Override public Collection<String> affinityCacheNames() {
+ return affCacheNames;
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public AffinityTopologyVersion topologyVersion() {
+ return topVer;
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public Object affinityKey() {
+ return affKey;
}
}
@@ -1398,23 +1429,38 @@ public class GridClosureProcessor extends GridProcessorAdapter {
private Object affKey;
/** */
- private String affCacheName;
+ private int partId;
+
+ /** */
+ private AffinityTopologyVersion topVer;
+
+ /** */
+ private Collection<String> affCacheNames;
+
+
/**
* @param node Cluster node.
- * @param job Job.
+ * @param job Job.
+ * @param affCacheNames Affinity caches.
+ * @param partId Partition.
* @param affKey Affinity key.
- * @param affCacheName Affinity cache name.
+ * @param topVer Affinity topology version.
*/
- private T5(ClusterNode node, Callable<R> job, Object affKey, String affCacheName) {
+ private T5(ClusterNode node,
+ Callable<R> job,
+ Collection<String> affCacheNames,
+ int partId,
+ Object affKey,
+ AffinityTopologyVersion topVer) {
super(U.peerDeployAware0(job));
- assert affKey != null;
-
this.node = node;
this.job = job;
+ this.affCacheNames = affCacheNames;
+ this.partId = partId;
this.affKey = affKey;
- this.affCacheName = affCacheName;
+ this.topVer = topVer;
}
/** {@inheritDoc} */
@@ -1433,13 +1479,23 @@ public class GridClosureProcessor extends GridProcessorAdapter {
}
/** {@inheritDoc} */
- @Override public Object affinityKey() {
+ @Nullable @Override public Object affinityKey() {
return affKey;
}
/** {@inheritDoc} */
- @Nullable @Override public String affinityCacheName() {
- return affCacheName;
+ @Override public int partition() {
+ return partId;
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public Collection<String> affinityCacheNames() {
+ return affCacheNames;
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public AffinityTopologyVersion topologyVersion() {
+ return topVer;
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/01800101/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
index a2e9e33..6a162d3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
@@ -60,12 +60,17 @@ import org.apache.ignite.internal.managers.communication.GridMessageListener;
import org.apache.ignite.internal.managers.deployment.GridDeployment;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.processors.GridProcessorAdapter;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridReservable;
import org.apache.ignite.internal.processors.jobmetrics.GridJobMetricsSnapshot;
import org.apache.ignite.internal.util.GridAtomicLong;
import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashMap;
import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashSet;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.GridSpinReadWriteLock;
+import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.P1;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.S;
@@ -93,6 +98,7 @@ import static org.apache.ignite.internal.GridTopic.TOPIC_JOB_SIBLINGS;
import static org.apache.ignite.internal.GridTopic.TOPIC_TASK;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.MANAGEMENT_POOL;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING;
import static org.jsr166.ConcurrentLinkedHashMap.QueuePolicy.PER_SEGMENT_Q;
/**
@@ -420,7 +426,8 @@ public class GridJobProcessor extends GridProcessorAdapter {
* @return Siblings.
* @throws IgniteCheckedException If failed.
*/
- public Collection<ComputeJobSibling> requestJobSiblings(final ComputeTaskSession ses) throws IgniteCheckedException {
+ public Collection<ComputeJobSibling> requestJobSiblings(
+ final ComputeTaskSession ses) throws IgniteCheckedException {
assert ses != null;
final UUID taskNodeId = ses.getTaskNodeId();
@@ -628,7 +635,7 @@ public class GridJobProcessor extends GridProcessorAdapter {
GridJobWorker activeJob = activeJobs.get(jobId);
if (activeJob != null && idsMatch.apply(activeJob))
- cancelActiveJob(activeJob, sys);
+ cancelActiveJob(activeJob, sys);
}
}
finally {
@@ -743,7 +750,7 @@ public class GridJobProcessor extends GridProcessorAdapter {
void advance() {
assert w == null;
- while(iter.hasNext()) {
+ while (iter.hasNext()) {
GridJobWorker w0 = iter.next();
assert !w0.isInternal();
@@ -804,7 +811,7 @@ public class GridJobProcessor extends GridProcessorAdapter {
void advance() {
assert w == null;
- while(iter.hasNext()) {
+ while (iter.hasNext()) {
GridJobWorker w0 = iter.next();
assert !w0.isInternal();
@@ -947,6 +954,15 @@ public class GridJobProcessor extends GridProcessorAdapter {
if (log.isDebugEnabled())
log.debug("Received job request message [req=" + req + ", nodeId=" + node.id() + ']');
+ PartitionsReservation partsReservation = null;
+
+ if (req.getCacheIds() != null) {
+ assert req.getPartition() >= 0 : req;
+ assert !F.isEmpty(req.getCacheIds()) : req;
+
+ partsReservation = new PartitionsReservation(req.getCacheIds(), req.getPartition(), req.getTopVer());
+ }
+
GridJobWorker job = null;
if (!rwLock.tryReadLock()) {
@@ -1079,7 +1095,9 @@ public class GridJobProcessor extends GridProcessorAdapter {
node,
req.isInternal(),
evtLsnr,
- holdLsnr);
+ holdLsnr,
+ partsReservation,
+ req.getTopVer());
jobCtx.job(job);
@@ -1330,7 +1348,8 @@ public class GridJobProcessor extends GridProcessorAdapter {
null,
loc ? null : marsh.marshal(null),
null,
- false);
+ false,
+ null);
if (req.isSessionFullSupport()) {
// Send response to designated job topic.
@@ -1472,6 +1491,114 @@ public class GridJobProcessor extends GridProcessorAdapter {
/**
*
*/
+    private class PartitionsReservation implements GridReservable {
+        /** Caches. */
+        private final int[] cacheIds;
+
+        /** Partition id. */
+        private final int partId;
+
+        /** Topology version. */
+        private final AffinityTopologyVersion topVer;
+
+        /**
+         * Reserved partitions, indexed like {@link #cacheIds}. A slot stays {@code null} for caches that
+         * {@link #reserve()} skipped (local or rebalance-disabled caches) or did not reach.
+         */
+        private GridDhtLocalPartition[] partititons;
+
+        /**
+         * @param cacheIds Cache identifiers array.
+         * @param partId Partition number.
+         * @param topVer Affinity topology version.
+         */
+        public PartitionsReservation(int[] cacheIds, int partId,
+            AffinityTopologyVersion topVer) {
+            this.cacheIds = cacheIds;
+            this.partId = partId;
+            this.topVer = topVer;
+            partititons = new GridDhtLocalPartition[cacheIds.length];
+        }
+
+        /**
+         * Reserves partition {@code partId} in every cache listed in {@code cacheIds}.
+         * Local caches and caches with rebalancing disabled are skipped. If any cache is missing or
+         * not started, or the partition is absent, not in {@code OWNING} state or cannot be reserved,
+         * everything reserved so far is released and {@code false} is returned.
+         *
+         * @return {@code true} if all required partitions were reserved.
+         * @throws IgniteCheckedException If the partition could not be reserved and the local node is not
+         *      the primary node for the partition on {@code topVer}.
+         */
+        @Override public boolean reserve() throws IgniteCheckedException {
+            boolean reserved = false;
+
+            try {
+                for (int i = 0; i < cacheIds.length; ++i) {
+                    GridCacheContext<?, ?> cctx = ctx.cache().context().cacheContext(cacheIds[i]);
+
+                    if (cctx == null) // Cache was not found, probably was not deployed yet.
+                        return reserved;
+
+                    if (!cctx.started()) // Cache not started.
+                        return reserved;
+
+                    // Skipped caches leave a null slot in 'partititons' at this index.
+                    if (cctx.isLocal() || !cctx.rebalanceEnabled())
+                        continue;
+
+                    boolean checkPartMapping = false;
+
+                    try {
+                        if (cctx.isReplicated()) {
+                            GridDhtLocalPartition part = cctx.topology().localPartition(partId,
+                                topVer, false);
+
+                            // We don't need to reserve partitions because they will not be evicted in replicated caches.
+                            if (part == null || part.state() != OWNING) {
+                                checkPartMapping = true;
+
+                                return reserved;
+                            }
+                        }
+
+                        GridDhtLocalPartition part = cctx.topology().localPartition(partId, topVer, false);
+
+                        if (part == null || part.state() != OWNING || !part.reserve()) {
+                            checkPartMapping = true;
+
+                            return reserved;
+                        }
+
+                        partititons[i] = part;
+
+                        // Double check that we are still in owning state and partition contents are not cleared.
+                        if (part.state() != OWNING) {
+                            checkPartMapping = true;
+
+                            return reserved;
+                        }
+                    }
+                    finally {
+                        // Throwing here replaces the pending 'return reserved' of a failed reservation.
+                        if (checkPartMapping && !cctx.affinity().primary(partId, topVer).id().equals(ctx.localNodeId()))
+                            throw new IgniteCheckedException("Failed partition reservation. " +
+                                "Partition is not primary on the node. [partition=" + partId + ", cacheName=" + cctx.name() +
+                                ", nodeId=" + ctx.localNodeId() + ", topology=" + topVer + ']');
+                    }
+                }
+
+                reserved = true;
+            }
+            finally {
+                // Roll back partial reservations on any failed or exceptional exit.
+                if (!reserved)
+                    release();
+            }
+
+            return true;
+        }
+
+        /**
+         * Releases every partition reserved by {@link #reserve()} and clears its slot, so a repeated
+         * call does not release the same partition twice.
+         */
+        @Override public void release() {
+            for (int i = 0; i < partititons.length; ++i) {
+                // A null slot may be followed by reserved ones (reserve() skips local and
+                // rebalance-disabled caches), so skip it instead of stopping the loop.
+                if (partititons[i] == null)
+                    continue;
+
+                partititons[i].release();
+                partititons[i] = null;
+            }
+        }
+    }
+
+ /**
+ *
+ */
private class CollisionJobContext extends GridCollisionJobContextAdapter {
/** */
private final boolean passive;
http://git-wip-us.apache.org/repos/asf/ignite/blob/01800101/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
index 5b04d6f..16fadaf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
@@ -43,6 +43,9 @@ import org.apache.ignite.internal.GridJobSessionImpl;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.managers.deployment.GridDeployment;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridReservable;
+import org.apache.ignite.internal.processors.query.GridQueryProcessor;
import org.apache.ignite.internal.processors.task.GridInternal;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
import org.apache.ignite.internal.util.typedef.F;
@@ -154,6 +157,12 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
/** Hold/unhold listener to notify job processor. */
private final GridJobHoldListener holdLsnr;
+ /** Partitions reservation (released when the job finishes). */
+ private final GridReservable partsReservation;
+
+ /** Request topology version. */
+ private final AffinityTopologyVersion reqTopVer;
+
/**
* @param ctx Kernal context.
* @param dep Grid deployment.
@@ -166,6 +175,8 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
* @param internal Whether or not task was marked with {@link GridInternal}
* @param evtLsnr Job event listener.
* @param holdLsnr Hold listener.
+ * @param partsReservation Reserved partitions (must be released at the job finish).
+ * @param reqTopVer Affinity topology version of the job request.
*/
GridJobWorker(
GridKernalContext ctx,
@@ -178,7 +189,9 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
ClusterNode taskNode,
boolean internal,
GridJobEventListener evtLsnr,
- GridJobHoldListener holdLsnr) {
+ GridJobHoldListener holdLsnr,
+ GridReservable partsReservation,
+ AffinityTopologyVersion reqTopVer) {
super(ctx.gridName(), "grid-job-worker", ctx.log(GridJobWorker.class));
assert ctx != null;
@@ -199,6 +212,8 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
this.taskNode = taskNode;
this.internal = internal;
this.holdLsnr = holdLsnr;
+ this.partsReservation = partsReservation;
+ this.reqTopVer = reqTopVer;
if (job != null)
this.job = job;
@@ -471,96 +486,128 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
// Make sure flag is not set for current thread.
HOLD.set(false);
- if (isCancelled())
- // If job was cancelled prior to assigning runner to it?
- super.cancel();
+ try {
+ if (partsReservation != null) {
+ try {
+ if (!partsReservation.reserve()) {
+ finishJob(null, null, true, true);
- if (!skipNtf) {
- if (holdLsnr.onUnheld(this))
- held.decrementAndGet();
- else {
- if (log.isDebugEnabled())
- log.debug("Ignoring job execution (job was not held).");
+ return;
+ }
+ }
+ catch (Exception e) {
+ IgniteException ex = new IgniteException("Failed to lock partitions " +
+ "[jobId=" + ses.getJobId() + ", ses=" + ses + ']', e);
- return;
+ U.error(log, "Failed to lock partitions [jobId=" + ses.getJobId() + ", ses=" + ses + ']', e);;
+
+ finishJob(null, ex, true);
+
+ return;
+ }
}
- }
- boolean sndRes = true;
+ if (isCancelled())
+ // If job was cancelled prior to assigning runner to it?
+ super.cancel();
- Object res = null;
+ if (!skipNtf) {
+ if (holdLsnr.onUnheld(this))
+ held.decrementAndGet();
+ else {
+ if (log.isDebugEnabled())
+ log.debug("Ignoring job execution (job was not held).");
- IgniteException ex = null;
+ return;
+ }
+ }
- try {
- ctx.job().currentTaskSession(ses);
-
- // If job has timed out, then
- // avoid computation altogether.
- if (isTimedOut())
- sndRes = false;
- else {
- res = U.wrapThreadLoader(dep.classLoader(), new Callable<Object>() {
- @Nullable @Override public Object call() {
- try {
- if (internal && ctx.config().isPeerClassLoadingEnabled())
- ctx.job().internal(true);
+ boolean sndRes = true;
- return job.execute();
- }
- finally {
- if (internal && ctx.config().isPeerClassLoadingEnabled())
- ctx.job().internal(false);
+ Object res = null;
+
+ IgniteException ex = null;
+
+ try {
+ ctx.job().currentTaskSession(ses);
+
+ if (reqTopVer != null)
+ GridQueryProcessor.setRequestAffinityTopologyVersion(reqTopVer);
+
+ // If job has timed out, then
+ // avoid computation altogether.
+ if (isTimedOut())
+ sndRes = false;
+ else {
+ res = U.wrapThreadLoader(dep.classLoader(), new Callable<Object>() {
+ @Nullable @Override public Object call() {
+ try {
+ if (internal && ctx.config().isPeerClassLoadingEnabled())
+ ctx.job().internal(true);
+
+ return job.execute();
+ }
+ finally {
+ if (internal && ctx.config().isPeerClassLoadingEnabled())
+ ctx.job().internal(false);
+ }
}
- }
- });
+ });
- if (log.isDebugEnabled())
- log.debug("Job execution has successfully finished [job=" + job + ", res=" + res + ']');
+ if (log.isDebugEnabled())
+ log.debug("Job execution has successfully finished [job=" + job + ", res=" + res + ']');
+ }
}
- }
- catch (IgniteException e) {
- if (sysStopping && e.hasCause(IgniteInterruptedCheckedException.class, InterruptedException.class)) {
- ex = handleThrowable(e);
+ catch (IgniteException e) {
+ if (sysStopping && e.hasCause(IgniteInterruptedCheckedException.class, InterruptedException.class)) {
+ ex = handleThrowable(e);
- assert ex != null;
- }
- else {
- if (X.hasCause(e, GridInternalException.class) || X.hasCause(e, IgfsOutOfSpaceException.class)) {
- // Print exception for internal errors only if debug is enabled.
- if (log.isDebugEnabled())
- U.error(log, "Failed to execute job [jobId=" + ses.getJobId() + ", ses=" + ses + ']', e);
+ assert ex != null;
}
- else if (X.hasCause(e, InterruptedException.class)) {
- String msg = "Job was cancelled [jobId=" + ses.getJobId() + ", ses=" + ses + ']';
+ else {
+ if (X.hasCause(e, GridInternalException.class) || X.hasCause(e, IgfsOutOfSpaceException.class)) {
+ // Print exception for internal errors only if debug is enabled.
+ if (log.isDebugEnabled())
+ U.error(log, "Failed to execute job [jobId=" + ses.getJobId() + ", ses=" + ses + ']', e);
+ }
+ else if (X.hasCause(e, InterruptedException.class)) {
+ String msg = "Job was cancelled [jobId=" + ses.getJobId() + ", ses=" + ses + ']';
- if (log.isDebugEnabled())
- U.error(log, msg, e);
+ if (log.isDebugEnabled())
+ U.error(log, msg, e);
+ else
+ U.warn(log, msg);
+ }
else
- U.warn(log, msg);
+ U.error(log, "Failed to execute job [jobId=" + ses.getJobId() + ", ses=" + ses + ']', e);
+
+ ex = e;
}
- else
- U.error(log, "Failed to execute job [jobId=" + ses.getJobId() + ", ses=" + ses + ']', e);
+ }
+ // Catch Throwable to protect against bad user code except
+ // InterruptedException if job is being cancelled.
+ catch (Throwable e) {
+ ex = handleThrowable(e);
- ex = e;
+ assert ex != null;
+
+ if (e instanceof Error)
+ throw (Error)e;
}
- }
- // Catch Throwable to protect against bad user code except
- // InterruptedException if job is being cancelled.
- catch (Throwable e) {
- ex = handleThrowable(e);
+ finally {
+ // Finish here only if not held by this thread.
+ if (!HOLD.get())
+ finishJob(res, ex, sndRes);
- assert ex != null;
+ ctx.job().currentTaskSession(null);
- if (e instanceof Error)
- throw (Error)e;
+ if (reqTopVer != null)
+ GridQueryProcessor.setRequestAffinityTopologyVersion(null);
+ }
}
finally {
- // Finish here only if not held by this thread.
- if (!HOLD.get())
- finishJob(res, ex, sndRes);
-
- ctx.job().currentTaskSession(null);
+ if (partsReservation != null)
+ partsReservation.release();
}
}
@@ -686,7 +733,20 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
*/
void finishJob(@Nullable Object res,
@Nullable IgniteException ex,
- boolean sndReply)
+ boolean sndReply) {
+ finishJob(res, ex, sndReply, false); // Delegate to the four-argument overload with retry disabled.
+ }
+
+ /**
+ * @param res Result.
+ * @param ex Exception.
+ * @param sndReply If {@code true}, reply will be sent.
+ * @param retry If {@code true}, retry response will be sent.
+ */
+ void finishJob(@Nullable Object res,
+ @Nullable IgniteException ex,
+ boolean sndReply,
+ boolean retry)
{
// Avoid finishing a job more than once from different threads.
if (!finishing.compareAndSet(false, true))
@@ -750,7 +810,8 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
loc ? res : null,
loc ? null : marsh.marshal(attrs),
loc ? attrs : null,
- isCancelled());
+ isCancelled(),
+ retry ? ctx.cache().context().exchange().readyAffinityVersion() : null);
long timeout = ses.getEndTime() - U.currentTimeMillis();