You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/11/10 13:45:20 UTC
[2/2] ignite git commit: Single update POC for atomic cache.
Single update POC for atomic cache.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1d310860
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1d310860
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1d310860
Branch: refs/heads/ignite-1843
Commit: 1d31086038a0d8359f4bb1304a8ad8b6d9370a69
Parents: 77a3f64
Author: sboikov <sb...@gridgain.com>
Authored: Tue Nov 10 15:45:02 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Nov 10 15:45:02 2015 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheAtomicFuture.java | 11 +
.../processors/cache/GridCacheMessage.java | 18 +
.../dht/atomic/GridDhtAtomicCache.java | 460 +++++++-
.../atomic/GridDhtAtomicSingleUpdateFuture.java | 401 +++++++
.../dht/atomic/GridDhtAtomicUpdateFuture.java | 13 +-
.../dht/atomic/GridDhtAtomicUpdateRequest.java | 177 ++-
.../GridNearAtomicSingleUpdateFuture.java | 1093 ++++++++++++++++++
.../dht/atomic/GridNearAtomicUpdateFuture.java | 20 +-
.../dht/atomic/GridNearAtomicUpdateRequest.java | 187 ++-
.../distributed/near/GridNearAtomicCache.java | 78 +-
10 files changed, 2326 insertions(+), 132 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/1d310860/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java
index be35c5c..15f004a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java
@@ -18,8 +18,11 @@
package org.apache.ignite.internal.processors.cache;
import java.util.Collection;
+import java.util.UUID;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateResponse;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse;
/**
* Update future for atomic cache.
@@ -37,4 +40,12 @@ public interface GridCacheAtomicFuture<R> extends GridCacheFuture<R> {
* @return Future keys.
*/
public Collection<?> keys();
+
+ public void map();
+
+ public void onResult(UUID nodeId, GridDhtAtomicUpdateResponse res);
+
+ public void onResult(UUID nodeId, GridNearAtomicUpdateResponse res);
+
+ public void onResult(UUID nodeId);
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/1d310860/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
index bdd2118..45a4b87 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
@@ -511,6 +511,15 @@ public abstract class GridCacheMessage implements Message {
}
}
+ protected final void prepareMarshalCacheObject(@Nullable CacheObject obj, GridCacheContext ctx) throws IgniteCheckedException {
+ if (obj != null) {
+ obj.prepareMarshal(ctx.cacheObjectContext());
+
+ if (addDepInfo)
+ prepareObject(obj.value(ctx.cacheObjectContext(), false), ctx);
+ }
+ }
+
/**
* @param col Collection.
* @param ctx Cache context.
@@ -556,6 +565,15 @@ public abstract class GridCacheMessage implements Message {
}
}
+ protected final void finishUnmarshalCacheObject(@Nullable CacheObject obj,
+ GridCacheContext ctx,
+ ClassLoader ldr)
+ throws IgniteCheckedException
+ {
+ if (obj != null)
+ obj.finishUnmarshal(ctx.cacheObjectContext(), ldr);
+ }
+
/**
* @param col Collection.
* @param ctx Context.
http://git-wip-us.apache.org/repos/asf/ignite/blob/1d310860/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 7f9edb2..da5cb6a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -49,6 +49,7 @@ import org.apache.ignite.internal.processors.cache.CacheMetricsImpl;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.CacheOperationContext;
import org.apache.ignite.internal.processors.cache.CacheStorePartialUpdateException;
+import org.apache.ignite.internal.processors.cache.GridCacheAtomicFuture;
import org.apache.ignite.internal.processors.cache.GridCacheConcurrentMap;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
@@ -374,10 +375,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
@SuppressWarnings("unchecked")
@Override public IgniteInternalFuture<Boolean> putAsync0(K key, V val, @Nullable CacheEntryPredicate... filter) {
A.notNull(key, "key");
+ A.notNull(val, "val");
- return updateAllAsync0(F0.asMap(key, val),
- null,
- null,
+ return snigleUpdateAllAsync0(key,
+ val,
null,
null,
false,
@@ -776,6 +777,57 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
true);
}
+ private IgniteInternalFuture snigleUpdateAllAsync0(
+ K key,
+ V val,
+ @Nullable final Map<? extends K, ? extends EntryProcessor> invokeMap,
+ @Nullable Object[] invokeArgs,
+ final boolean retval,
+ final boolean rawRetval,
+ @Nullable final CacheEntryPredicate[] filter,
+ final boolean waitTopFut
+ ) {
+ assert ctx.updatesAllowed();
+
+ if (map != null && keyCheck)
+ validateCacheKeys(map.keySet());
+
+ ctx.checkSecurity(SecurityPermission.CACHE_PUT);
+
+ CacheOperationContext opCtx = ctx.operationContextPerCall();
+
+ UUID subjId = ctx.subjectIdPerCall(null, opCtx);
+
+ int taskNameHash = ctx.kernalContext().job().currentTaskNameHash();
+
+ final GridNearAtomicSingleUpdateFuture updateFut = new GridNearAtomicSingleUpdateFuture(
+ ctx,
+ this,
+ ctx.config().getWriteSynchronizationMode(),
+ invokeMap != null ? TRANSFORM : UPDATE,
+ key,
+ val,
+ invokeArgs,
+ retval,
+ rawRetval,
+ opCtx != null ? opCtx.expiry() : null,
+ filter,
+ subjId,
+ taskNameHash,
+ opCtx != null && opCtx.skipStore(),
+ opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES,
+ waitTopFut);
+
+ return asyncOp(new CO<IgniteInternalFuture<Object>>() {
+ @Override public IgniteInternalFuture<Object> apply() {
+ updateFut.map();
+
+ return updateFut;
+ }
+ });
+ }
+
+
/**
* Entry point for all public API put/transform methods.
*
@@ -1054,7 +1106,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
final GridNearAtomicUpdateRequest req,
final CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb
) {
- IgniteInternalFuture<Object> forceFut = preldr.request(req.keys(), req.topologyVersion());
+ IgniteInternalFuture<Object> forceFut = preldr.request(req.singleUpdate() ? Collections.singleton(req.singleKey()) : req.keys(), req.topologyVersion());
if (forceFut.isDone())
updateAllAsyncInternal0(nodeId, req, completionCb);
@@ -1082,11 +1134,11 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(), nodeId, req.futureVersion(),
ctx.deploymentEnabled());
- List<KeyCacheObject> keys = req.keys();
+ boolean single = req.singleUpdate();
- assert !req.returnValue() || (req.operation() == TRANSFORM || keys.size() == 1);
+ assert !req.returnValue() || (req.operation() == TRANSFORM || single || req.keys().size() == 1);
- GridDhtAtomicUpdateFuture dhtFut = null;
+ GridCacheAtomicFuture dhtFut = null;
boolean remap = false;
@@ -1097,7 +1149,13 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
try {
// If batch store update is enabled, we need to lock all entries.
// First, need to acquire locks on cache entries, then check filter.
- List<GridDhtCacheEntry> locked = lockEntries(keys, req.topologyVersion());
+ List<GridDhtCacheEntry> locked = null;
+ GridDhtCacheEntry singleLocked = null;
+
+ if (req.singleUpdate())
+ singleLocked = lockEntry(req.singleKey(), req.topologyVersion());
+ else
+ locked = lockEntries(req.keys(), req.topologyVersion());
Collection<IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion>> deleted = null;
@@ -1106,8 +1164,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
try {
if (topology().stopping()) {
- res.addFailedKeys(keys, new IgniteCheckedException("Failed to perform cache operation " +
- "(cache is stopped): " + name()));
+ res.addFailedKeys(single ? Collections.singleton(req.singleKey()) : req.keys(),
+ new IgniteCheckedException("Failed to perform cache operation " +
+ "(cache is stopped): " + name()));
completionCb.apply(req, res);
@@ -1152,7 +1211,24 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
GridCacheReturn retVal = null;
- if (keys.size() > 1 && // Several keys ...
+ if (single) {
+ UpdateSingleResult updRes = updateSingleEntry(node,
+ hasNear,
+ req,
+ res,
+ singleLocked,
+ ver,
+ (GridDhtAtomicSingleUpdateFuture)dhtFut,
+ completionCb,
+ ctx.isDrEnabled(),
+ taskName,
+ expiry);
+
+ retVal = updRes.returnValue();
+ deleted = updRes.deleted();
+ dhtFut = updRes.dhtFuture();
+ }
+ else if (req.keys().size() > 1 && // Several keys ...
writeThrough() && !req.skipStore() && // and store is enabled ...
!ctx.store().isLocal() && // and this is not local store ...
!ctx.dr().receiveEnabled() // and no DR.
@@ -1164,7 +1240,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
res,
locked,
ver,
- dhtFut,
+ (GridDhtAtomicUpdateFuture)dhtFut,
completionCb,
ctx.isDrEnabled(),
taskName,
@@ -1183,7 +1259,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
res,
locked,
ver,
- dhtFut,
+ (GridDhtAtomicUpdateFuture)dhtFut,
completionCb,
ctx.isDrEnabled(),
taskName,
@@ -1216,9 +1292,16 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
e.printStackTrace();
}
finally {
- if (locked != null)
- unlockEntries(locked, req.topologyVersion());
+ if (single) {
+ if (singleLocked != null)
+ unlockSingleEntry(singleLocked, req.topologyVersion());
+ }
+ else {
+ if (locked != null)
+ unlockEntries(locked, req.topologyVersion());
+ }
+ // TODO
// Enqueue if necessary after locks release.
if (deleted != null) {
assert !deleted.isEmpty();
@@ -1242,7 +1325,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
// an attempt to use cleaned resources.
U.error(log, "Unexpected exception during cache update", e);
- res.addFailedKeys(keys, e);
+ res.addFailedKeys(single ? Collections.singleton(req.singleKey()) : req.keys(), e);
completionCb.apply(req, res);
@@ -1252,7 +1335,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
if (remap) {
assert dhtFut == null;
- res.remapKeys(req.keys());
+ res.remapKeys(single ? Collections.singletonList(req.singleKey()) : req.keys());
completionCb.apply(req, res);
}
@@ -1777,7 +1860,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
taskName);
if (dhtFut == null && !F.isEmpty(filteredReaders)) {
- dhtFut = createDhtFuture(ver, req, res, completionCb, true);
+ dhtFut = (GridDhtAtomicUpdateFuture)createDhtFuture(ver, req, res, completionCb, true);
readersOnly = true;
}
@@ -1889,6 +1972,187 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
/**
+ * Updates locked entries one-by-one.
+ *
+ * @param node Originating node.
+ * @param hasNear {@code True} if originating node has near cache.
+ * @param req Update request.
+ * @param res Update response.
+ * @param entry Locked entry.
+ * @param ver Assigned update version.
+ * @param dhtFut Optional DHT future.
+ * @param completionCb Completion callback to invoke when DHT future is completed.
+ * @param replicate Whether DR is enabled for that cache.
+ * @param taskName Task name.
+ * @param expiry Expiry policy.
+ * @return Return value.
+ * @throws GridCacheEntryRemovedException Should be never thrown.
+ */
+ private UpdateSingleResult updateSingleEntry(
+ ClusterNode node,
+ boolean hasNear,
+ GridNearAtomicUpdateRequest req,
+ GridNearAtomicUpdateResponse res,
+ GridDhtCacheEntry entry,
+ GridCacheVersion ver,
+ @Nullable GridDhtAtomicSingleUpdateFuture dhtFut,
+ CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
+ boolean replicate,
+ String taskName,
+ @Nullable IgniteCacheExpiryPolicy expiry
+ ) throws GridCacheEntryRemovedException {
+ GridCacheReturn retVal = null;
+ Collection<IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion>> deleted = null;
+
+ KeyCacheObject k = req.singleKey();
+
+ AffinityTopologyVersion topVer = req.topologyVersion();
+
+ boolean checkReaders = hasNear || ctx.discovery().hasNearCache(name(), topVer);
+
+ boolean readersOnly = false;
+
+ boolean intercept = ctx.config().getInterceptor() != null;
+
+ GridCacheOperation op = req.operation();
+
+ // We are holding java-level locks on entries at this point.
+ // No GridCacheEntryRemovedException can be thrown.
+ try {
+ if (entry == null)
+ return new UpdateSingleResult(retVal, deleted, dhtFut);;
+
+ boolean primary = !req.fastMap() || ctx.affinity().primary(ctx.localNode(), entry.key(),
+ req.topologyVersion());
+
+ Object writeVal = op == TRANSFORM ? req.singleEntryProcessor() : req.singleWriteValue();
+
+ Collection<UUID> readers = null;
+ Collection<UUID> filteredReaders = null;
+
+ if (checkReaders) {
+ readers = entry.readers();
+ filteredReaders = F.view(entry.readers(), F.notEqualTo(node.id()));
+ }
+
+ GridCacheUpdateAtomicResult updRes = entry.innerUpdate(
+ ver,
+ node.id(),
+ locNodeId,
+ op,
+ writeVal,
+ req.invokeArguments(),
+ primary && writeThrough() && !req.skipStore(),
+ !req.skipStore(),
+ req.returnValue(),
+ expiry,
+ true,
+ true,
+ primary,
+ ctx.config().getAtomicWriteOrderMode() == CLOCK, // Check version in CLOCK mode on primary node.
+ topVer,
+ req.filter(),
+ replicate ? primary ? DR_PRIMARY : DR_BACKUP : DR_NONE,
+ CU.TTL_NOT_CHANGED,
+ CU.EXPIRE_TIME_CALCULATE,
+ null,
+ true,
+ intercept,
+ req.subjectId(),
+ taskName);
+
+ if (dhtFut == null && !F.isEmpty(filteredReaders)) {
+ dhtFut = (GridDhtAtomicSingleUpdateFuture)createDhtFuture(ver, req, res, completionCb, true);
+
+ readersOnly = true;
+ }
+
+ if (dhtFut != null) {
+ if (updRes.sendToDht()) { // Send to backups even in case of remove-remove scenarios.
+ assert updRes.conflictResolveResult() == null : updRes;
+
+ if (!readersOnly) {
+ dhtFut.addWriteEntry(entry,
+ updRes.newValue(),
+ updRes.newTtl());
+ }
+
+ if (!F.isEmpty(filteredReaders))
+ dhtFut.addNearWriteEntries(filteredReaders,
+ entry,
+ updRes.newValue(),
+ updRes.newTtl());
+ }
+ else {
+ if (log.isDebugEnabled())
+ log.debug("Entry did not pass the filter or conflict resolution (will skip write) " +
+ "[entry=" + entry + ", filter=" + Arrays.toString(req.filter()) + ']');
+ }
+ }
+
+ if (hasNear) {
+ if (primary && updRes.sendToDht()) {
+ if (!ctx.affinity().belongs(node, entry.partition(), topVer)) {
+ // If put the same value as in request then do not need to send it back.
+ if (op == TRANSFORM || writeVal != updRes.newValue()) {
+ res.addNearValue(0,
+ updRes.newValue(),
+ updRes.newTtl(),
+ updRes.conflictExpireTime());
+ }
+ else
+ res.addNearTtl(0, updRes.newTtl(), updRes.conflictExpireTime());
+
+ if (updRes.newValue() != null) {
+ IgniteInternalFuture<Boolean> f = entry.addReader(node.id(), req.messageId(), topVer);
+
+ assert f == null : f;
+ }
+ }
+ else if (F.contains(readers, node.id())) // Reader became primary or backup.
+ entry.removeReader(node.id(), req.messageId());
+ else
+ res.addSkippedIndex(0);
+ }
+ else
+ res.addSkippedIndex(0);
+ }
+
+ if (updRes.removeVersion() != null)
+ deleted = Collections.singleton(F.t(entry, updRes.removeVersion()));
+
+ if (op == TRANSFORM) {
+ assert !req.returnValue();
+
+ IgniteBiTuple<Object, Exception> compRes = updRes.computedResult();
+
+ if (compRes != null && (compRes.get1() != null || compRes.get2() != null)) {
+ retVal = new GridCacheReturn(node.isLocal());
+
+ retVal.addEntryProcessResult(ctx,
+ k,
+ null,
+ compRes.get1(),
+ compRes.get2());
+ }
+ }
+ else {
+ CacheObject ret = updRes.oldValue();
+
+ retVal = new GridCacheReturn(ctx,
+ node.isLocal(),
+ req.returnValue() ? ret : null,
+ updRes.success());
+ }
+ }
+ catch (IgniteCheckedException e) {
+ res.addFailedKey(k, e);
+ }
+
+ return new UpdateSingleResult(retVal, deleted, dhtFut);
+ }
+
+ /**
* @param hasNear {@code True} if originating node has near cache.
* @param firstEntryIdx Index of the first entry in the request keys collection.
* @param entries Entries to update.
@@ -2067,7 +2331,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
batchRes.addDeleted(entry, updRes, entries);
if (dhtFut == null && !F.isEmpty(filteredReaders)) {
- dhtFut = createDhtFuture(ver, req, res, completionCb, true);
+ dhtFut = (GridDhtAtomicUpdateFuture)createDhtFuture(ver, req, res, completionCb, true);
batchRes.readersOnly(true);
}
@@ -2139,6 +2403,29 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
return dhtFut;
}
+ private GridDhtCacheEntry lockEntry(KeyCacheObject key, AffinityTopologyVersion topVer)
+ throws GridDhtInvalidPartitionException {
+ while (true) {
+ try {
+ GridDhtCacheEntry entry = entryExx(key, topVer);
+
+ UNSAFE.monitorEnter(entry);
+
+ if (entry.obsolete())
+ UNSAFE.monitorExit(entry);
+ else
+ return entry;
+ }
+ catch (GridDhtInvalidPartitionException e) {
+ // Ignore invalid partition exception in CLOCK ordering mode.
+ if (ctx.config().getAtomicWriteOrderMode() == CLOCK)
+ return null;
+ else
+ throw e;
+ }
+ }
+ }
+
/**
* Acquires java-level locks on cache entries. Returns collection of locked entries.
*
@@ -2226,6 +2513,30 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
}
+ private void unlockSingleEntry(GridDhtCacheEntry entry, AffinityTopologyVersion topVer) {
+ // Process deleted entries before locks release.
+ assert ctx.deferredDelete(this) : this;
+
+ boolean skip = false;
+
+ try {
+ if (entry.deleted())
+ skip = true;
+ }
+ finally {
+ UNSAFE.monitorExit(entry);
+ }
+
+ entry.onUnlock();
+
+ if (skip)
+ return;
+
+ // Must touch all entries since update may have deleted entries.
+ // Eviction manager will remove empty entries.
+ ctx.evicts().touch(entry, topVer);
+ }
+
/**
* Releases java-level locks on cache entries.
*
@@ -2376,7 +2687,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
* @param force If {@code true} then creates future without optimizations checks.
* @return Backup update future or {@code null} if there are no backups.
*/
- @Nullable private GridDhtAtomicUpdateFuture createDhtFuture(
+ @Nullable private GridCacheAtomicFuture createDhtFuture(
GridCacheVersion writeVer,
GridNearAtomicUpdateRequest updateReq,
GridNearAtomicUpdateResponse updateRes,
@@ -2404,7 +2715,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
}
- return new GridDhtAtomicUpdateFuture(ctx, completionCb, writeVer, updateReq, updateRes);
+ return updateReq.singleUpdate() ?
+ new GridDhtAtomicSingleUpdateFuture(ctx, completionCb, writeVer, updateReq, updateRes) :
+ new GridDhtAtomicUpdateFuture(ctx, completionCb, writeVer, updateReq, updateRes);
}
/**
@@ -2452,7 +2765,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
res.nodeId(ctx.localNodeId());
- GridNearAtomicUpdateFuture fut = (GridNearAtomicUpdateFuture)ctx.mvcc().atomicFuture(res.futureVersion());
+ GridCacheAtomicFuture fut = (GridCacheAtomicFuture)ctx.mvcc().atomicFuture(res.futureVersion());
if (fut != null)
fut.onResult(nodeId, res);
@@ -2481,8 +2794,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
String taskName = ctx.kernalContext().task().resolveTaskName(req.taskNameHash());
- for (int i = 0; i < req.size(); i++) {
- KeyCacheObject key = req.key(i);
+ if (req.singleUpdate()) {
+ KeyCacheObject key = req.singleKey();
try {
while (true) {
@@ -2491,22 +2804,20 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
try {
entry = entryExx(key);
- CacheObject val = req.value(i);
- EntryProcessor<Object, Object, Object> entryProcessor = req.entryProcessor(i);
+ CacheObject val = req.singleValue();
- GridCacheOperation op = entryProcessor != null ? TRANSFORM :
- (val != null) ? UPDATE : DELETE;
+ GridCacheOperation op = (val != null) ? UPDATE : DELETE;
- long ttl = req.ttl(i);
- long expireTime = req.conflictExpireTime(i);
+ long ttl = req.ttl(0);
+ long expireTime = req.conflictExpireTime(0);
GridCacheUpdateAtomicResult updRes = entry.innerUpdate(
ver,
nodeId,
nodeId,
op,
- op == TRANSFORM ? entryProcessor : val,
- op == TRANSFORM ? req.invokeArguments() : null,
+ val,
+ null,
/*write-through*/false,
/*read-through*/false,
/*retval*/false,
@@ -2520,7 +2831,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
replicate ? DR_BACKUP : DR_NONE,
ttl,
expireTime,
- req.conflictVersion(i),
+ null,
false,
intercept,
req.subjectId(),
@@ -2552,6 +2863,79 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
res.addFailedKey(key, new IgniteCheckedException("Failed to update key on backup node: " + key, e));
}
}
+ else {
+ for (int i = 0; i < req.size(); i++) {
+ KeyCacheObject key = req.key(i);
+
+ try {
+ while (true) {
+ GridDhtCacheEntry entry = null;
+
+ try {
+ entry = entryExx(key);
+
+ CacheObject val = req.value(i);
+ EntryProcessor<Object, Object, Object> entryProcessor = req.entryProcessor(i);
+
+ GridCacheOperation op = entryProcessor != null ? TRANSFORM :
+ (val != null) ? UPDATE : DELETE;
+
+ long ttl = req.ttl(i);
+ long expireTime = req.conflictExpireTime(i);
+
+ GridCacheUpdateAtomicResult updRes = entry.innerUpdate(
+ ver,
+ nodeId,
+ nodeId,
+ op,
+ op == TRANSFORM ? entryProcessor : val,
+ op == TRANSFORM ? req.invokeArguments() : null,
+ /*write-through*/false,
+ /*read-through*/false,
+ /*retval*/false,
+ /*expiry policy*/null,
+ /*event*/true,
+ /*metrics*/true,
+ /*primary*/false,
+ /*check version*/!req.forceTransformBackups(),
+ req.topologyVersion(),
+ CU.empty0(),
+ replicate ? DR_BACKUP : DR_NONE,
+ ttl,
+ expireTime,
+ req.conflictVersion(i),
+ false,
+ intercept,
+ req.subjectId(),
+ taskName);
+
+ if (updRes.removeVersion() != null)
+ ctx.onDeferredDelete(entry, updRes.removeVersion());
+
+ entry.onUnlock();
+
+ break; // While.
+ }
+ catch (GridCacheEntryRemovedException ignored) {
+ if (log.isDebugEnabled())
+ log.debug("Got removed entry while updating backup value (will retry): " + key);
+
+ entry = null;
+ }
+ finally {
+ if (entry != null)
+ ctx.evicts().touch(entry, req.topologyVersion());
+ }
+ }
+ }
+ catch (GridDhtInvalidPartitionException ignored) {
+ // Ignore.
+ }
+ catch (IgniteCheckedException e) {
+ res.addFailedKey(key, new IgniteCheckedException("Failed to update key on backup node: " + key, e));
+ }
+ }
+ }
if (isNearEnabled(cacheCfg))
((GridNearAtomicCache<K, V>)near()).processDhtAtomicUpdateRequest(nodeId, req, res);
@@ -2612,7 +2996,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
if (log.isDebugEnabled())
log.debug("Processing dht atomic update response [nodeId=" + nodeId + ", res=" + res + ']');
- GridDhtAtomicUpdateFuture updateFut = (GridDhtAtomicUpdateFuture)ctx.mvcc().
+ GridCacheAtomicFuture updateFut = (GridCacheAtomicFuture)ctx.mvcc().
atomicFuture(res.futureVersion());
if (updateFut != null)
@@ -2632,7 +3016,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
log.debug("Processing deferred dht atomic update response [nodeId=" + nodeId + ", res=" + res + ']');
for (GridCacheVersion ver : res.futureVersions()) {
- GridDhtAtomicUpdateFuture updateFut = (GridDhtAtomicUpdateFuture)ctx.mvcc().atomicFuture(ver);
+ GridCacheAtomicFuture updateFut = (GridCacheAtomicFuture)ctx.mvcc().atomicFuture(ver);
if (updateFut != null)
updateFut.onResult(nodeId);
@@ -2676,7 +3060,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
private final Collection<IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion>> deleted;
/** */
- private final GridDhtAtomicUpdateFuture dhtFut;
+ private final GridCacheAtomicFuture dhtFut;
/**
* @param retVal Return value.
@@ -2685,7 +3069,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
*/
private UpdateSingleResult(GridCacheReturn retVal,
Collection<IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion>> deleted,
- GridDhtAtomicUpdateFuture dhtFut) {
+ GridCacheAtomicFuture dhtFut) {
this.retVal = retVal;
this.deleted = deleted;
this.dhtFut = dhtFut;
@@ -2708,7 +3092,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
/**
* @return DHT future.
*/
- public GridDhtAtomicUpdateFuture dhtFuture() {
+ public GridCacheAtomicFuture dhtFuture() {
return dhtFut;
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1d310860/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
new file mode 100644
index 0000000..f31dda2
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.GridCacheAtomicFuture;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.CI2;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.Nullable;
+import org.jsr166.ConcurrentHashMap8;
+
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ * DHT atomic cache backup update future.
+ */
+public class GridDhtAtomicSingleUpdateFuture extends GridFutureAdapter<Void>
+ implements GridCacheAtomicFuture<Void> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Logger reference. */
+ private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
+
+ /** Logger. */
+ protected static IgniteLogger log;
+
+ /** Cache context. */
+ private GridCacheContext cctx;
+
+ /** Future version. */
+ private GridCacheVersion futVer;
+
+ /** Write version. */
+ private GridCacheVersion writeVer;
+
+ /** Completion callback. */
+ @GridToStringExclude
+ private CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb;
+
+ /** Mappings. */
+ @GridToStringInclude
+ private ConcurrentMap<UUID, GridDhtAtomicUpdateRequest> mappings = new ConcurrentHashMap8<>();
+
+ /** Entries with readers. */
+ private GridDhtCacheEntry nearReaderEntry;
+
+ /** Update request. */
+ private GridNearAtomicUpdateRequest updateReq;
+
+ /** Update response. */
+ private GridNearAtomicUpdateResponse updateRes;
+
+ /** */
+ private boolean waitForExchange;
+
+ /**
+ * @param cctx Cache context.
+ * @param completionCb Callback to invoke when future is completed.
+ * @param writeVer Write version.
+ * @param updateReq Update request.
+ * @param updateRes Update response.
+ */
+ public GridDhtAtomicSingleUpdateFuture(
+ GridCacheContext cctx,
+ CI2<GridNearAtomicUpdateRequest,
+ GridNearAtomicUpdateResponse> completionCb,
+ GridCacheVersion writeVer,
+ GridNearAtomicUpdateRequest updateReq,
+ GridNearAtomicUpdateResponse updateRes
+ ) {
+ assert updateReq.singleUpdate() : updateReq;
+
+ this.cctx = cctx;
+ this.writeVer = writeVer;
+
+ futVer = cctx.versions().next(updateReq.topologyVersion());
+ this.updateReq = updateReq;
+ this.completionCb = completionCb;
+ this.updateRes = updateRes;
+
+ if (log == null)
+ log = U.logger(cctx.kernalContext(), logRef, GridDhtAtomicSingleUpdateFuture.class);
+
+ boolean topLocked = updateReq.topologyLocked() || (updateReq.fastMap() && !updateReq.clientRequest());
+
+ waitForExchange = !topLocked;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteUuid futureId() {
+ return futVer.asGridUuid();
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridCacheVersion version() {
+ return futVer;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<? extends ClusterNode> nodes() {
+ return F.view(F.viewReadOnly(mappings.keySet(), U.id2Node(cctx.kernalContext())), F.notNull());
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean onNodeLeft(UUID nodeId) {
+ if (log.isDebugEnabled())
+ log.debug("Processing node leave event [fut=" + this + ", nodeId=" + nodeId + ']');
+
+ GridDhtAtomicUpdateRequest req = mappings.get(nodeId);
+
+ if (req != null) {
+ // Remove only after added keys to failed set.
+ mappings.remove(nodeId);
+
+ checkComplete();
+
+ return true;
+ }
+
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean trackable() {
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void markNotTrackable() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteInternalFuture<Void> completeFuture(AffinityTopologyVersion topVer) {
+ if (waitForExchange && updateReq.topologyVersion().compareTo(topVer) < 0)
+ return this;
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<KeyCacheObject> keys() {
+ return Collections.singleton(updateReq.singleKey());
+ }
+
+ /**
+ * @param entry Entry to map.
+ * @param val Value to write.
+ * @param ttl TTL (optional).
+ */
+ public void addWriteEntry(GridDhtCacheEntry entry,
+ @Nullable CacheObject val,
+ long ttl) {
+ AffinityTopologyVersion topVer = updateReq.topologyVersion();
+
+ Collection<ClusterNode> dhtNodes = cctx.dht().topology().nodes(entry.partition(), topVer);
+
+ if (log.isDebugEnabled())
+ log.debug("Mapping entry to DHT nodes [nodes=" + U.nodeIds(dhtNodes) + ", entry=" + entry + ']');
+
+ CacheWriteSynchronizationMode syncMode = updateReq.writeSynchronizationMode();
+
+ for (ClusterNode node : dhtNodes) {
+ UUID nodeId = node.id();
+
+ if (!nodeId.equals(cctx.localNodeId())) {
+ GridDhtAtomicUpdateRequest updateReq = new GridDhtAtomicUpdateRequest(
+ cctx.cacheId(),
+ nodeId,
+ futVer,
+ writeVer,
+ syncMode,
+ topVer,
+ false,
+ this.updateReq.subjectId(),
+ this.updateReq.taskNameHash(),
+ null,
+ cctx.deploymentEnabled(),
+ true);
+
+ mappings.put(nodeId, updateReq);
+
+ updateReq.addSingleWriteValue(entry.key(),
+ val,
+ ttl);
+ }
+ }
+ }
+
+ /**
+ * @param readers Entry readers.
+ * @param entry Entry.
+ * @param val Value.
+ * @param ttl TTL for near cache update (optional).
+ */
+ public void addNearWriteEntries(Iterable<UUID> readers,
+ GridDhtCacheEntry entry,
+ @Nullable CacheObject val,
+ long ttl) {
+ CacheWriteSynchronizationMode syncMode = updateReq.writeSynchronizationMode();
+
+ AffinityTopologyVersion topVer = updateReq.topologyVersion();
+
+ for (UUID nodeId : readers) {
+ GridDhtAtomicUpdateRequest updateReq = mappings.get(nodeId);
+
+
+ ClusterNode node = cctx.discovery().node(nodeId);
+
+ // Node left the grid.
+ if (node == null)
+ continue;
+
+ if (updateReq == null) {
+ updateReq = new GridDhtAtomicUpdateRequest(
+ cctx.cacheId(),
+ nodeId,
+ futVer,
+ writeVer,
+ syncMode,
+ topVer,
+ false,
+ this.updateReq.subjectId(),
+ this.updateReq.taskNameHash(),
+ null,
+ cctx.deploymentEnabled(),
+ false);
+
+ mappings.put(nodeId, updateReq);
+ }
+
+ nearReaderEntry = entry;
+
+ updateReq.addNearWriteValue(entry.key(),
+ val,
+ null,
+ ttl,
+ -1L);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean onDone(@Nullable Void res, @Nullable Throwable err) {
+ if (super.onDone(res, err)) {
+ cctx.mvcc().removeAtomicFuture(version());
+
+ if (err != null)
+ updateRes.addFailedKey(updateReq.singleKey(), err);
+
+ if (updateReq.writeSynchronizationMode() == FULL_SYNC)
+ completionCb.apply(updateReq, updateRes);
+
+ return true;
+ }
+
+ return false;
+ }
+
+ /**
+ * Sends requests to remote nodes.
+ */
+ public void map() {
+ if (!mappings.isEmpty()) {
+ for (GridDhtAtomicUpdateRequest req : mappings.values()) {
+ try {
+ if (log.isDebugEnabled())
+ log.debug("Sending DHT atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']');
+
+ cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
+ }
+ catch (ClusterTopologyCheckedException ignored) {
+ U.warn(log, "Failed to send update request to backup node because it left grid: " +
+ req.nodeId());
+
+ mappings.remove(req.nodeId());
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to send update request to backup node (did node leave the grid?): "
+ + req.nodeId(), e);
+
+ mappings.remove(req.nodeId());
+ }
+ }
+ }
+
+ checkComplete();
+
+ // Send response right away if no ACKs from backup is required.
+ // Backups will send ACKs anyway, future will be completed after all backups have replied.
+ if (updateReq.writeSynchronizationMode() != FULL_SYNC)
+ completionCb.apply(updateReq, updateRes);
+ }
+
+ /**
+ * Callback for backup update response.
+ *
+ * @param nodeId Backup node ID.
+ * @param updateRes Update response.
+ */
+ public void onResult(UUID nodeId, GridDhtAtomicUpdateResponse updateRes) {
+ if (log.isDebugEnabled())
+ log.debug("Received DHT atomic update future result [nodeId=" + nodeId + ", updateRes=" + updateRes + ']');
+
+ if (updateRes.error() != null)
+ this.updateRes.addFailedKeys(updateRes.failedKeys(), updateRes.error());
+
+ if (!F.isEmpty(updateRes.nearEvicted())) {
+ assert nearReaderEntry != null;
+
+ try {
+ nearReaderEntry.removeReader(nodeId, updateRes.messageId());
+ }
+ catch (GridCacheEntryRemovedException e) {
+ if (log.isDebugEnabled())
+ log.debug("Entry with evicted reader was removed [entry=" + nearReaderEntry + ", err=" + e + ']');
+ }
+ }
+
+ mappings.remove(nodeId);
+
+ checkComplete();
+ }
+
+ @Override
+ public void onResult(UUID nodeId, GridNearAtomicUpdateResponse res) {
+ assert false;
+
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Deferred update response.
+ *
+ * @param nodeId Backup node ID.
+ */
+ public void onResult(UUID nodeId) {
+ if (log.isDebugEnabled())
+ log.debug("Received deferred DHT atomic update future result [nodeId=" + nodeId + ']');
+
+ mappings.remove(nodeId);
+
+ checkComplete();
+ }
+
+ /**
+ * Checks if all required responses are received.
+ */
+ private void checkComplete() {
+ // Always wait for replies from all backups.
+ if (mappings.isEmpty()) {
+ if (log.isDebugEnabled())
+ log.debug("Completing DHT atomic update future: " + this);
+
+ onDone();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridDhtAtomicSingleUpdateFuture.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1d310860/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index 4ace5c4..f0635e1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -234,7 +234,8 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
this.updateReq.subjectId(),
this.updateReq.taskNameHash(),
forceTransformBackups ? this.updateReq.invokeArguments() : null,
- cctx.deploymentEnabled());
+ cctx.deploymentEnabled(),
+ false);
mappings.put(nodeId, updateReq);
}
@@ -290,7 +291,8 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
this.updateReq.subjectId(),
this.updateReq.taskNameHash(),
forceTransformBackups ? this.updateReq.invokeArguments() : null,
- cctx.deploymentEnabled());
+ cctx.deploymentEnabled(),
+ false);
mappings.put(nodeId, updateReq);
}
@@ -394,6 +396,13 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
checkComplete();
}
+ @Override
+ public void onResult(UUID nodeId, GridNearAtomicUpdateResponse res) {
+ assert false;
+
+ throw new UnsupportedOperationException();
+ }
+
/**
* Deferred update response.
*
http://git-wip-us.apache.org/repos/asf/ignite/blob/1d310860/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
index e55cac9..242c373 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
@@ -78,6 +78,10 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
@GridDirectCollection(CacheObject.class)
private List<CacheObject> vals;
+ private KeyCacheObject singleKey;
+
+ private CacheObject singleVal;
+
/** Conflict versions. */
@GridDirectCollection(GridCacheVersion.class)
private List<GridCacheVersion> conflictVers;
@@ -172,7 +176,8 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
UUID subjId,
int taskNameHash,
Object[] invokeArgs,
- boolean addDepInfo
+ boolean addDepInfo,
+ boolean single
) {
assert invokeArgs == null || forceTransformBackups;
@@ -188,16 +193,20 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
this.invokeArgs = invokeArgs;
this.addDepInfo = addDepInfo;
- keys = new ArrayList<>();
+ if (!single) {
+ keys = new ArrayList<>();
- if (forceTransformBackups) {
- entryProcessors = new ArrayList<>();
- entryProcessorsBytes = new ArrayList<>();
+ if (forceTransformBackups) {
+ entryProcessors = new ArrayList<>();
+ entryProcessorsBytes = new ArrayList<>();
+ }
+ else
+ vals = new ArrayList<>();
}
- else
- vals = new ArrayList<>();
}
+
+
/**
* @return Force transform backups flag.
*/
@@ -205,6 +214,36 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
return forceTransformBackups;
}
+ public KeyCacheObject singleKey() {
+ assert singleKey != null;
+
+ return singleKey;
+ }
+
+ public CacheObject singleValue() {
+ return singleVal;
+ }
+
+ public boolean singleUpdate() {
+ return singleKey != null;
+ }
+
+ public void addSingleWriteValue(KeyCacheObject key,
+ @Nullable CacheObject val,
+ long ttl) {
+ assert !forceTransformBackups;
+
+ singleKey = key;
+ singleVal = val;
+
+ if (ttl >= 0) {
+ if (ttls == null)
+ ttls = new GridLongList(1);
+
+ ttls.add(ttl);
+ }
+ }
+
/**
* @param key Key to add.
* @param val Value, {@code null} if should be removed.
@@ -276,10 +315,10 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
* @param expireTime Expire time.
*/
public void addNearWriteValue(KeyCacheObject key,
- @Nullable CacheObject val,
- EntryProcessor<Object, Object, Object> entryProcessor,
- long ttl,
- long expireTime)
+ @Nullable CacheObject val,
+ EntryProcessor<Object, Object, Object> entryProcessor,
+ long ttl,
+ long expireTime)
{
if (nearKeys == null) {
nearKeys = new ArrayList<>();
@@ -540,24 +579,37 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
GridCacheContext cctx = ctx.cacheContext(cacheId);
- prepareMarshalCacheObjects(keys, cctx);
+ if (singleKey != null) {
+ assert !forceTransformBackups;
- prepareMarshalCacheObjects(vals, cctx);
+ prepareMarshalCacheObject(singleKey, cctx);
- prepareMarshalCacheObjects(nearKeys, cctx);
+ prepareMarshalCacheObject(singleVal, cctx);
- prepareMarshalCacheObjects(nearVals, cctx);
+ prepareMarshalCacheObjects(nearKeys, cctx);
- if (forceTransformBackups) {
- // force addition of deployment info for entry processors if P2P is enabled globally.
- if (!addDepInfo && ctx.deploymentEnabled())
- addDepInfo = true;
+ prepareMarshalCacheObjects(nearVals, cctx);
+ }
+ else {
+ prepareMarshalCacheObjects(keys, cctx);
+
+ prepareMarshalCacheObjects(vals, cctx);
+
+ prepareMarshalCacheObjects(nearKeys, cctx);
+
+ prepareMarshalCacheObjects(nearVals, cctx);
- invokeArgsBytes = marshalInvokeArguments(invokeArgs, cctx);
+ if (forceTransformBackups) {
+ // force addition of deployment info for entry processors if P2P is enabled globally.
+ if (!addDepInfo && ctx.deploymentEnabled())
+ addDepInfo = true;
+
+ invokeArgsBytes = marshalInvokeArguments(invokeArgs, cctx);
- entryProcessorsBytes = marshalCollection(entryProcessors, cctx);
+ entryProcessorsBytes = marshalCollection(entryProcessors, cctx);
- nearEntryProcessorsBytes = marshalCollection(nearEntryProcessors, cctx);
+ nearEntryProcessorsBytes = marshalCollection(nearEntryProcessors, cctx);
+ }
}
}
@@ -567,22 +619,29 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
GridCacheContext cctx = ctx.cacheContext(cacheId);
- finishUnmarshalCacheObjects(keys, cctx, ldr);
+ if (singleKey != null) {
+ finishUnmarshalCacheObject(singleKey, cctx, ldr);
- finishUnmarshalCacheObjects(vals, cctx, ldr);
+ finishUnmarshalCacheObject(singleVal, cctx, ldr);
+ }
+ else {
+ finishUnmarshalCacheObjects(keys, cctx, ldr);
- finishUnmarshalCacheObjects(nearKeys, cctx, ldr);
+ finishUnmarshalCacheObjects(vals, cctx, ldr);
- finishUnmarshalCacheObjects(nearVals, cctx, ldr);
+ finishUnmarshalCacheObjects(nearKeys, cctx, ldr);
- if (forceTransformBackups) {
- entryProcessors = unmarshalCollection(entryProcessorsBytes, ctx, ldr);
+ finishUnmarshalCacheObjects(nearVals, cctx, ldr);
- invokeArgs = unmarshalInvokeArguments(invokeArgsBytes, ctx, ldr);
- }
+ if (forceTransformBackups) {
+ entryProcessors = unmarshalCollection(entryProcessorsBytes, ctx, ldr);
+
+ invokeArgs = unmarshalInvokeArguments(invokeArgsBytes, ctx, ldr);
+ }
- if (forceTransformBackups)
- nearEntryProcessors = unmarshalCollection(nearEntryProcessorsBytes, ctx, ldr);
+ if (forceTransformBackups)
+ nearEntryProcessors = unmarshalCollection(nearEntryProcessorsBytes, ctx, ldr);
+ }
}
/** {@inheritDoc} */
@@ -684,42 +743,54 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
writer.incrementState();
case 16:
- if (!writer.writeUuid("subjId", subjId))
+ if (!writer.writeMessage("singleKey", singleKey))
return false;
writer.incrementState();
case 17:
- if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1))
+ if (!writer.writeMessage("singleVal", singleVal))
return false;
writer.incrementState();
case 18:
- if (!writer.writeInt("taskNameHash", taskNameHash))
+ if (!writer.writeUuid("subjId", subjId))
return false;
writer.incrementState();
case 19:
- if (!writer.writeMessage("topVer", topVer))
+ if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1))
return false;
writer.incrementState();
case 20:
- if (!writer.writeMessage("ttls", ttls))
+ if (!writer.writeInt("taskNameHash", taskNameHash))
return false;
writer.incrementState();
case 21:
- if (!writer.writeCollection("vals", vals, MessageCollectionItemType.MSG))
+ if (!writer.writeMessage("topVer", topVer))
return false;
writer.incrementState();
case 22:
+ if (!writer.writeMessage("ttls", ttls))
+ return false;
+
+ writer.incrementState();
+
+ case 23:
+ if (!writer.writeCollection("vals", vals, MessageCollectionItemType.MSG))
+ return false;
+
+ writer.incrementState();
+
+ case 24:
if (!writer.writeMessage("writeVer", writeVer))
return false;
@@ -846,7 +917,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
reader.incrementState();
case 16:
- subjId = reader.readUuid("subjId");
+ singleKey = reader.readMessage("singleKey");
if (!reader.isLastRead())
return false;
@@ -854,6 +925,22 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
reader.incrementState();
case 17:
+ singleVal = reader.readMessage("singleVal");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 18:
+ subjId = reader.readUuid("subjId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 19:
byte syncModeOrd;
syncModeOrd = reader.readByte("syncMode");
@@ -865,7 +952,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
reader.incrementState();
- case 18:
+ case 20:
taskNameHash = reader.readInt("taskNameHash");
if (!reader.isLastRead())
@@ -873,7 +960,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
reader.incrementState();
- case 19:
+ case 21:
topVer = reader.readMessage("topVer");
if (!reader.isLastRead())
@@ -881,7 +968,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
reader.incrementState();
- case 20:
+ case 22:
ttls = reader.readMessage("ttls");
if (!reader.isLastRead())
@@ -889,7 +976,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
reader.incrementState();
- case 21:
+ case 23:
vals = reader.readCollection("vals", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
@@ -897,7 +984,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
reader.incrementState();
- case 22:
+ case 24:
writeVer = reader.readMessage("writeVer");
if (!reader.isLastRead())
@@ -917,7 +1004,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 23;
+ return 25;
}
/** {@inheritDoc} */