You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/08/17 13:07:58 UTC
[11/12] incubator-ignite git commit: # send previous value for atomic
cache updates on unstable topology
# send previous value for atomic cache updates on unstable topology
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/f0b24c47
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/f0b24c47
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/f0b24c47
Branch: refs/heads/ignite-426
Commit: f0b24c47a6b5a449a63ca5cf8fdc85811b4cc278
Parents: 4c634ed
Author: sboikov <sb...@gridgain.com>
Authored: Mon Aug 17 10:22:14 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Aug 17 13:54:16 2015 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheEntryEx.java | 5 +-
.../processors/cache/GridCacheMapEntry.java | 29 +-
.../dht/GridClientPartitionTopology.java | 7 +
.../dht/GridDhtPartitionTopology.java | 6 +
.../dht/GridDhtPartitionTopologyImpl.java | 45 ++-
.../dht/atomic/GridDhtAtomicCache.java | 63 +++--
.../dht/atomic/GridDhtAtomicUpdateFuture.java | 8 +-
.../dht/atomic/GridDhtAtomicUpdateRequest.java | 69 ++++-
.../dht/atomic/GridNearAtomicUpdateFuture.java | 5 +-
.../distributed/near/GridNearAtomicCache.java | 6 +-
.../processors/cache/GridCacheTestEntryEx.java | 3 +-
...acheContinuousQueryFailoverAbstractTest.java | 271 ++++++++++++++++---
12 files changed, 432 insertions(+), 85 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f0b24c47/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
index 1b5a717..88ebd48 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
@@ -407,6 +407,7 @@ public interface GridCacheEntryEx {
* @param primary If update is performed on primary node (the one which assigns version).
* @param checkVer Whether update should check current version and ignore update if current version is
* greater than passed in.
+ * @param topVer Topology version.
* @param filter Optional filter to check.
* @param drType DR type.
* @param conflictTtl Conflict TTL (if any).
@@ -416,6 +417,7 @@ public interface GridCacheEntryEx {
* @param intercept If {@code true} then calls cache interceptor.
* @param subjId Subject ID initiated this update.
* @param taskName Task name.
+ * @param prevVal Previous value.
* @return Tuple where first value is flag showing whether operation succeeded,
* second value is old entry value if return value is requested, third is updated entry value,
* fourth is the version to enqueue for deferred delete the fifth is DR conflict context
@@ -448,7 +450,8 @@ public interface GridCacheEntryEx {
boolean conflictResolve,
boolean intercept,
@Nullable UUID subjId,
- String taskName
+ String taskName,
+ @Nullable CacheObject prevVal
) throws IgniteCheckedException, GridCacheEntryRemovedException;
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f0b24c47/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 283b0b4..e3b25df 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -1601,7 +1601,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
boolean conflictResolve,
boolean intercept,
@Nullable UUID subjId,
- String taskName
+ String taskName,
+ @Nullable CacheObject prevVal
) throws IgniteCheckedException, GridCacheEntryRemovedException, GridClosureException {
assert cctx.atomic();
@@ -1783,6 +1784,32 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
"[entry=" + this + ", newVer=" + newVer + ']');
}
+ if (!cctx.isNear()) {
+ CacheObject evtVal;
+
+ if (op == GridCacheOperation.TRANSFORM) {
+ EntryProcessor<Object, Object, ?> entryProcessor =
+ (EntryProcessor<Object, Object, ?>)writeObj;
+
+ CacheInvokeEntry<Object, Object> entry =
+ new CacheInvokeEntry<>(cctx, key, prevVal, version());
+
+ try {
+ entryProcessor.process(entry, invokeArgs);
+
+ evtVal = entry.modified() ?
+ cctx.toCacheObject(cctx.unwrapTemporary(entry.getValue())) : prevVal;
+ }
+ catch (Exception e) {
+ evtVal = prevVal;
+ }
+ }
+ else
+ evtVal = (CacheObject)writeObj;
+
+ cctx.continuousQueries().onEntryUpdated(this, key, evtVal, prevVal, primary, false, topVer);
+ }
+
return new GridCacheUpdateAtomicResult(false,
retval ? rawGetOrUnmarshalUnlocked(false) : null,
null,
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f0b24c47/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index 5473348..8a7576a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -824,6 +824,13 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
}
/** {@inheritDoc} */
+ @Override public boolean rebalanceFinished(AffinityTopologyVersion topVer) {
+ assert false;
+
+ return false;
+ }
+
+ /** {@inheritDoc} */
@Override public void printMemoryStats(int threshold) {
X.println(">>> Cache partition topology stats [grid=" + cctx.gridName() + ", cacheId=" + cacheId + ']');
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f0b24c47/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
index 2d9771f..9933444 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
@@ -228,4 +228,10 @@ public interface GridDhtPartitionTopology {
* @param threshold Threshold for number of entries.
*/
public void printMemoryStats(int threshold);
+
+ /**
+ * @param topVer Topology version.
+ * @return {@code True} if rebalance process finished.
+ */
+ public boolean rebalanceFinished(AffinityTopologyVersion topVer);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f0b24c47/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 93f085c..1c71ff1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -85,6 +85,9 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
/** Partition update counter. */
private Map<Integer, Long> cntrMap = new HashMap<>();
+ /** */
+ private volatile AffinityTopologyVersion rebalancedTopVer = AffinityTopologyVersion.NONE;
+
/**
* @param cctx Context.
*/
@@ -114,6 +117,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
topReadyFut = null;
topVer = AffinityTopologyVersion.NONE;
+
+ rebalancedTopVer = AffinityTopologyVersion.NONE;
}
finally {
lock.writeLock().unlock();
@@ -203,6 +208,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
updateSeq.setIfGreater(updSeq);
topReadyFut = exchFut;
+
+ rebalancedTopVer = AffinityTopologyVersion.NONE;;
}
finally {
lock.writeLock().unlock();
@@ -508,6 +515,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
}
}
+ updateRebalanceVersion();
+
consistencyCheck();
}
finally {
@@ -690,7 +699,10 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
* @param states Additional partition states.
* @return List of nodes for the partition.
*/
- private List<ClusterNode> nodes(int p, AffinityTopologyVersion topVer, GridDhtPartitionState state, GridDhtPartitionState... states) {
+ private List<ClusterNode> nodes(int p,
+ AffinityTopologyVersion topVer,
+ GridDhtPartitionState state,
+ GridDhtPartitionState... states) {
Collection<UUID> allIds = topVer.topologyVersion() > 0 ? F.nodeIds(CU.affinityNodes(cctx, topVer)) : null;
lock.readLock().lock();
@@ -888,6 +900,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
boolean changed = checkEvictions(updateSeq);
+ updateRebalanceVersion();
+
consistencyCheck();
if (log.isDebugEnabled())
@@ -1000,6 +1014,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
changed |= checkEvictions(updateSeq);
+ updateRebalanceVersion();
+
consistencyCheck();
if (log.isDebugEnabled())
@@ -1196,6 +1212,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
if (part.own()) {
updateLocal(part.id(), loc.id(), part.state(), updateSeq.incrementAndGet());
+ updateRebalanceVersion();
+
consistencyCheck();
return true;
@@ -1268,6 +1286,11 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
}
/** {@inheritDoc} */
+ @Override public boolean rebalanceFinished(AffinityTopologyVersion topVer) {
+ return topVer.equals(rebalancedTopVer);
+ }
+
+ /** {@inheritDoc} */
@Override public void printMemoryStats(int threshold) {
X.println(">>> Cache partition topology stats [grid=" + cctx.gridName() + ", cache=" + cctx.name() + ']');
@@ -1280,6 +1303,26 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
}
/**
+ *
+ */
+ private void updateRebalanceVersion() {
+ if (!rebalancedTopVer.equals(topVer)) {
+ for (int i = 0; i < cctx.affinity().partitions(); i++) {
+ List<ClusterNode> affNodes = cctx.affinity().nodes(i, topVer);
+ List<ClusterNode> owners = owners(i);
+
+ if (affNodes.size() != owners.size() || !owners.containsAll(affNodes))
+ return;
+ }
+
+ rebalancedTopVer = topVer;
+
+ if (log.isDebugEnabled())
+ log.debug("Updated rebalanced version [cache=" + cctx.name() + ", ver=" + rebalancedTopVer + ']');
+ }
+ }
+
+ /**
* @param p Partition.
* @param nodeId Node ID.
* @param match State to match.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f0b24c47/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index eb4d51c..6c05bfe 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -1057,7 +1057,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
Collection<IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion>> deleted = null;
try {
- topology().readLock();
+ GridDhtPartitionTopology top = topology();
+
+ top.readLock();
try {
if (topology().stopping()) {
@@ -1074,7 +1076,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
// Also do not check topology version if topology was locked on near node by
// external transaction or explicit lock.
if ((req.fastMap() && !req.clientRequest()) || req.topologyLocked() ||
- !needRemap(req.topologyVersion(), topology().topologyVersion())) {
+ !needRemap(req.topologyVersion(), top.topologyVersion())) {
ClusterNode node = ctx.discovery().node(nodeId);
if (node == null) {
@@ -1089,7 +1091,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
if (ver == null) {
// Assign next version for update inside entries lock.
- ver = ctx.versions().next(topology().topologyVersion());
+ ver = ctx.versions().next(top.topologyVersion());
if (hasNear)
res.nearVersion(ver);
@@ -1101,6 +1103,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
log.debug("Using cache version for update request on primary node [ver=" + ver +
", req=" + req + ']');
+ boolean sndPrevVal = !top.rebalanceFinished(req.topologyVersion());
+
dhtFut = createDhtFuture(ver, req, res, completionCb, false);
expiry = expiryPolicy(req.expiry());
@@ -1123,7 +1127,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
completionCb,
ctx.isDrEnabled(),
taskName,
- expiry);
+ expiry,
+ sndPrevVal);
deleted = updRes.deleted();
dhtFut = updRes.dhtFuture();
@@ -1142,7 +1147,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
completionCb,
ctx.isDrEnabled(),
taskName,
- expiry);
+ expiry,
+ sndPrevVal);
retVal = updRes.returnValue();
deleted = updRes.deleted();
@@ -1162,7 +1168,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
remap = true;
}
finally {
- topology().readUnlock();
+ top.readUnlock();
}
}
catch (GridCacheEntryRemovedException e) {
@@ -1245,6 +1251,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
* @param replicate Whether replication is enabled.
* @param taskName Task name.
* @param expiry Expiry policy.
+ * @param sndPrevVal If {@code true} sends previous value to backups.
* @return Deleted entries.
* @throws GridCacheEntryRemovedException Should not be thrown.
*/
@@ -1260,7 +1267,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
boolean replicate,
String taskName,
- @Nullable IgniteCacheExpiryPolicy expiry
+ @Nullable IgniteCacheExpiryPolicy expiry,
+ boolean sndPrevVal
) throws GridCacheEntryRemovedException {
assert !ctx.dr().receiveEnabled(); // Cannot update in batches during DR due to possible conflicts.
assert !req.returnValue() || req.operation() == TRANSFORM; // Should not request return values for putAll.
@@ -1407,7 +1415,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
replicate,
updRes,
taskName,
- expiry);
+ expiry,
+ sndPrevVal);
firstEntryIdx = i;
@@ -1455,7 +1464,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
replicate,
updRes,
taskName,
- expiry);
+ expiry,
+ sndPrevVal);
firstEntryIdx = i;
@@ -1574,7 +1584,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
replicate,
updRes,
taskName,
- expiry);
+ expiry,
+ sndPrevVal);
}
else
assert filtered.isEmpty();
@@ -1650,6 +1661,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
* @param replicate Whether DR is enabled for that cache.
* @param taskName Task name.
* @param expiry Expiry policy.
+ * @param sndPrevVal If {@code true} sends previous value to backups.
* @return Return value.
* @throws GridCacheEntryRemovedException Should be never thrown.
*/
@@ -1664,7 +1676,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
boolean replicate,
String taskName,
- @Nullable IgniteCacheExpiryPolicy expiry
+ @Nullable IgniteCacheExpiryPolicy expiry,
+ boolean sndPrevVal
) throws GridCacheEntryRemovedException {
GridCacheReturn retVal = null;
Collection<IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion>> deleted = null;
@@ -1721,7 +1734,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
req.invokeArguments(),
primary && writeThrough() && !req.skipStore(),
!req.skipStore(),
- req.returnValue(),
+ sndPrevVal || req.returnValue(),
expiry,
true,
true,
@@ -1736,7 +1749,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
true,
intercept,
req.subjectId(),
- taskName);
+ taskName,
+ null);
if (dhtFut == null && !F.isEmpty(filteredReaders)) {
dhtFut = createDhtFuture(ver, req, res, completionCb, true);
@@ -1759,7 +1773,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
op == TRANSFORM ? req.entryProcessor(i) : null,
updRes.newTtl(),
updRes.conflictExpireTime(),
- newConflictVer);
+ newConflictVer,
+ sndPrevVal,
+ updRes.oldValue());
}
if (!F.isEmpty(filteredReaders))
@@ -1865,6 +1881,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
* @param batchRes Batch update result.
* @param taskName Task name.
* @param expiry Expiry policy.
+ * @param sndPrevVal If {@code true} sends previous value to backups.
* @return Deleted entries.
*/
@SuppressWarnings("ForLoopReplaceableByForEach")
@@ -1885,7 +1902,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
boolean replicate,
UpdateBatchResult batchRes,
String taskName,
- @Nullable IgniteCacheExpiryPolicy expiry
+ @Nullable IgniteCacheExpiryPolicy expiry,
+ boolean sndPrevVal
) {
assert putMap == null ^ rmvKeys == null;
@@ -1987,7 +2005,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
null,
/*write-through*/false,
/*read-through*/false,
- /*retval*/false,
+ /*retval*/sndPrevVal,
expiry,
/*event*/true,
/*metrics*/true,
@@ -2002,7 +2020,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
/*conflict resolve*/false,
/*intercept*/false,
req.subjectId(),
- taskName);
+ taskName,
+ null);
assert !updRes.success() || updRes.newTtl() == CU.TTL_NOT_CHANGED || expiry != null :
"success=" + updRes.success() + ", newTtl=" + updRes.newTtl() + ", expiry=" + expiry;
@@ -2038,7 +2057,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
entryProcessorMap == null ? null : entryProcessorMap.get(entry.key()),
updRes.newTtl(),
CU.EXPIRE_TIME_CALCULATE,
- null);
+ null,
+ sndPrevVal,
+ updRes.oldValue());
if (!F.isEmpty(filteredReaders))
dhtFut.addNearWriteEntries(filteredReaders,
@@ -2423,7 +2444,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
processDhtAtomicUpdateRequest0(nodeId, req);
else {
fut.listen(new CI1<IgniteInternalFuture>() {
- @Override public void apply(IgniteInternalFuture future) {
+ @Override public void apply(IgniteInternalFuture fut) {
processDhtAtomicUpdateRequest0(nodeId, req);
}
});
@@ -2461,6 +2482,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
entry = entryExx(key);
CacheObject val = req.value(i);
+ CacheObject prevVal = req.previousValue(i);
EntryProcessor<Object, Object, Object> entryProcessor = req.entryProcessor(i);
GridCacheOperation op = entryProcessor != null ? TRANSFORM :
@@ -2493,7 +2515,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
false,
intercept,
req.subjectId(),
- taskName);
+ taskName,
+ prevVal);
if (updRes.removeVersion() != null) {
if (ctx.deferredDelete())
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f0b24c47/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index 601f1d8..d983e88 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -210,7 +210,9 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
EntryProcessor<Object, Object, Object> entryProcessor,
long ttl,
long conflictExpireTime,
- @Nullable GridCacheVersion conflictVer) {
+ @Nullable GridCacheVersion conflictVer,
+ boolean addPrevVal,
+ @Nullable CacheObject prevVal) {
AffinityTopologyVersion topVer = updateReq.topologyVersion();
int part = entry.partition();
@@ -254,7 +256,9 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
entryProcessor,
ttl,
conflictExpireTime,
- conflictVer);
+ conflictVer,
+ addPrevVal,
+ prevVal);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f0b24c47/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
index 6340c93..2f92fde 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
@@ -71,6 +71,11 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
@GridDirectCollection(CacheObject.class)
private List<CacheObject> vals;
+ /** Previous values. */
+ @GridToStringInclude
+ @GridDirectCollection(CacheObject.class)
+ private List<CacheObject> prevVals;
+
/** Conflict versions. */
@GridDirectCollection(GridCacheVersion.class)
private List<GridCacheVersion> conflictVers;
@@ -208,13 +213,17 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
* @param ttl TTL (optional).
* @param conflictExpireTime Conflict expire time (optional).
* @param conflictVer Conflict version (optional).
+ * @param addPrevVal If {@code true} adds previous value.
+ * @param prevVal Previous value.
*/
public void addWriteValue(KeyCacheObject key,
@Nullable CacheObject val,
EntryProcessor<Object, Object, Object> entryProcessor,
long ttl,
long conflictExpireTime,
- @Nullable GridCacheVersion conflictVer) {
+ @Nullable GridCacheVersion conflictVer,
+ boolean addPrevVal,
+ @Nullable CacheObject prevVal) {
keys.add(key);
if (forceTransformBackups) {
@@ -225,6 +234,13 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
else
vals.add(val);
+ if (addPrevVal) {
+ if (prevVals == null)
+ prevVals = new ArrayList<>();
+
+ prevVals.add(prevVal);
+ }
+
// In case there is no conflict, do not create the list.
if (conflictVer != null) {
if (conflictVers == null) {
@@ -419,6 +435,17 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
/**
* @param idx Key index.
+ * @return Value.
+ */
+ @Nullable public CacheObject previousValue(int idx) {
+ if (prevVals != null)
+ return prevVals.get(idx);
+
+ return null;
+ }
+
+ /**
+ * @param idx Key index.
* @return Entry processor.
*/
@Nullable public EntryProcessor<Object, Object, Object> entryProcessor(int idx) {
@@ -670,42 +697,48 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
writer.incrementState();
case 17:
- if (!writer.writeUuid("subjId", subjId))
+ if (!writer.writeCollection("prevVals", prevVals, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
case 18:
- if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1))
+ if (!writer.writeUuid("subjId", subjId))
return false;
writer.incrementState();
case 19:
- if (!writer.writeInt("taskNameHash", taskNameHash))
+ if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1))
return false;
writer.incrementState();
case 20:
- if (!writer.writeMessage("topVer", topVer))
+ if (!writer.writeInt("taskNameHash", taskNameHash))
return false;
writer.incrementState();
case 21:
- if (!writer.writeMessage("ttls", ttls))
+ if (!writer.writeMessage("topVer", topVer))
return false;
writer.incrementState();
case 22:
- if (!writer.writeCollection("vals", vals, MessageCollectionItemType.MSG))
+ if (!writer.writeMessage("ttls", ttls))
return false;
writer.incrementState();
case 23:
+ if (!writer.writeCollection("vals", vals, MessageCollectionItemType.MSG))
+ return false;
+
+ writer.incrementState();
+
+ case 24:
if (!writer.writeMessage("writeVer", writeVer))
return false;
@@ -840,7 +873,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
reader.incrementState();
case 17:
- subjId = reader.readUuid("subjId");
+ prevVals = reader.readCollection("prevVals", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
return false;
@@ -848,6 +881,14 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
reader.incrementState();
case 18:
+ subjId = reader.readUuid("subjId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 19:
byte syncModeOrd;
syncModeOrd = reader.readByte("syncMode");
@@ -859,7 +900,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
reader.incrementState();
- case 19:
+ case 20:
taskNameHash = reader.readInt("taskNameHash");
if (!reader.isLastRead())
@@ -867,7 +908,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
reader.incrementState();
- case 20:
+ case 21:
topVer = reader.readMessage("topVer");
if (!reader.isLastRead())
@@ -875,7 +916,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
reader.incrementState();
- case 21:
+ case 22:
ttls = reader.readMessage("ttls");
if (!reader.isLastRead())
@@ -883,7 +924,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
reader.incrementState();
- case 22:
+ case 23:
vals = reader.readCollection("vals", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
@@ -891,7 +932,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
reader.incrementState();
- case 23:
+ case 24:
writeVer = reader.readMessage("writeVer");
if (!reader.isLastRead())
@@ -911,7 +952,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 24;
+ return 25;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f0b24c47/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 66f0300..eec7fa0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -129,7 +129,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
private final boolean rawRetval;
/** Fast map flag. */
- private final boolean fastMap;
+ private boolean fastMap;
/** */
private boolean fastMapRemap;
@@ -696,6 +696,9 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
return;
}
+ if (fastMap && futVer == null)
+ fastMap = cctx.topology().rebalanceFinished(topVer);
+
if (futVer == null)
// Assign future version in topology read lock before first exception may be thrown.
futVer = cctx.versions().next(topVer);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f0b24c47/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
index 2255988..cf68d24 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
@@ -239,7 +239,8 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
false,
false,
subjId,
- taskName);
+ taskName,
+ null);
if (updRes.removeVersion() != null) {
if (ctx.deferredDelete())
@@ -341,7 +342,8 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
false,
/*intercept*/false,
req.subjectId(),
- taskName);
+ taskName,
+ null);
if (updRes.removeVersion() != null) {
if (ctx.deferredDelete())
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f0b24c47/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
index 0055557..0dd10ea 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
@@ -503,7 +503,8 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr
boolean conflictResolve,
boolean intercept,
UUID subjId,
- String taskName) throws IgniteCheckedException,
+ String taskName,
+ @Nullable CacheObject prevVal) throws IgniteCheckedException,
GridCacheEntryRemovedException {
assert false;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f0b24c47/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java
index e6f3bd7..151ae33 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java
@@ -25,9 +25,12 @@ import org.apache.ignite.cluster.*;
import org.apache.ignite.configuration.*;
import org.apache.ignite.internal.*;
import org.apache.ignite.internal.managers.communication.*;
+import org.apache.ignite.internal.processors.affinity.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.*;
import org.apache.ignite.internal.processors.continuous.*;
import org.apache.ignite.internal.util.*;
import org.apache.ignite.internal.util.lang.*;
+import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.lang.*;
import org.apache.ignite.plugin.extensions.communication.*;
import org.apache.ignite.resources.*;
@@ -135,8 +138,91 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
/**
* @throws Exception If failed.
*/
+ public void testRebalanceVersion() throws Exception {
+ Ignite ignite0 = startGrid(0);
+ GridDhtPartitionTopology top0 = ((IgniteKernal)ignite0).context().cache().context().cacheContext(1).topology();
+
+ assertTrue(top0.rebalanceFinished(new AffinityTopologyVersion(1)));
+ assertFalse(top0.rebalanceFinished(new AffinityTopologyVersion(2)));
+
+ Ignite ignite1 = startGrid(1);
+ GridDhtPartitionTopology top1 = ((IgniteKernal)ignite1).context().cache().context().cacheContext(1).topology();
+
+ waitRebalanceFinished(ignite0, 2);
+ waitRebalanceFinished(ignite1, 2);
+
+ assertFalse(top0.rebalanceFinished(new AffinityTopologyVersion(3)));
+ assertFalse(top1.rebalanceFinished(new AffinityTopologyVersion(3)));
+
+ Ignite ignite2 = startGrid(2);
+ GridDhtPartitionTopology top2 = ((IgniteKernal)ignite2).context().cache().context().cacheContext(1).topology();
+
+ waitRebalanceFinished(ignite0, 3);
+ waitRebalanceFinished(ignite1, 3);
+ waitRebalanceFinished(ignite2, 3);
+
+ assertFalse(top0.rebalanceFinished(new AffinityTopologyVersion(4)));
+ assertFalse(top1.rebalanceFinished(new AffinityTopologyVersion(4)));
+ assertFalse(top2.rebalanceFinished(new AffinityTopologyVersion(4)));
+
+ client = true;
+
+ Ignite ignite3 = startGrid(3);
+ GridDhtPartitionTopology top3 = ((IgniteKernal)ignite3).context().cache().context().cacheContext(1).topology();
+
+ assertTrue(top0.rebalanceFinished(new AffinityTopologyVersion(4)));
+ assertTrue(top1.rebalanceFinished(new AffinityTopologyVersion(4)));
+ assertTrue(top2.rebalanceFinished(new AffinityTopologyVersion(4)));
+ assertTrue(top3.rebalanceFinished(new AffinityTopologyVersion(4)));
+
+ stopGrid(1);
+
+ waitRebalanceFinished(ignite0, 5);
+ waitRebalanceFinished(ignite2, 5);
+ waitRebalanceFinished(ignite3, 5);
+
+ stopGrid(3);
+
+ assertTrue(top0.rebalanceFinished(new AffinityTopologyVersion(6)));
+ assertTrue(top2.rebalanceFinished(new AffinityTopologyVersion(6)));
+
+ stopGrid(0);
+
+ waitRebalanceFinished(ignite2, 7);
+ }
+
+ /**
+ * @param ignite Ignite.
+ * @param topVer Topology version.
+ * @throws Exception If failed.
+ */
+ private void waitRebalanceFinished(Ignite ignite, long topVer) throws Exception {
+ final AffinityTopologyVersion topVer0 = new AffinityTopologyVersion(topVer);
+
+ final GridDhtPartitionTopology top =
+ ((IgniteKernal)ignite).context().cache().context().cacheContext(1).topology();
+
+ GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ return top.rebalanceFinished(topVer0);
+ }
+ }, 5000);
+
+ assertTrue(top.rebalanceFinished(topVer0));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
public void testOneBackup() throws Exception {
- checkBackupQueue(1);
+ checkBackupQueue(1, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testOneBackupClientUpdate() throws Exception {
+ checkBackupQueue(1, true);
}
/**
@@ -146,14 +232,15 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
if (cacheMode() == REPLICATED)
return;
- checkBackupQueue(3);
+ checkBackupQueue(3, false);
}
/**
* @param backups Number of backups.
+ * @param updateFromClient If {@code true} executes cache update from client node.
* @throws Exception If failed.
*/
- private void checkBackupQueue(int backups) throws Exception {
+ private void checkBackupQueue(int backups, boolean updateFromClient) throws Exception {
this.backups = backups;
final int SRV_NODES = 4;
@@ -183,6 +270,10 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
int PARTS = 10;
+ Map<Object, T2<Object, Object>> updates = new HashMap<>();
+
+ List<T3<Object, Object, Object>> expEvts = new ArrayList<>();
+
for (int i = 0; i < SRV_NODES - 1; i++) {
log.info("Stop iteration: " + i);
@@ -203,7 +294,23 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
for (Integer key : keys) {
log.info("Put [node=" + ignite.name() + ", key=" + key + ", part=" + aff.partition(key) + ']');
- cache.put(key, key);
+ T2<Object, Object> t = updates.get(key);
+
+ if (t == null) {
+ updates.put(key, new T2<>((Object)key, null));
+
+ expEvts.add(new T3<>((Object)key, (Object)key, null));
+ }
+ else {
+ updates.put(key, new T2<>((Object)key, (Object)key));
+
+ expEvts.add(new T3<>((Object)key, (Object)key, (Object)key));
+ }
+
+ if (updateFromClient)
+ qryClientCache.put(key, key);
+ else
+ cache.put(key, key);
if (first) {
spi.skipMsg = true;
@@ -223,6 +330,8 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
fail("Failed to wait for notifications [exp=" + keys.size() + ", left=" + lsnr.latch.getCount() + ']');
}
+
+ checkEvents(expEvts, lsnr);
}
for (int i = 0; i < SRV_NODES - 1; i++) {
@@ -241,7 +350,23 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
for (Integer key : keys) {
log.info("Put [node=" + ignite.name() + ", key=" + key + ", part=" + aff.partition(key) + ']');
- cache.put(key, key);
+ T2<Object, Object> t = updates.get(key);
+
+ if (t == null) {
+ updates.put(key, new T2<>((Object)key, null));
+
+ expEvts.add(new T3<>((Object)key, (Object)key, null));
+ }
+ else {
+ updates.put(key, new T2<>((Object)key, (Object)key));
+
+ expEvts.add(new T3<>((Object)key, (Object)key, (Object)key));
+ }
+
+ if (updateFromClient)
+ qryClientCache.put(key, key);
+ else
+ cache.put(key, key);
}
if (!latch.await(5, SECONDS)) {
@@ -253,6 +378,8 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
fail("Failed to wait for notifications [exp=" + keys.size() + ", left=" + lsnr.latch.getCount() + ']');
}
+
+ checkEvents(expEvts, lsnr);
}
cur.close();
@@ -261,6 +388,24 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
}
/**
+ * @param expEvts Expected events.
+ * @param lsnr Listener.
+ */
+ private void checkEvents(List<T3<Object, Object, Object>> expEvts, CacheEventListener1 lsnr) {
+ for (T3<Object, Object, Object> exp : expEvts) {
+ CacheEntryEvent<?, ?> e = lsnr.evts.get(exp.get1());
+
+ assertNotNull("No event for key: " + exp.get1(), e);
+ assertEquals("Unexpected value: " + e, exp.get2(), e.getValue());
+ assertEquals("Unexpected old value: " + e, exp.get3(), e.getOldValue());
+ }
+
+ expEvts.clear();
+
+ lsnr.evts.clear();
+ }
+
+ /**
* @param cache Cache.
* @param parts Number of partitions.
* @return Keys.
@@ -449,7 +594,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
/**
* @throws Exception If failed.
*/
- public void _testFailover() throws Exception {
+ public void testFailover() throws Exception {
final int SRV_NODES = 4;
startGridsMultiThreaded(SRV_NODES);
@@ -483,14 +628,12 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
startGrid(idx);
- Thread.sleep(2000);
+ Thread.sleep(3000);
log.info("Stop node: " + idx);
stopGrid(idx);
- Thread.sleep(1000);
-
CountDownLatch latch = new CountDownLatch(1);
assertTrue(checkLatch.compareAndSet(null, latch));
@@ -508,6 +651,8 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
final Map<Integer, Integer> vals = new HashMap<>();
+ final Map<Integer, List<T2<Integer, Integer>>> expEvts = new HashMap<>();
+
try {
long stopTime = System.currentTimeMillis() + 3 * 60_000;
@@ -518,6 +663,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
while (System.currentTimeMillis() < stopTime) {
Integer key = rnd.nextInt(PARTS);
+ Integer prevVal = vals.get(key);
Integer val = vals.get(key);
if (val == null)
@@ -529,6 +675,16 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
vals.put(key, val);
+ List<T2<Integer, Integer>> keyEvts = expEvts.get(key);
+
+ if (keyEvts == null) {
+ keyEvts = new ArrayList<>();
+
+ expEvts.put(key, keyEvts);
+ }
+
+ keyEvts.add(new T2<>(val, prevVal));
+
CountDownLatch latch = checkLatch.get();
if (latch != null) {
@@ -544,12 +700,12 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
boolean check = GridTestUtils.waitForCondition(new GridAbsPredicate() {
@Override public boolean apply() {
- return checkEvents(false, vals, lsnr);
+ return checkEvents(false, expEvts, lsnr);
}
}, 10_000);
if (!check)
- assertTrue(checkEvents(true, vals, lsnr));
+ assertTrue(checkEvents(true, expEvts, lsnr));
success = true;
@@ -577,12 +733,12 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
boolean check = GridTestUtils.waitForCondition(new GridAbsPredicate() {
@Override public boolean apply() {
- return checkEvents(false, vals, lsnr);
+ return checkEvents(false, expEvts, lsnr);
}
}, 10_000);
if (!check)
- assertTrue(checkEvents(true, vals, lsnr));
+ assertTrue(checkEvents(true, expEvts, lsnr));
cur.close();
@@ -591,47 +747,64 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
/**
* @param logAll If {@code true} logs all unexpected values.
- * @param vals Expected values.
+ * @param expEvts Expected values.
* @param lsnr Listener.
* @return Check status.
*/
- private boolean checkEvents(boolean logAll, Map<Integer, Integer> vals, CacheEventListener2 lsnr) {
- assertTrue(!vals.isEmpty());
-
- ConcurrentHashMap<Integer, Integer> lsnrVals = lsnr.vals;
-
- ConcurrentHashMap<Integer, Integer> lsnrCntrs = lsnr.cntrs;
+ @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
+ private boolean checkEvents(boolean logAll,
+ Map<Integer, List<T2<Integer, Integer>>> expEvts,
+ CacheEventListener2 lsnr) {
+ assertTrue(!expEvts.isEmpty());
boolean pass = true;
- for (Map.Entry<Integer, Integer> e : vals.entrySet()) {
+ for (Map.Entry<Integer, List<T2<Integer, Integer>>> e : expEvts.entrySet()) {
Integer key = e.getKey();
+ List<T2<Integer, Integer>> exp = e.getValue();
- Integer lsnrVal = lsnrVals.get(key);
- Integer expVal = e.getValue();
+ List<CacheEntryEvent<?, ?>> rcvdEvts = lsnr.evts.get(key);
- if (!expVal.equals(lsnrVal)) {
+ if (rcvdEvts == null) {
pass = false;
- log.info("Unexpected value [key=" + key + ", val=" + lsnrVal + ", expVal=" + expVal + ']');
+ log.info("No events for key [key=" + key + ", exp=" + e.getValue() + ']');
if (!logAll)
return false;
}
+ else {
+ synchronized (rcvdEvts) {
+ if (rcvdEvts.size() != exp.size()) {
+ pass = false;
- Integer lsnrCntr = lsnrCntrs.get(key);
- Integer expCntr = expVal + 1;
+ log.info("Missed or extra events for key [key=" + key +
+ ", exp=" + e.getValue() +
+ ", rcvd=" + rcvdEvts + ']');
- if (!expCntr.equals(lsnrCntr)) {
- pass = false;
+ if (!logAll)
+ return false;
+ }
- log.info("Unexpected events count [key=" + key + ", val=" + lsnrCntr + ", expVal=" + expCntr + ']');
+ int cnt = Math.min(rcvdEvts.size(), exp.size());
- if (!logAll)
- return false;
+ for (int i = 0; i < cnt; i++) {
+ T2<Integer, Integer> expEvt = exp.get(i);
+ CacheEntryEvent<?, ?> rcvdEvt = rcvdEvts.get(i);
+
+ assertEquals(key, rcvdEvt.getKey());
+ assertEquals(expEvt.get1(), rcvdEvt.getValue());
+ assertEquals(expEvt.get2(), rcvdEvt.getOldValue());
+ }
+ }
}
}
+ if (pass) {
+ expEvts.clear();
+ lsnr.evts.clear();
+ }
+
return pass;
}
@@ -646,6 +819,9 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
private GridConcurrentHashSet<Integer> keys = new GridConcurrentHashSet<>();
/** */
+ private ConcurrentHashMap<Object, CacheEntryEvent<?, ?>> evts = new ConcurrentHashMap<>();
+
+ /** */
@LoggerResource
private IgniteLogger log;
@@ -657,6 +833,8 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
log.info("Received cache event: " + evt + " " + (latch != null ? latch.getCount() : null));
+ this.evts.put(evt.getKey(), evt);
+
keys.add((Integer) evt.getKey());
assertTrue(latch != null);
@@ -691,7 +869,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
private final ConcurrentHashMap<Integer, Integer> vals = new ConcurrentHashMap<>();
/** */
- private final ConcurrentHashMap<Integer, Integer> cntrs = new ConcurrentHashMap<>();
+ private final ConcurrentHashMap<Integer, List<CacheEntryEvent<?, ?>>> evts = new ConcurrentHashMap<>();
/** {@inheritDoc} */
@Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts)
@@ -706,25 +884,34 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
Integer prevVal = vals.get(key);
+ boolean dup = false;
+
if (prevVal != null) {
- assertEquals("Unexpected event: " + evt, (Integer)(prevVal + 1), val);
- assertEquals("Unexpected event: " + evt, prevVal, evt.getOldValue());
+ if (prevVal.equals(val)) // Can get this event with automatic put retry.
+ dup = true;
+ else {
+ assertEquals("Unexpected event: " + evt, (Integer)(prevVal + 1), val);
+ assertEquals("Unexpected event: " + evt, prevVal, evt.getOldValue());
+ }
}
else {
assertEquals("Unexpected event: " + evt, (Object)0, val);
assertNull("Unexpected event: " + evt, evt.getOldValue());
}
- vals.put(key, val);
+ if (!dup) {
+ vals.put(key, val);
- Integer cntr = cntrs.get(key);
+ List<CacheEntryEvent<?, ?>> keyEvts = this.evts.get(key);
- if (cntr == null)
- cntr = 1;
- else
- cntr = cntr + 1;
+ if (keyEvts == null) {
+ keyEvts = Collections.synchronizedList(new ArrayList<CacheEntryEvent<?, ?>>());
- cntrs.put(key, cntr);
+ this.evts.put(key, keyEvts);
+ }
+
+ keyEvts.add(evt);
+ }
}
}
catch (Throwable e) {