You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/06/10 16:11:44 UTC
[23/28] incubator-ignite git commit: ignite-545: merge from sprint-6
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
index 26eef50..703daf9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
@@ -81,7 +81,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
@Override public void start() throws IgniteCheckedException {
super.start();
- preldr = new GridDhtPreloader<>(ctx);
+ preldr = new GridDhtPreloader(ctx);
preldr.start();
@@ -518,7 +518,6 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
return;
}
- // Group lock can be only started from local node, so we never start group lock transaction on remote node.
IgniteInternalFuture<?> f = lockAllAsync(ctx, nearNode, req, null);
// Register listener just so we print out errors.
@@ -534,8 +533,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
private void processDhtLockResponse(UUID nodeId, GridDhtLockResponse res) {
assert nodeId != null;
assert res != null;
- GridDhtLockFuture<K, V> fut = (GridDhtLockFuture<K, V>)ctx.mvcc().<Boolean>future(res.version(),
- res.futureId());
+ GridDhtLockFuture fut = (GridDhtLockFuture)ctx.mvcc().<Boolean>future(res.version(), res.futureId());
if (fut == null) {
if (log.isDebugEnabled())
@@ -604,7 +602,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
assert tx != null;
- GridDhtLockFuture<K, V> fut = new GridDhtLockFuture<>(
+ GridDhtLockFuture fut = new GridDhtLockFuture(
ctx,
tx.nearNodeId(),
tx.nearXidVersion(),
@@ -669,7 +667,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
* @return Future.
*/
public IgniteInternalFuture<GridNearLockResponse> lockAllAsync(
- final GridCacheContext<K, V> cacheCtx,
+ final GridCacheContext<?, ?> cacheCtx,
final ClusterNode nearNode,
final GridNearLockRequest req,
@Nullable final CacheEntryPredicate[] filter0) {
@@ -719,26 +717,57 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
if (filter == null)
filter = req.filter();
- GridDhtLockFuture<K, V> fut = null;
+ GridDhtLockFuture fut = null;
if (!req.inTx()) {
- fut = new GridDhtLockFuture<>(ctx,
- nearNode.id(),
- req.version(),
- req.topologyVersion(),
- cnt,
- req.txRead(),
- req.needReturnValue(),
- req.timeout(),
- tx,
- req.threadId(),
- req.accessTtl(),
- filter,
- req.skipStore());
+ GridDhtPartitionTopology top = null;
+
+ if (req.firstClientRequest()) {
+ assert CU.clientNode(nearNode);
+
+ top = topology();
+
+ topology().readLock();
+ }
+
+ try {
+ if (top != null && needRemap(req.topologyVersion(), top.topologyVersion())) {
+ if (log.isDebugEnabled()) {
+ log.debug("Client topology version mismatch, need remap lock request [" +
+ "reqTopVer=" + req.topologyVersion() +
+ ", locTopVer=" + top.topologyVersion() +
+ ", req=" + req + ']');
+ }
+
+ GridNearLockResponse res = sendClientLockRemapResponse(nearNode,
+ req,
+ top.topologyVersion());
+
+ return new GridFinishedFuture<>(res);
+ }
+
+ fut = new GridDhtLockFuture(ctx,
+ nearNode.id(),
+ req.version(),
+ req.topologyVersion(),
+ cnt,
+ req.txRead(),
+ req.needReturnValue(),
+ req.timeout(),
+ tx,
+ req.threadId(),
+ req.accessTtl(),
+ filter,
+ req.skipStore());
- // Add before mapping.
- if (!ctx.mvcc().addFuture(fut))
- throw new IllegalStateException("Duplicate future ID: " + fut);
+ // Add before mapping.
+ if (!ctx.mvcc().addFuture(fut))
+ throw new IllegalStateException("Duplicate future ID: " + fut);
+ }
+ finally {
+ if (top != null)
+ top.readUnlock();
+ }
}
boolean timedout = false;
@@ -788,45 +817,76 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
// Handle implicit locks for pessimistic transactions.
if (req.inTx()) {
if (tx == null) {
- tx = new GridDhtTxLocal(
- ctx.shared(),
- nearNode.id(),
- req.version(),
- req.futureId(),
- req.miniId(),
- req.threadId(),
- req.implicitTx(),
- req.implicitSingleTx(),
- ctx.systemTx(),
- false,
- ctx.ioPolicy(),
- PESSIMISTIC,
- req.isolation(),
- req.timeout(),
- req.isInvalidate(),
- false,
- req.txSize(),
- null,
- req.subjectId(),
- req.taskNameHash());
+ GridDhtPartitionTopology top = null;
- tx.syncCommit(req.syncCommit());
+ if (req.firstClientRequest()) {
+ assert CU.clientNode(nearNode);
- tx = ctx.tm().onCreated(null, tx);
+ top = topology();
- if (tx == null || !tx.init()) {
- String msg = "Failed to acquire lock (transaction has been completed): " +
- req.version();
+ topology().readLock();
+ }
- U.warn(log, msg);
+ try {
+ if (top != null && needRemap(req.topologyVersion(), top.topologyVersion())) {
+ if (log.isDebugEnabled()) {
+ log.debug("Client topology version mismatch, need remap lock request [" +
+ "reqTopVer=" + req.topologyVersion() +
+ ", locTopVer=" + top.topologyVersion() +
+ ", req=" + req + ']');
+ }
- if (tx != null)
- tx.rollback();
+ GridNearLockResponse res = sendClientLockRemapResponse(nearNode,
+ req,
+ top.topologyVersion());
- return new GridDhtFinishedFuture<>(new IgniteCheckedException(msg));
- }
+ return new GridFinishedFuture<>(res);
+ }
- tx.topologyVersion(req.topologyVersion());
+ tx = new GridDhtTxLocal(
+ ctx.shared(),
+ nearNode.id(),
+ req.version(),
+ req.futureId(),
+ req.miniId(),
+ req.threadId(),
+ req.implicitTx(),
+ req.implicitSingleTx(),
+ ctx.systemTx(),
+ false,
+ ctx.ioPolicy(),
+ PESSIMISTIC,
+ req.isolation(),
+ req.timeout(),
+ req.isInvalidate(),
+ false,
+ req.txSize(),
+ null,
+ req.subjectId(),
+ req.taskNameHash());
+
+ tx.syncCommit(req.syncCommit());
+
+ tx = ctx.tm().onCreated(null, tx);
+
+ if (tx == null || !tx.init()) {
+ String msg = "Failed to acquire lock (transaction has been completed): " +
+ req.version();
+
+ U.warn(log, msg);
+
+ if (tx != null)
+ tx.rollback();
+
+ return new GridDhtFinishedFuture<>(new IgniteCheckedException(msg));
+ }
+
+ tx.topologyVersion(req.topologyVersion());
+ }
+ finally {
+ if (top != null)
+ top.readUnlock();
+ }
}
ctx.tm().txContext(tx);
@@ -947,6 +1007,42 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
}
/**
+ * @param nearNode Client node.
+ * @param req Request.
+ * @param topVer Remap version.
+ * @return Response.
+ */
+ private GridNearLockResponse sendClientLockRemapResponse(ClusterNode nearNode,
+ GridNearLockRequest req,
+ AffinityTopologyVersion topVer) {
+ assert topVer != null;
+
+ GridNearLockResponse res = new GridNearLockResponse(
+ ctx.cacheId(),
+ req.version(),
+ req.futureId(),
+ req.miniId(),
+ false,
+ 0,
+ null,
+ topVer);
+
+ try {
+ ctx.io().send(nearNode, res, ctx.ioPolicy());
+ }
+ catch (ClusterTopologyCheckedException e) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to send client lock remap response, client node failed " +
+ "[node=" + nearNode + ", req=" + req + ']');
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to send client lock remap response [node=" + nearNode + ", req=" + req + ']', e);
+ }
+
+ return res;
+ }
+
+ /**
* @param nearNode Near node.
* @param entries Entries.
* @param req Lock request.
@@ -968,7 +1064,13 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
try {
// Send reply back to originating near node.
GridNearLockResponse res = new GridNearLockResponse(ctx.cacheId(),
- req.version(), req.futureId(), req.miniId(), tx != null && tx.onePhaseCommit(), entries.size(), err);
+ req.version(),
+ req.futureId(),
+ req.miniId(),
+ tx != null && tx.onePhaseCommit(),
+ entries.size(),
+ err,
+ null);
if (err == null) {
res.pending(localDhtPendingVersions(entries, mappedVer));
@@ -1077,8 +1179,14 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
U.error(log, "Failed to get value for lock reply message for node [node=" +
U.toShortString(nearNode) + ", req=" + req + ']', e);
- return new GridNearLockResponse(ctx.cacheId(), req.version(), req.futureId(), req.miniId(), false,
- entries.size(), e);
+ return new GridNearLockResponse(ctx.cacheId(),
+ req.version(),
+ req.futureId(),
+ req.miniId(),
+ false,
+ entries.size(),
+ e,
+ null);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
index 54b59b8..90edb0f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
@@ -52,15 +52,13 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
private static final long serialVersionUID = 0L;
/** Near mappings. */
- protected Map<UUID, GridDistributedTxMapping> nearMap =
- new ConcurrentHashMap8<>();
+ protected Map<UUID, GridDistributedTxMapping> nearMap = new ConcurrentHashMap8<>();
/** DHT mappings. */
- protected Map<UUID, GridDistributedTxMapping> dhtMap =
- new ConcurrentHashMap8<>();
+ protected Map<UUID, GridDistributedTxMapping> dhtMap = new ConcurrentHashMap8<>();
/** Mapped flag. */
- private AtomicBoolean mapped = new AtomicBoolean();
+ protected AtomicBoolean mapped = new AtomicBoolean();
/** */
private long dhtThreadId;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 293cf95..af0fbdf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -582,7 +582,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
tx.writeVersion(),
tx.invalidPartitions(),
ret,
- prepErr);
+ prepErr,
+ null);
if (prepErr == null) {
addDhtValues(res);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 8bbfe96..8630421 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -171,7 +171,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
metrics = m;
- preldr = new GridDhtPreloader<>(ctx);
+ preldr = new GridDhtPreloader(ctx);
preldr.start();
@@ -737,6 +737,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
@Nullable final CacheEntryPredicate[] filter,
final boolean waitTopFut
) {
+ assert ctx.updatesAllowed();
+
if (map != null && keyCheck)
validateCacheKeys(map.keySet());
@@ -793,6 +795,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
boolean rawRetval,
@Nullable final CacheEntryPredicate[] filter
) {
+ assert ctx.updatesAllowed();
+
final boolean statsEnabled = ctx.config().isStatisticsEnabled();
final long start = statsEnabled ? System.nanoTime() : 0L;
@@ -1024,9 +1028,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
IgniteCacheExpiryPolicy expiry = null;
try {
- // If batch store update is enabled, we need to lock all entries.
- // First, need to acquire locks on cache entries, then check filter.
- List<GridDhtCacheEntry> locked = lockEntries(keys, req.topologyVersion());
+ List<GridDhtCacheEntry> locked = null;
Collection<IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion>> deleted = null;
try {
@@ -1043,11 +1045,11 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
// Do not check topology version for CLOCK versioning since
- // partition exchange will wait for near update future.
+ // partition exchange will wait for near update future (if future is on server node).
// Also do not check topology version if topology was locked on near node by
// external transaction or explicit lock.
- if (topology().topologyVersion().equals(req.topologyVersion()) || req.topologyLocked() ||
- ctx.config().getAtomicWriteOrderMode() == CLOCK) {
+ if ((req.fastMap() && !req.clientRequest()) || req.topologyLocked() ||
+ !needRemap(req.topologyVersion(), topology().topologyVersion())) {
ClusterNode node = ctx.discovery().node(nodeId);
if (node == null) {
@@ -1056,13 +1058,17 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
return;
}
+ // If batch store update is enabled, we need to lock all entries.
+ // First, need to acquire locks on cache entries, then check filter.
+ locked = lockEntries(keys, req.topologyVersion());
+
boolean hasNear = ctx.discovery().cacheNearNode(node, name());
GridCacheVersion ver = req.updateVersion();
if (ver == null) {
// Assign next version for update inside entries lock.
- ver = ctx.versions().next(req.topologyVersion());
+ ver = ctx.versions().next(topology().topologyVersion());
if (hasNear)
res.nearVersion(ver);
@@ -1105,7 +1111,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
retVal = updRes.invokeResults();
}
else {
- UpdateSingleResult<K, V> updRes = updateSingle(node,
+ UpdateSingleResult updRes = updateSingle(node,
hasNear,
req,
res,
@@ -1144,7 +1150,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
e.printStackTrace();
}
finally {
- unlockEntries(locked, req.topologyVersion());
+ if (locked != null)
+ unlockEntries(locked, req.topologyVersion());
// Enqueue if necessary after locks release.
if (deleted != null) {
@@ -1157,7 +1164,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
}
catch (GridDhtInvalidPartitionException ignore) {
- assert ctx.config().getAtomicWriteOrderMode() == PRIMARY;
+ assert !req.fastMap() || req.clientRequest() : req;
if (log.isDebugEnabled())
log.debug("Caught invalid partition exception for cache entry (will remap update request): " + req);
@@ -1605,7 +1612,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
* @return Return value.
* @throws GridCacheEntryRemovedException Should be never thrown.
*/
- private UpdateSingleResult<K, V> updateSingle(
+ private UpdateSingleResult updateSingle(
ClusterNode node,
boolean hasNear,
GridNearAtomicUpdateRequest req,
@@ -1799,7 +1806,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
}
- return new UpdateSingleResult<>(retVal, deleted, dhtFut);
+ return new UpdateSingleResult(retVal, deleted, dhtFut);
}
/**
@@ -2572,7 +2579,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
/**
* Result of {@link GridDhtAtomicCache#updateSingle} execution.
*/
- private static class UpdateSingleResult<K, V> {
+ private static class UpdateSingleResult {
/** */
private final GridCacheReturn retVal;
@@ -2772,14 +2779,18 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
/** {@inheritDoc} */
@Override public void onTimeout() {
if (guard.compareAndSet(false, true)) {
- writeLock().lock();
+ ctx.closures().runLocalSafe(new Runnable() {
+ @Override public void run() {
+ writeLock().lock();
- try {
- finish();
- }
- finally {
- writeLock().unlock();
- }
+ try {
+ finish();
+ }
+ finally {
+ writeLock().unlock();
+ }
+ }
+ });
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index 40ab104..ff8454e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -86,6 +86,9 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
/** Future keys. */
private Collection<KeyCacheObject> keys;
+ /** */
+ private boolean waitForExchange;
+
/**
* @param cctx Cache context.
* @param completionCb Callback to invoke when future is completed.
@@ -113,6 +116,10 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
log = U.logger(cctx.kernalContext(), logRef, GridDhtAtomicUpdateFuture.class);
keys = new ArrayList<>(updateReq.keys().size());
+
+ boolean topLocked = updateReq.topologyLocked() || (updateReq.fastMap() && !updateReq.clientRequest());
+
+ waitForExchange = !topLocked;
}
/** {@inheritDoc} */
@@ -164,8 +171,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
/** {@inheritDoc} */
@Override public boolean waitForPartitionExchange() {
- // Wait dht update futures in PRIMARY mode.
- return cctx.config().getAtomicWriteOrderMode() == CacheAtomicWriteOrderMode.PRIMARY;
+ return waitForExchange;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 76e05e5..07f5ecf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -128,6 +128,12 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
/** Fast map flag. */
private final boolean fastMap;
+ /** */
+ private boolean fastMapRemap;
+
+ /** */
+ private GridCacheVersion updVer;
+
/** Near cache flag. */
private final boolean nearEnabled;
@@ -304,11 +310,11 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
topVer = cctx.mvcc().lastExplicitLockTopologyVersion(Thread.currentThread().getId());
if (topVer == null)
- mapOnTopology(keys, false, null, waitTopFut);
+ mapOnTopology(null, false, null, waitTopFut);
else {
topLocked = true;
- map0(topVer, keys, false, null);
+ map0(topVer, null, false, null);
}
}
@@ -343,9 +349,11 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
*/
public void onResult(UUID nodeId, GridNearAtomicUpdateResponse res) {
if (res.remapKeys() != null) {
- assert cctx.config().getAtomicWriteOrderMode() == PRIMARY;
+ assert !fastMap || cctx.kernalContext().clientNode();
+
+ Collection<KeyCacheObject> remapKeys = fastMap ? null : res.remapKeys();
- mapOnTopology(res.remapKeys(), true, nodeId, true);
+ mapOnTopology(remapKeys, true, nodeId, true);
return;
}
@@ -454,9 +462,12 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
else {
if (waitTopFut) {
fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
- @Override
- public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
- mapOnTopology(keys, remap, oldNodeId, waitTopFut);
+ @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
+ cctx.kernalContext().closure().runLocalSafe(new Runnable() {
+ @Override public void run() {
+ mapOnTopology(keys, remap, oldNodeId, waitTopFut);
+ }
+ });
}
});
}
@@ -476,29 +487,43 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
/**
* Checks if future is ready to be completed.
*/
- private synchronized void checkComplete() {
- if ((syncMode == FULL_ASYNC && cctx.config().getAtomicWriteOrderMode() == PRIMARY) || mappings.isEmpty()) {
- CachePartialUpdateCheckedException err0 = err;
+ private void checkComplete() {
+ boolean remap = false;
- if (err0 != null)
- onDone(err0);
- else
- onDone(opRes);
+ synchronized (this) {
+ if ((syncMode == FULL_ASYNC && cctx.config().getAtomicWriteOrderMode() == PRIMARY) || mappings.isEmpty()) {
+ CachePartialUpdateCheckedException err0 = err;
+
+ if (err0 != null)
+ onDone(err0);
+ else {
+ if (fastMapRemap) {
+ assert cctx.kernalContext().clientNode();
+
+ remap = true;
+ }
+ else
+ onDone(opRes);
+ }
+ }
}
+
+ if (remap)
+ mapOnTopology(null, true, null, true);
}
/**
* @param topVer Topology version.
- * @param keys Keys to map.
+ * @param remapKeys Keys to remap or {@code null} to map all keys.
* @param remap Flag indicating if this is partial remap for this future.
* @param oldNodeId Old node ID if was remap.
*/
private void map0(
AffinityTopologyVersion topVer,
- Collection<?> keys,
+ @Nullable Collection<?> remapKeys,
boolean remap,
@Nullable UUID oldNodeId) {
- assert oldNodeId == null || remap;
+ assert oldNodeId == null || remap || fastMapRemap;
Collection<ClusterNode> topNodes = CU.affinityNodes(cctx, topVer);
@@ -519,12 +544,15 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
CacheConfiguration ccfg = cctx.config();
// Assign version on near node in CLOCK ordering mode even if fastMap is false.
- GridCacheVersion updVer = ccfg.getAtomicWriteOrderMode() == CLOCK ? cctx.versions().next(topVer) : null;
+ if (updVer == null)
+ updVer = ccfg.getAtomicWriteOrderMode() == CLOCK ? cctx.versions().next(topVer) : null;
if (updVer != null && log.isDebugEnabled())
log.debug("Assigned fast-map version for update on near node: " + updVer);
if (keys.size() == 1 && !fastMap && (single == null || single)) {
+ assert remapKeys == null || remapKeys.size() == 1 : remapKeys;
+
Object key = F.first(keys);
Object val;
@@ -610,7 +638,8 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
filter,
subjId,
taskNameHash,
- skipStore);
+ skipStore,
+ cctx.kernalContext().clientNode());
req.addUpdateEntry(cacheKey,
val,
@@ -647,9 +676,16 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
// Must do this in synchronized block because we need to atomically remove and add mapping.
// Otherwise checkComplete() may see empty intermediate state.
synchronized (this) {
- if (remap)
+ if (oldNodeId != null)
removeMapping(oldNodeId);
+ // For fastMap mode wait for all responses before remapping.
+ if (remap && fastMap && !mappings.isEmpty()) {
+ fastMapRemap = true;
+
+ return;
+ }
+
// Create mappings first, then send messages.
for (Object key : keys) {
if (key == null) {
@@ -705,6 +741,9 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
KeyCacheObject cacheKey = cctx.toCacheKeyObject(key);
+ if (remapKeys != null && !remapKeys.contains(cacheKey))
+ continue;
+
if (op != TRANSFORM)
val = cctx.toCacheObject(val);
@@ -748,7 +787,8 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
filter,
subjId,
taskNameHash,
- skipStore);
+ skipStore,
+ cctx.kernalContext().clientNode());
pendingMappings.put(nodeId, mapped);
@@ -763,6 +803,8 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
i++;
}
}
+
+ fastMapRemap = false;
}
if ((single == null || single) && pendingMappings.size() == 1) {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
index a96a666..86c5ab8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
@@ -132,6 +132,9 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
/** Skip write-through to a persistent storage. */
private boolean skipStore;
+ /** */
+ private boolean clientReq;
+
/**
* Empty constructor required by {@link Externalizable}.
*/
@@ -148,6 +151,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
* @param fastMap Fast map scheme flag.
* @param updateVer Update version set if fast map is performed.
* @param topVer Topology version.
+ * @param topLocked Topology locked flag.
* @param syncMode Synchronization mode.
* @param op Cache update operation.
* @param retval Return value required flag.
@@ -157,6 +161,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
* @param subjId Subject ID.
* @param taskNameHash Task name hash code.
* @param skipStore Skip write-through to a persistent storage.
+ * @param clientReq Client node request flag.
*/
public GridNearAtomicUpdateRequest(
int cacheId,
@@ -174,7 +179,8 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
@Nullable CacheEntryPredicate[] filter,
@Nullable UUID subjId,
int taskNameHash,
- boolean skipStore
+ boolean skipStore,
+ boolean clientReq
) {
this.cacheId = cacheId;
this.nodeId = nodeId;
@@ -193,6 +199,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
this.subjId = subjId;
this.taskNameHash = taskNameHash;
this.skipStore = skipStore;
+ this.clientReq = clientReq;
keys = new ArrayList<>();
}
@@ -266,6 +273,13 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
}
/**
+ * @return {@code True} if request sent from client node.
+ */
+ public boolean clientRequest() {
+ return clientReq;
+ }
+
+ /**
* @return Cache write synchronization mode.
*/
public CacheWriteSynchronizationMode writeSynchronizationMode() {
@@ -574,126 +588,132 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
switch (writer.state()) {
case 3:
- if (!writer.writeMessage("conflictExpireTimes", conflictExpireTimes))
+ if (!writer.writeBoolean("clientReq", clientReq))
return false;
writer.incrementState();
case 4:
- if (!writer.writeMessage("conflictTtls", conflictTtls))
+ if (!writer.writeMessage("conflictExpireTimes", conflictExpireTimes))
return false;
writer.incrementState();
case 5:
- if (!writer.writeCollection("conflictVers", conflictVers, MessageCollectionItemType.MSG))
+ if (!writer.writeMessage("conflictTtls", conflictTtls))
return false;
writer.incrementState();
case 6:
- if (!writer.writeCollection("entryProcessorsBytes", entryProcessorsBytes, MessageCollectionItemType.BYTE_ARR))
+ if (!writer.writeCollection("conflictVers", conflictVers, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
case 7:
- if (!writer.writeByteArray("expiryPlcBytes", expiryPlcBytes))
+ if (!writer.writeCollection("entryProcessorsBytes", entryProcessorsBytes, MessageCollectionItemType.BYTE_ARR))
return false;
writer.incrementState();
case 8:
- if (!writer.writeBoolean("fastMap", fastMap))
+ if (!writer.writeByteArray("expiryPlcBytes", expiryPlcBytes))
return false;
writer.incrementState();
case 9:
- if (!writer.writeObjectArray("filter", filter, MessageCollectionItemType.MSG))
+ if (!writer.writeBoolean("fastMap", fastMap))
return false;
writer.incrementState();
case 10:
- if (!writer.writeMessage("futVer", futVer))
+ if (!writer.writeObjectArray("filter", filter, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
case 11:
- if (!writer.writeBoolean("hasPrimary", hasPrimary))
+ if (!writer.writeMessage("futVer", futVer))
return false;
writer.incrementState();
case 12:
- if (!writer.writeObjectArray("invokeArgsBytes", invokeArgsBytes, MessageCollectionItemType.BYTE_ARR))
+ if (!writer.writeBoolean("hasPrimary", hasPrimary))
return false;
writer.incrementState();
case 13:
- if (!writer.writeCollection("keys", keys, MessageCollectionItemType.MSG))
+ if (!writer.writeObjectArray("invokeArgsBytes", invokeArgsBytes, MessageCollectionItemType.BYTE_ARR))
return false;
writer.incrementState();
case 14:
- if (!writer.writeByte("op", op != null ? (byte)op.ordinal() : -1))
+ if (!writer.writeCollection("keys", keys, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
case 15:
- if (!writer.writeBoolean("retval", retval))
+ if (!writer.writeByte("op", op != null ? (byte)op.ordinal() : -1))
return false;
writer.incrementState();
case 16:
- if (!writer.writeBoolean("skipStore", skipStore))
+ if (!writer.writeBoolean("retval", retval))
return false;
writer.incrementState();
case 17:
- if (!writer.writeUuid("subjId", subjId))
+ if (!writer.writeBoolean("skipStore", skipStore))
return false;
writer.incrementState();
case 18:
- if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1))
+ if (!writer.writeUuid("subjId", subjId))
return false;
writer.incrementState();
case 19:
- if (!writer.writeInt("taskNameHash", taskNameHash))
+ if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1))
return false;
writer.incrementState();
case 20:
- if (!writer.writeBoolean("topLocked", topLocked))
+ if (!writer.writeInt("taskNameHash", taskNameHash))
return false;
writer.incrementState();
case 21:
- if (!writer.writeMessage("topVer", topVer))
+ if (!writer.writeBoolean("topLocked", topLocked))
return false;
writer.incrementState();
case 22:
- if (!writer.writeMessage("updateVer", updateVer))
+ if (!writer.writeMessage("topVer", topVer))
return false;
writer.incrementState();
case 23:
+ if (!writer.writeMessage("updateVer", updateVer))
+ return false;
+
+ writer.incrementState();
+
+ case 24:
if (!writer.writeCollection("vals", vals, MessageCollectionItemType.MSG))
return false;
@@ -716,7 +736,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
switch (reader.state()) {
case 3:
- conflictExpireTimes = reader.readMessage("conflictExpireTimes");
+ clientReq = reader.readBoolean("clientReq");
if (!reader.isLastRead())
return false;
@@ -724,7 +744,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
reader.incrementState();
case 4:
- conflictTtls = reader.readMessage("conflictTtls");
+ conflictExpireTimes = reader.readMessage("conflictExpireTimes");
if (!reader.isLastRead())
return false;
@@ -732,7 +752,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
reader.incrementState();
case 5:
- conflictVers = reader.readCollection("conflictVers", MessageCollectionItemType.MSG);
+ conflictTtls = reader.readMessage("conflictTtls");
if (!reader.isLastRead())
return false;
@@ -740,7 +760,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
reader.incrementState();
case 6:
- entryProcessorsBytes = reader.readCollection("entryProcessorsBytes", MessageCollectionItemType.BYTE_ARR);
+ conflictVers = reader.readCollection("conflictVers", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
return false;
@@ -748,7 +768,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
reader.incrementState();
case 7:
- expiryPlcBytes = reader.readByteArray("expiryPlcBytes");
+ entryProcessorsBytes = reader.readCollection("entryProcessorsBytes", MessageCollectionItemType.BYTE_ARR);
if (!reader.isLastRead())
return false;
@@ -756,7 +776,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
reader.incrementState();
case 8:
- fastMap = reader.readBoolean("fastMap");
+ expiryPlcBytes = reader.readByteArray("expiryPlcBytes");
if (!reader.isLastRead())
return false;
@@ -764,7 +784,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
reader.incrementState();
case 9:
- filter = reader.readObjectArray("filter", MessageCollectionItemType.MSG, CacheEntryPredicate.class);
+ fastMap = reader.readBoolean("fastMap");
if (!reader.isLastRead())
return false;
@@ -772,7 +792,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
reader.incrementState();
case 10:
- futVer = reader.readMessage("futVer");
+ filter = reader.readObjectArray("filter", MessageCollectionItemType.MSG, CacheEntryPredicate.class);
if (!reader.isLastRead())
return false;
@@ -780,7 +800,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
reader.incrementState();
case 11:
- hasPrimary = reader.readBoolean("hasPrimary");
+ futVer = reader.readMessage("futVer");
if (!reader.isLastRead())
return false;
@@ -788,7 +808,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
reader.incrementState();
case 12:
- invokeArgsBytes = reader.readObjectArray("invokeArgsBytes", MessageCollectionItemType.BYTE_ARR, byte[].class);
+ hasPrimary = reader.readBoolean("hasPrimary");
if (!reader.isLastRead())
return false;
@@ -796,7 +816,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
reader.incrementState();
case 13:
- keys = reader.readCollection("keys", MessageCollectionItemType.MSG);
+ invokeArgsBytes = reader.readObjectArray("invokeArgsBytes", MessageCollectionItemType.BYTE_ARR, byte[].class);
if (!reader.isLastRead())
return false;
@@ -804,6 +824,14 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
reader.incrementState();
case 14:
+ keys = reader.readCollection("keys", MessageCollectionItemType.MSG);
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 15:
byte opOrd;
opOrd = reader.readByte("op");
@@ -815,7 +843,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
reader.incrementState();
- case 15:
+ case 16:
retval = reader.readBoolean("retval");
if (!reader.isLastRead())
@@ -823,7 +851,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
reader.incrementState();
- case 16:
+ case 17:
skipStore = reader.readBoolean("skipStore");
if (!reader.isLastRead())
@@ -831,7 +859,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
reader.incrementState();
- case 17:
+ case 18:
subjId = reader.readUuid("subjId");
if (!reader.isLastRead())
@@ -839,7 +867,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
reader.incrementState();
- case 18:
+ case 19:
byte syncModeOrd;
syncModeOrd = reader.readByte("syncMode");
@@ -851,7 +879,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
reader.incrementState();
- case 19:
+ case 20:
taskNameHash = reader.readInt("taskNameHash");
if (!reader.isLastRead())
@@ -859,7 +887,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
reader.incrementState();
- case 20:
+ case 21:
topLocked = reader.readBoolean("topLocked");
if (!reader.isLastRead())
@@ -867,7 +895,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
reader.incrementState();
- case 21:
+ case 22:
topVer = reader.readMessage("topVer");
if (!reader.isLastRead())
@@ -875,7 +903,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
reader.incrementState();
- case 22:
+ case 23:
updateVer = reader.readMessage("updateVer");
if (!reader.isLastRead())
@@ -883,7 +911,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
reader.incrementState();
- case 23:
+ case 24:
vals = reader.readCollection("vals", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
@@ -903,7 +931,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 24;
+ return 25;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index 05b3c7b..221b230 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@ -362,13 +362,13 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
@Nullable TransactionIsolation isolation,
long accessTtl
) {
- assert tx == null || tx instanceof GridNearTxLocal;
+ assert tx == null || tx instanceof GridNearTxLocal : tx;
GridNearTxLocal txx = (GridNearTxLocal)tx;
CacheOperationContext opCtx = ctx.operationContextPerCall();
- GridDhtColocatedLockFuture<K, V> fut = new GridDhtColocatedLockFuture<>(ctx,
+ GridDhtColocatedLockFuture fut = new GridDhtColocatedLockFuture(ctx,
keys,
txx,
isRead,
@@ -619,7 +619,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
* @return Lock future.
*/
IgniteInternalFuture<Exception> lockAllAsync(
- final GridCacheContext<K, V> cacheCtx,
+ final GridCacheContext<?, ?> cacheCtx,
@Nullable final GridNearTxLocal tx,
final long threadId,
final GridCacheVersion ver,
@@ -700,7 +700,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
* @return Lock future.
*/
private IgniteInternalFuture<Exception> lockAllAsync0(
- GridCacheContext<K, V> cacheCtx,
+ GridCacheContext<?, ?> cacheCtx,
@Nullable final GridNearTxLocal tx,
long threadId,
final GridCacheVersion ver,
@@ -715,7 +715,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
int cnt = keys.size();
if (tx == null) {
- GridDhtLockFuture<K, V> fut = new GridDhtLockFuture<>(ctx,
+ GridDhtLockFuture fut = new GridDhtLockFuture(ctx,
ctx.localNodeId(),
ver,
topVer,
@@ -838,7 +838,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
assert nodeId != null;
assert res != null;
- GridDhtColocatedLockFuture<K, V> fut = (GridDhtColocatedLockFuture<K, V>)ctx.mvcc().
+ GridDhtColocatedLockFuture fut = (GridDhtColocatedLockFuture)ctx.mvcc().
<Boolean>future(res.version(), res.futureId());
if (fut != null)
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index 372c517..c784948 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@ -46,7 +46,7 @@ import static org.apache.ignite.events.EventType.*;
/**
* Colocated cache lock future.
*/
-public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentityFuture<Boolean>
+public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture<Boolean>
implements GridCacheFuture<Boolean> {
/** */
private static final long serialVersionUID = 0L;
@@ -59,7 +59,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
/** Cache registry. */
@GridToStringExclude
- private GridCacheContext<K, V> cctx;
+ private GridCacheContext<?, ?> cctx;
/** Lock owner thread. */
@GridToStringInclude
@@ -121,10 +121,10 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
* @param timeout Lock acquisition timeout.
* @param accessTtl TTL for read operation.
* @param filter Filter.
- * @param skipStore
+ * @param skipStore Skip store flag.
*/
public GridDhtColocatedLockFuture(
- GridCacheContext<K, V> cctx,
+ GridCacheContext<?, ?> cctx,
Collection<KeyCacheObject> keys,
@Nullable GridNearTxLocal tx,
boolean read,
@@ -326,13 +326,14 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
* Undoes all locks.
*
* @param dist If {@code true}, then remove locks from remote nodes as well.
+ * @param rollback {@code True} if should rollback tx.
*/
- private void undoLocks(boolean dist) {
+ private void undoLocks(boolean dist, boolean rollback) {
// Transactions will undo during rollback.
if (dist && tx == null)
cctx.colocated().removeLocks(threadId, lockVer, keys);
else {
- if (tx != null) {
+ if (rollback && tx != null) {
if (tx.setRollbackOnly()) {
if (log.isDebugEnabled())
log.debug("Marked transaction as rollback only because locks could not be acquired: " + tx);
@@ -346,16 +347,6 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
}
/**
- *
- * @param dist {@code True} if need to distribute lock release.
- */
- private void onFailed(boolean dist) {
- undoLocks(dist);
-
- complete(false);
- }
-
- /**
* @param success Success flag.
*/
public void complete(boolean success) {
@@ -475,7 +466,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
", fut=" + this + ']');
if (!success)
- undoLocks(distribute);
+ undoLocks(distribute, true);
if (tx != null)
cctx.tm().txContext(tx);
@@ -550,7 +541,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
// Continue mapping on the same topology version as it was before.
this.topVer.compareAndSet(null, topVer);
- map(keys);
+ map(keys, false);
markInitialized();
@@ -558,14 +549,17 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
}
// Must get topology snapshot and map on that version.
- mapOnTopology();
+ mapOnTopology(false, null);
}
/**
* Acquires topology future and checks it completeness under the read lock. If it is not complete,
* will asynchronously wait for it's completeness and then try again.
+ *
+ * @param remap Remap flag.
+ * @param c Optional closure to run after map.
*/
- private void mapOnTopology() {
+ private void mapOnTopology(final boolean remap, @Nullable final Runnable c) {
// We must acquire topology snapshot from the topology version future.
cctx.topology().readLock();
@@ -589,19 +583,30 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
AffinityTopologyVersion topVer = fut.topologyVersion();
- if (tx != null)
- tx.topologyVersion(topVer);
+ if (remap) {
+ if (tx != null)
+ tx.onRemap(topVer);
+
+ this.topVer.set(topVer);
+ }
+ else {
+ if (tx != null)
+ tx.topologyVersion(topVer);
+
+ this.topVer.compareAndSet(null, topVer);
+ }
- this.topVer.compareAndSet(null, topVer);
+ map(keys, remap);
- map(keys);
+ if (c != null)
+ c.run();
markInitialized();
}
else {
fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
@Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
- mapOnTopology();
+ mapOnTopology(remap, c);
}
});
}
@@ -617,8 +622,9 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
* groups belonging to one primary node and locks for these groups are acquired sequentially.
*
* @param keys Keys.
+ * @param remap Remap flag.
*/
- private void map(Collection<KeyCacheObject> keys) {
+ private void map(Collection<KeyCacheObject> keys, boolean remap) {
try {
AffinityTopologyVersion topVer = this.topVer.get();
@@ -633,8 +639,12 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
return;
}
+ boolean clientNode = cctx.kernalContext().clientNode();
+
+ assert !remap || (clientNode && (tx == null || !tx.hasRemoteLocks()));
+
// First assume this node is primary for all keys passed in.
- if (mapAsPrimary(keys, topVer))
+ if (!clientNode && mapAsPrimary(keys, topVer))
return;
Deque<GridNearLockMapping> mappings = new ConcurrentLinkedDeque8<>();
@@ -668,6 +678,8 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
boolean hasRmtNodes = false;
+ boolean first = true;
+
// Create mini futures.
for (Iterator<GridNearLockMapping> iter = mappings.iterator(); iter.hasNext(); ) {
GridNearLockMapping mapping = iter.next();
@@ -736,6 +748,14 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
if (cand != null && !cand.reentry()) {
if (req == null) {
+ boolean clientFirst = false;
+
+ if (first) {
+ clientFirst = clientNode && (tx == null || !tx.hasRemoteLocks());
+
+ first = false;
+ }
+
req = new GridNearLockRequest(
cctx.cacheId(),
topVer,
@@ -757,7 +777,8 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
inTx() ? tx.subjectId() : null,
inTx() ? tx.taskNameHash() : 0,
read ? accessTtl : -1L,
- skipStore);
+ skipStore,
+ clientFirst);
mapping.request(req);
}
@@ -815,7 +836,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
if (hasRmtNodes) {
trackable = true;
- if (!cctx.mvcc().addFuture(this))
+ if (!remap && !cctx.mvcc().addFuture(this))
throw new IllegalStateException("Duplicate future ID: " + this);
}
else
@@ -1249,75 +1270,111 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
return;
}
- int i = 0;
+ if (res.clientRemapVersion() != null) {
+ assert cctx.kernalContext().clientNode();
+
+ IgniteInternalFuture<?> affFut =
+ cctx.shared().exchange().affinityReadyFuture(res.clientRemapVersion());
+
+ if (affFut != null && !affFut.isDone()) {
+ affFut.listen(new CI1<IgniteInternalFuture<?>>() {
+ @Override public void apply(IgniteInternalFuture<?> fut) {
+ remap();
+ }
+ });
+ }
+ else
+ remap();
+ }
+ else {
+ int i = 0;
- for (KeyCacheObject k : keys) {
- IgniteBiTuple<GridCacheVersion, CacheObject> oldValTup = valMap.get(k);
+ for (KeyCacheObject k : keys) {
+ IgniteBiTuple<GridCacheVersion, CacheObject> oldValTup = valMap.get(k);
- CacheObject newVal = res.value(i);
+ CacheObject newVal = res.value(i);
- GridCacheVersion dhtVer = res.dhtVersion(i);
+ GridCacheVersion dhtVer = res.dhtVersion(i);
- if (newVal == null) {
- if (oldValTup != null) {
- if (oldValTup.get1().equals(dhtVer))
- newVal = oldValTup.get2();
+ if (newVal == null) {
+ if (oldValTup != null) {
+ if (oldValTup.get1().equals(dhtVer))
+ newVal = oldValTup.get2();
+ }
}
- }
- if (inTx()) {
- IgniteTxEntry txEntry = tx.entry(cctx.txKey(k));
+ if (inTx()) {
+ IgniteTxEntry txEntry = tx.entry(cctx.txKey(k));
+
+ // In colocated cache we must receive responses only for detached entries.
+ assert txEntry.cached().detached() : txEntry;
- // In colocated cache we must receive responses only for detached entries.
- assert txEntry.cached().detached();
+ txEntry.markLocked();
- txEntry.markLocked();
+ GridDhtDetachedCacheEntry entry = (GridDhtDetachedCacheEntry)txEntry.cached();
- GridDhtDetachedCacheEntry entry = (GridDhtDetachedCacheEntry)txEntry.cached();
+ if (res.dhtVersion(i) == null) {
+ onDone(new IgniteCheckedException("Failed to receive DHT version from remote node " +
+ "(will fail the lock): " + res));
+
+ return;
+ }
- if (res.dhtVersion(i) == null) {
- onDone(new IgniteCheckedException("Failed to receive DHT version from remote node " +
- "(will fail the lock): " + res));
+ // Set value to detached entry.
+ entry.resetFromPrimary(newVal, dhtVer);
- return;
+ tx.hasRemoteLocks(true);
+
+ if (log.isDebugEnabled())
+ log.debug("Processed response for entry [res=" + res + ", entry=" + entry + ']');
+ }
+ else
+ cctx.mvcc().markExplicitOwner(k, threadId);
+
+ if (retval && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ)) {
+ cctx.events().addEvent(cctx.affinity().partition(k),
+ k,
+ tx,
+ null,
+ EVT_CACHE_OBJECT_READ,
+ newVal,
+ newVal != null,
+ null,
+ false,
+ CU.subjectId(tx, cctx.shared()),
+ null,
+ tx == null ? null : tx.resolveTaskName());
}
- // Set value to detached entry.
- entry.resetFromPrimary(newVal, dhtVer);
+ i++;
+ }
- if (log.isDebugEnabled())
- log.debug("Processed response for entry [res=" + res + ", entry=" + entry + ']');
+ try {
+ proceedMapping(mappings);
}
- else
- cctx.mvcc().markExplicitOwner(k, threadId);
-
- if (retval && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ)) {
- cctx.events().addEvent(cctx.affinity().partition(k),
- k,
- tx,
- null,
- EVT_CACHE_OBJECT_READ,
- newVal,
- newVal != null,
- null,
- false,
- CU.subjectId(tx, cctx.shared()),
- null,
- tx == null ? null : tx.resolveTaskName());
+ catch (IgniteCheckedException e) {
+ onDone(e);
}
- i++;
+ onDone(true);
}
+ }
+ }
- try {
- proceedMapping(mappings);
- }
- catch (IgniteCheckedException e) {
- onDone(e);
- }
+ /**
+ *
+ */
+ private void remap() {
+ undoLocks(false, false);
- onDone(true);
- }
+ for (KeyCacheObject key : GridDhtColocatedLockFuture.this.keys)
+ cctx.mvcc().removeExplicitLock(threadId, key, lockVer);
+
+ mapOnTopology(true, new Runnable() {
+ @Override public void run() {
+ onDone(true);
+ }
+ });
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
index 78966d0..1d57ef7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
@@ -80,7 +80,7 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec
private IgniteUuid futId = IgniteUuid.randomUuid();
/** Preloader. */
- private GridDhtPreloader<K, V> preloader;
+ private GridDhtPreloader preloader;
/** Trackable flag. */
private boolean trackable;
@@ -95,7 +95,7 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec
GridCacheContext<K, V> cctx,
AffinityTopologyVersion topVer,
Collection<KeyCacheObject> keys,
- GridDhtPreloader<K, V> preloader
+ GridDhtPreloader preloader
) {
assert topVer.topologyVersion() != 0 : topVer;
assert !F.isEmpty(keys) : keys;
@@ -208,21 +208,21 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec
* @return {@code True} if some mapping was added.
*/
private boolean map(Iterable<KeyCacheObject> keys, Collection<ClusterNode> exc) {
- Map<ClusterNode, Set<KeyCacheObject>> mappings = new HashMap<>();
-
- ClusterNode loc = cctx.localNode();
-
- int curTopVer = topCntr.get();
+ Map<ClusterNode, Set<KeyCacheObject>> mappings = null;
for (KeyCacheObject key : keys)
- map(key, mappings, exc);
+ mappings = map(key, mappings, exc);
if (isDone())
return false;
boolean ret = false;
- if (!mappings.isEmpty()) {
+ if (mappings != null) {
+ ClusterNode loc = cctx.localNode();
+
+ int curTopVer = topCntr.get();
+
preloader.addFuture(this);
trackable = true;
@@ -275,22 +275,27 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec
* @param key Key.
* @param exc Exclude nodes.
* @param mappings Mappings.
+ * @return Mappings.
*/
- private void map(KeyCacheObject key, Map<ClusterNode, Set<KeyCacheObject>> mappings, Collection<ClusterNode> exc) {
+ private Map<ClusterNode, Set<KeyCacheObject>> map(KeyCacheObject key,
+ @Nullable Map<ClusterNode, Set<KeyCacheObject>> mappings,
+ Collection<ClusterNode> exc)
+ {
ClusterNode loc = cctx.localNode();
- int part = cctx.affinity().partition(key);
-
GridCacheEntryEx e = cctx.dht().peekEx(key);
try {
if (e != null && !e.isNewLocked()) {
- if (log.isDebugEnabled())
+ if (log.isDebugEnabled()) {
+ int part = cctx.affinity().partition(key);
+
log.debug("Will not rebalance key (entry is not new) [cacheName=" + cctx.name() +
", key=" + key + ", part=" + part + ", locId=" + cctx.nodeId() + ']');
+ }
// Key has been rebalanced or retrieved already.
- return;
+ return mappings;
}
}
catch (GridCacheEntryRemovedException ignore) {
@@ -299,6 +304,8 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec
", locId=" + cctx.nodeId() + ']');
}
+ int part = cctx.affinity().partition(key);
+
List<ClusterNode> owners = F.isEmpty(exc) ? top.owners(part, topVer) :
new ArrayList<>(F.view(top.owners(part, topVer), F.notIn(exc)));
@@ -308,7 +315,7 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec
"topVer=" + topVer + ", locId=" + cctx.nodeId() + ']');
// Key is already rebalanced.
- return;
+ return mappings;
}
// Create partition.
@@ -337,9 +344,12 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec
log.debug("Will not rebalance key (no nodes to request from with rebalancing disabled) [key=" +
key + ", part=" + part + ", locId=" + cctx.nodeId() + ']');
- return;
+ return mappings;
}
+ if (mappings == null)
+ mappings = U.newHashMap(keys.size());
+
Collection<KeyCacheObject> mappedKeys = F.addIfAbsent(mappings, pick, F.<KeyCacheObject>newSet());
assert mappedKeys != null;
@@ -357,6 +367,8 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec
log.debug("Will not rebalance key (local partition is not MOVING) [cacheName=" + cctx.name() +
", key=" + key + ", part=" + locPart + ", locId=" + cctx.nodeId() + ']');
}
+
+ return mappings;
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
index 633f237..a6e6c4d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
@@ -53,12 +53,12 @@ import static org.apache.ignite.internal.processors.dr.GridDrType.*;
* and populating local cache.
*/
@SuppressWarnings("NonConstantFieldWithUpperCaseName")
-public class GridDhtPartitionDemandPool<K, V> {
+public class GridDhtPartitionDemandPool {
/** Dummy message to wake up a blocking queue if a node leaves. */
private final SupplyMessage DUMMY_TOP = new SupplyMessage();
/** */
- private final GridCacheContext<K, V> cctx;
+ private final GridCacheContext<?, ?> cctx;
/** */
private final IgniteLogger log;
@@ -99,7 +99,7 @@ public class GridDhtPartitionDemandPool<K, V> {
* @param cctx Cache context.
* @param busyLock Shutdown lock.
*/
- public GridDhtPartitionDemandPool(GridCacheContext<K, V> cctx, ReadWriteLock busyLock) {
+ public GridDhtPartitionDemandPool(GridCacheContext<?, ?> cctx, ReadWriteLock busyLock) {
assert cctx != null;
assert busyLock != null;
@@ -108,9 +108,11 @@ public class GridDhtPartitionDemandPool<K, V> {
log = cctx.logger(getClass());
- poolSize = cctx.rebalanceEnabled() ? cctx.config().getRebalanceThreadPoolSize() : 0;
+ boolean enabled = cctx.rebalanceEnabled() && !cctx.kernalContext().clientNode();
- if (poolSize > 0) {
+ poolSize = enabled ? cctx.config().getRebalanceThreadPoolSize() : 0;
+
+ if (enabled) {
barrier = new CyclicBarrier(poolSize);
dmdWorkers = new ArrayList<>(poolSize);
@@ -327,7 +329,7 @@ public class GridDhtPartitionDemandPool<K, V> {
* @param assigns Assignments.
* @param force {@code True} if dummy reassign.
*/
- void addAssignments(final GridDhtPreloaderAssignments<K, V> assigns, boolean force) {
+ void addAssignments(final GridDhtPreloaderAssignments assigns, boolean force) {
if (log.isDebugEnabled())
log.debug("Adding partition assignments: " + assigns);
@@ -399,7 +401,7 @@ public class GridDhtPartitionDemandPool<K, V> {
private int id;
/** Partition-to-node assignments. */
- private final LinkedBlockingDeque<GridDhtPreloaderAssignments<K, V>> assignQ = new LinkedBlockingDeque<>();
+ private final LinkedBlockingDeque<GridDhtPreloaderAssignments> assignQ = new LinkedBlockingDeque<>();
/** Message queue. */
private final LinkedBlockingDeque<SupplyMessage> msgQ =
@@ -425,7 +427,7 @@ public class GridDhtPartitionDemandPool<K, V> {
/**
* @param assigns Assignments.
*/
- void addAssignments(GridDhtPreloaderAssignments<K, V> assigns) {
+ void addAssignments(GridDhtPreloaderAssignments assigns) {
assert assigns != null;
assignQ.offer(assigns);
@@ -885,7 +887,7 @@ public class GridDhtPartitionDemandPool<K, V> {
}
// Sync up all demand threads at this step.
- GridDhtPreloaderAssignments<K, V> assigns = null;
+ GridDhtPreloaderAssignments assigns = null;
while (assigns == null)
assigns = poll(assignQ, cctx.gridConfig().getNetworkTimeout(), this);
@@ -995,12 +997,12 @@ public class GridDhtPartitionDemandPool<K, V> {
* @param exchFut Exchange future.
* @return Assignments of partitions to nodes.
*/
- GridDhtPreloaderAssignments<K, V> assign(GridDhtPartitionsExchangeFuture exchFut) {
+ GridDhtPreloaderAssignments assign(GridDhtPartitionsExchangeFuture exchFut) {
// No assignments for disabled preloader.
GridDhtPartitionTopology top = cctx.dht().topology();
if (!cctx.rebalanceEnabled())
- return new GridDhtPreloaderAssignments<>(exchFut, top.topologyVersion());
+ return new GridDhtPreloaderAssignments(exchFut, top.topologyVersion());
int partCnt = cctx.affinity().partitions();
@@ -1009,7 +1011,7 @@ public class GridDhtPartitionDemandPool<K, V> {
"Topology version mismatch [exchId=" + exchFut.exchangeId() +
", topVer=" + top.topologyVersion() + ']';
- GridDhtPreloaderAssignments<K, V> assigns = new GridDhtPreloaderAssignments<>(exchFut, top.topologyVersion());
+ GridDhtPreloaderAssignments assigns = new GridDhtPreloaderAssignments(exchFut, top.topologyVersion());
AffinityTopologyVersion topVer = assigns.topologyVersion();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap.java
index facf7e3..faa6cf6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap.java
@@ -237,7 +237,7 @@ public class GridDhtPartitionMap implements Comparable<GridDhtPartitionMap>, Ext
* @return Full string representation.
*/
public String toFullString() {
- return S.toString(GridDhtPartitionMap.class, this, "size", size(), "map", super.toString());
+ return S.toString(GridDhtPartitionMap.class, this, "size", size(), "map", map.toString());
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
index 5d9677d..13cfef3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
@@ -43,9 +43,9 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
/**
* Thread pool for supplying partitions to demanding nodes.
*/
-class GridDhtPartitionSupplyPool<K, V> {
+class GridDhtPartitionSupplyPool {
/** */
- private final GridCacheContext<K, V> cctx;
+ private final GridCacheContext<?, ?> cctx;
/** */
private final IgniteLogger log;
@@ -72,7 +72,7 @@ class GridDhtPartitionSupplyPool<K, V> {
* @param cctx Cache context.
* @param busyLock Shutdown lock.
*/
- GridDhtPartitionSupplyPool(GridCacheContext<K, V> cctx, ReadWriteLock busyLock) {
+ GridDhtPartitionSupplyPool(GridCacheContext<?, ?> cctx, ReadWriteLock busyLock) {
assert cctx != null;
assert busyLock != null;
@@ -83,16 +83,18 @@ class GridDhtPartitionSupplyPool<K, V> {
top = cctx.dht().topology();
- int poolSize = cctx.rebalanceEnabled() ? cctx.config().getRebalanceThreadPoolSize() : 0;
+ if (!cctx.kernalContext().clientNode()) {
+ int poolSize = cctx.rebalanceEnabled() ? cctx.config().getRebalanceThreadPoolSize() : 0;
- for (int i = 0; i < poolSize; i++)
- workers.add(new SupplyWorker());
+ for (int i = 0; i < poolSize; i++)
+ workers.add(new SupplyWorker());
- cctx.io().addHandler(cctx.cacheId(), GridDhtPartitionDemandMessage.class, new CI2<UUID, GridDhtPartitionDemandMessage>() {
- @Override public void apply(UUID id, GridDhtPartitionDemandMessage m) {
- processDemandMessage(id, m);
- }
- });
+ cctx.io().addHandler(cctx.cacheId(), GridDhtPartitionDemandMessage.class, new CI2<UUID, GridDhtPartitionDemandMessage>() {
+ @Override public void apply(UUID id, GridDhtPartitionDemandMessage m) {
+ processDemandMessage(id, m);
+ }
+ });
+ }
depEnabled = cctx.gridDeploy().enabled();
}
@@ -248,11 +250,6 @@ class GridDhtPartitionSupplyPool<K, V> {
boolean ack = false;
try {
- // Partition map exchange is finished which means that all near transactions with given
- // topology version are committed. We can wait for local locks here as it will not take
- // much time.
- cctx.mvcc().finishLocks(d.topologyVersion()).get();
-
for (int part : d.partitions()) {
GridDhtLocalPartition loc = top.localPartition(part, d.topologyVersion(), false);