You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2015/07/08 00:59:14 UTC
incubator-ignite git commit: IGNITE-1103 - Fixing synchronous
evictions.
Repository: incubator-ignite
Updated Branches:
refs/heads/ignite-1103 0f1b31a5b -> 31989a94c
IGNITE-1103 - Fixing synchronous evictions.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/31989a94
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/31989a94
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/31989a94
Branch: refs/heads/ignite-1103
Commit: 31989a94c4548a9a9c8a68324de50df5d7171eb1
Parents: 0f1b31a
Author: Alexey Goncharuk <ag...@gridgain.com>
Authored: Tue Jul 7 15:59:09 2015 -0700
Committer: Alexey Goncharuk <ag...@gridgain.com>
Committed: Tue Jul 7 15:59:09 2015 -0700
----------------------------------------------------------------------
.../processors/cache/GridCacheEntryEx.java | 13 +-
.../cache/GridCacheEvictionManager.java | 93 ++++----
.../cache/GridCacheEvictionResponse.java | 32 ++-
.../processors/cache/GridCacheMapEntry.java | 17 +-
.../distributed/dht/GridDhtCacheEntry.java | 6 +-
.../distributed/near/GridNearTxRemote.java | 4 +-
.../processors/cache/GridCacheTestEntryEx.java | 6 +-
...gniteCacheSynchronizedEvictionsSelfTest.java | 214 +++++++++++++++++++
.../dht/GridCacheDhtEntrySelfTest.java | 7 +-
.../testsuites/IgniteCacheTestSuite2.java | 2 +
10 files changed, 334 insertions(+), 60 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/31989a94/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
index 3857b35..9da4dba 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
@@ -221,8 +221,12 @@ public interface GridCacheEntryEx {
* @return {@code True} if entry could be evicted.
* @throws IgniteCheckedException In case of error.
*/
- public boolean evictInternal(boolean swap, GridCacheVersion obsoleteVer,
- @Nullable CacheEntryPredicate[] filter) throws IgniteCheckedException;
+ public boolean evictInternal(
+ boolean swap,
+ GridCacheVersion obsoleteVer,
+ AffinityTopologyVersion topVer,
+ @Nullable CacheEntryPredicate[] filter
+ ) throws IgniteCheckedException;
/**
* Evicts entry when batch evict is performed. When called, does not write entry data to swap, but instead
@@ -232,7 +236,10 @@ public interface GridCacheEntryEx {
* @return Swap entry if this entry was marked obsolete, {@code null} if entry was not evicted.
* @throws IgniteCheckedException If failed.
*/
- public GridCacheBatchSwapEntry evictInBatchInternal(GridCacheVersion obsoleteVer) throws IgniteCheckedException;
+ public GridCacheBatchSwapEntry evictInBatchInternal(
+ GridCacheVersion obsoleteVer,
+ AffinityTopologyVersion topVer
+ ) throws IgniteCheckedException;
/**
* This method should be called each time entry is marked obsolete
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/31989a94/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
index d565af1..1493fb9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
@@ -147,7 +147,7 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter {
if (!cctx.isLocal()) {
evictSync = cfg.isEvictSynchronized() && !cctx.isNear() && !cctx.isSwapOrOffheapEnabled();
- nearSync = isNearEnabled(cctx) && !cctx.isNear() && cfg.isEvictSynchronized();
+ nearSync = !cctx.isNear() && cfg.isEvictSynchronized();
}
else {
if (cfg.isEvictSynchronized())
@@ -228,22 +228,13 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter {
* Outputs warnings if potential configuration problems are detected.
*/
private void reportConfigurationProblems() {
- CacheMode mode = cctx.config().getCacheMode();
-
- if (plcEnabled && !cctx.isNear() && mode == PARTITIONED) {
- if (!evictSync) {
+ if (plcEnabled && !cctx.isNear()) {
+ if (!evictSync || !nearSync) {
U.warn(log, "Evictions are not synchronized with other nodes in topology " +
"which provides 2x-3x better performance but may cause data inconsistency if cache store " +
"is not configured (consider changing 'evictSynchronized' configuration property).",
"Evictions are not synchronized for cache: " + cctx.namexx());
}
-
- if (!nearSync && isNearEnabled(cctx)) {
- U.warn(log, "Evictions on primary node are not synchronized with near caches on other nodes " +
- "which provides 2x-3x better performance but may cause data inconsistency (consider changing " +
- "'nearEvictSynchronized' configuration property).",
- "Evictions are not synchronized with near caches on other nodes for cache: " + cctx.namexx());
- }
}
}
@@ -351,8 +342,11 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter {
log.debug("Topology version is different [locTopVer=" + topVer +
", rmtTopVer=" + req.topologyVersion() + ']');
- sendEvictionResponse(nodeId,
- new GridCacheEvictionResponse(cctx.cacheId(), req.futureId(), true));
+ GridCacheEvictionResponse res = new GridCacheEvictionResponse(cctx.cacheId(), req.futureId(), true);
+
+ res.topologyVersion(topVer);
+
+ sendEvictionResponse(nodeId, res);
return;
}
@@ -418,7 +412,7 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter {
assert !near;
- boolean evicted = evictLocally(key, ver, near, obsoleteVer);
+ boolean evicted = evictLocally(key, ver, near, obsoleteVer, req.topologyVersion());
if (log.isDebugEnabled())
log.debug("Evicted key [key=" + key + ", ver=" + ver + ", near=" + near +
@@ -446,7 +440,7 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter {
assert near;
- boolean evicted = evictLocally(key, ver, near, obsoleteVer);
+ boolean evicted = evictLocally(key, ver, near, obsoleteVer, req.topologyVersion());
if (log.isDebugEnabled())
log.debug("Evicted key [key=" + key + ", ver=" + ver + ", near=" + near +
@@ -607,8 +601,9 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter {
private boolean evictLocally(KeyCacheObject key,
final GridCacheVersion ver,
boolean near,
- GridCacheVersion obsoleteVer)
- {
+ GridCacheVersion obsoleteVer,
+ AffinityTopologyVersion topVer
+ ) {
assert key != null;
assert ver != null;
assert obsoleteVer != null;
@@ -631,7 +626,7 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter {
// without any consistency risks. We don't use filter in this case.
// If entry should be evicted from DHT cache, we do not compare versions
// as well because versions may change outside the transaction.
- return evict0(cache, entry, obsoleteVer, null, false);
+ return evict0(cache, entry, obsoleteVer, null, false, topVer);
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to evict entry on remote node [key=" + key + ", localNode=" + cctx.nodeId() + ']', e);
@@ -654,7 +649,8 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter {
GridCacheEntryEx entry,
GridCacheVersion obsoleteVer,
@Nullable CacheEntryPredicate[] filter,
- boolean explicit
+ boolean explicit,
+ AffinityTopologyVersion topVer
) throws IgniteCheckedException {
assert cache != null;
assert entry != null;
@@ -666,7 +662,7 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter {
boolean hasVal = recordable && entry.hasValue();
- boolean evicted = entry.evictInternal(cctx.isSwapOrOffheapEnabled(), obsoleteVer, filter);
+ boolean evicted = entry.evictInternal(cctx.isSwapOrOffheapEnabled(), obsoleteVer, topVer, filter);
if (evicted) {
// Remove manually evicted entry from policy.
@@ -721,7 +717,7 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter {
if (memoryMode == OFFHEAP_TIERED) {
try {
- evict0(cctx.cache(), e, cctx.versions().next(), null, false);
+ evict0(cctx.cache(), e, cctx.versions().next(), null, false, cctx.affinity().affinityTopologyVersion());
}
catch (IgniteCheckedException ex) {
U.error(log, "Failed to evict entry from on heap memory: " + e, ex);
@@ -753,7 +749,7 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter {
if (!cctx.isNear() && memoryMode == OFFHEAP_TIERED) {
try {
- evict0(cctx.cache(), e, cctx.versions().next(), null, false);
+ evict0(cctx.cache(), e, cctx.versions().next(), null, false, topVer);
}
catch (IgniteCheckedException ex) {
U.error(log, "Failed to evict entry from on heap memory: " + e, ex);
@@ -865,15 +861,16 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter {
if (!cctx.isNear() && !explicit && !firstEvictWarn)
warnFirstEvict();
+ AffinityTopologyVersion topVer = cctx.topology().topologyVersion();
+
if (evictSyncAgr) {
assert !cctx.isNear(); // Make sure cache is not NEAR.
- if (cctx.affinity().backups(
- entry.key(),
- cctx.topology().topologyVersion()).contains(cctx.localNode()) &&
- evictSync)
- // Do not track backups if evicts are synchronized.
- return !explicit;
+ // Do not track backups if evicts are synchronized.
+ if (evictSync) {
+ if (!cctx.affinity().primary(cctx.localNode(), entry.partition(), topVer))
+ return !explicit;
+ }
try {
if (!cctx.isAll(entry, filter))
@@ -898,7 +895,7 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter {
// Do not touch entry if not evicted:
// 1. If it is call from policy, policy tracks it on its own.
// 2. If it is explicit call, entry is touched on tx commit.
- return evict0(cctx.cache(), entry, obsoleteVer, filter, explicit);
+ return evict0(cctx.cache(), entry, obsoleteVer, filter, explicit, topVer);
}
return true;
@@ -915,8 +912,6 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter {
List<GridCacheEntryEx> locked = new ArrayList<>(keys.size());
- Set<GridCacheEntryEx> notRemove = null;
-
Collection<GridCacheBatchSwapEntry> swapped = new ArrayList<>(keys.size());
boolean recordable = cctx.events().isRecordable(EVT_CACHE_ENTRY_EVICTED);
@@ -935,6 +930,8 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter {
cached.put(k, e);
}
+ Set<GridCacheEntryEx> notRemove = null;
+
try {
for (GridCacheEntryEx entry : cached.values()) {
// Do not evict internal entries.
@@ -956,7 +953,8 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter {
if (obsoleteVer == null)
obsoleteVer = cctx.versions().next();
- GridCacheBatchSwapEntry swapEntry = entry.evictInBatchInternal(obsoleteVer);
+ GridCacheBatchSwapEntry swapEntry = entry.evictInBatchInternal(obsoleteVer,
+ cctx.affinity().affinityTopologyVersion());
if (swapEntry != null) {
swapped.add(swapEntry);
@@ -1194,7 +1192,7 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter {
// Do not touch primary entries, if not evicted.
// They will be touched within updating transactions.
- evict0(cctx.cache(), entry, obsoleteVer, versionFilter(info), false);
+ evict0(cctx.cache(), entry, obsoleteVer, versionFilter(info), false, topVer);
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to evict entry [entry=" + entry +
@@ -1421,13 +1419,25 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter {
ClusterNode loc = cctx.localNode();
+ cctx.awaitStarted();
+
+ AffinityTopologyVersion startTopVer = cctx.startTopologyVersion();
+
+ if (startTopVer == null)
+ startTopVer = cctx.affinity().affinityTopologyVersion();
+
// Initialize.
primaryParts.addAll(cctx.affinity().primaryPartitions(cctx.localNodeId(),
- cctx.affinity().affinityTopologyVersion()));
+ startTopVer));
while (!isCancelled()) {
DiscoveryEvent evt = evts.take();
+ AffinityTopologyVersion evtTopVer = new AffinityTopologyVersion(evt.topologyVersion());
+
+ if (startTopVer.compareTo(evtTopVer) >= 0)
+ continue;
+
if (log.isDebugEnabled())
log.debug("Processing event: " + evt);
@@ -1436,7 +1446,7 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter {
if (!evts.isEmpty())
break;
- if (!cctx.affinity().primary(loc, it.next(), new AffinityTopologyVersion(evt.topologyVersion())))
+ if (!cctx.affinity().primary(loc, it.next(), evtTopVer))
it.remove();
}
@@ -1448,8 +1458,7 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter {
if (!evts.isEmpty())
break;
- if (part.primary(new AffinityTopologyVersion(evt.topologyVersion()))
- && primaryParts.add(part.id())) {
+ if (part.primary(evtTopVer) && primaryParts.add(part.id())) {
if (log.isDebugEnabled())
log.debug("Touching partition entries: " + part);
@@ -1461,6 +1470,10 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter {
catch (InterruptedException ignored) {
// No-op.
}
+ catch (IgniteCheckedException e) {
+ if (!e.hasCause(InterruptedException.class))
+ throw new IgniteException(e);
+ }
catch (IgniteException e) {
if (!e.hasCause(InterruptedException.class))
throw e;
@@ -1471,7 +1484,7 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter {
/**
* Wrapper around an entry to be put into queue.
*/
- private class EvictionInfo {
+ private static class EvictionInfo {
/** Cache entry. */
private GridCacheEntryEx entry;
@@ -1716,7 +1729,7 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter {
try {
// Touch primary entry (without backup nodes) if not evicted
// to keep tracking.
- if (!evict0(cctx.cache(), info.entry(), obsoleteVer, versionFilter(info), false) &&
+ if (!evict0(cctx.cache(), info.entry(), obsoleteVer, versionFilter(info), false, topVer) &&
plcEnabled)
touch0(info.entry());
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/31989a94/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionResponse.java
index cd10e11..52008f2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionResponse.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache;
import org.apache.ignite.*;
import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.affinity.*;
import org.apache.ignite.internal.util.tostring.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.plugin.extensions.communication.*;
@@ -45,6 +46,9 @@ public class GridCacheEvictionResponse extends GridCacheMessage {
/** Flag to indicate whether request processing has finished with error. */
private boolean err;
+ /** */
+ private AffinityTopologyVersion topVer = AffinityTopologyVersion.NONE;
+
/**
* Required by {@link Externalizable}.
*/
@@ -112,6 +116,18 @@ public class GridCacheEvictionResponse extends GridCacheMessage {
}
/**
+ * @param topVer Topology version.
+ */
+ public void topologyVersion(AffinityTopologyVersion topVer) {
+ this.topVer = topVer;
+ }
+
+ /** {@inheritDoc} */
+ @Override public AffinityTopologyVersion topologyVersion() {
+ return topVer == null ? AffinityTopologyVersion.NONE : topVer;
+ }
+
+ /**
* @return {@code True} if request processing has finished with error.
*/
boolean error() {
@@ -156,6 +172,12 @@ public class GridCacheEvictionResponse extends GridCacheMessage {
writer.incrementState();
+ case 6:
+ if (!writer.writeMessage("topVer", topVer))
+ return false;
+
+ writer.incrementState();
+
}
return true;
@@ -196,6 +218,14 @@ public class GridCacheEvictionResponse extends GridCacheMessage {
reader.incrementState();
+ case 6:
+ topVer = reader.readMessage("topVer");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
}
return true;
@@ -208,7 +238,7 @@ public class GridCacheEvictionResponse extends GridCacheMessage {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 6;
+ return 7;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/31989a94/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 4680994..da56a48 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -2296,7 +2296,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
* @return {@code true} if entry has readers. It makes sense only for dht entry.
* @throws GridCacheEntryRemovedException If removed.
*/
- protected boolean hasReaders() throws GridCacheEntryRemovedException {
+ protected boolean hasReaders(AffinityTopologyVersion topVer) throws GridCacheEntryRemovedException {
return false;
}
@@ -2347,7 +2347,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
CacheObject val = saveValueForIndexUnlocked();
try {
- if ((!hasReaders() || readers)) {
+ if ((!hasReaders(cctx.affinity().affinityTopologyVersion()) || readers)) {
// markObsolete will clear the value.
if (!(marked = markObsolete0(ver, true))) {
if (log.isDebugEnabled())
@@ -2654,7 +2654,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
expiryPlc.ttlUpdated(key(),
version(),
- hasReaders() ? ((GridDhtCacheEntry)this).readers() : null);
+ hasReaders(cctx.affinity().affinityTopologyVersion()) ? ((GridDhtCacheEntry)this).readers() : null);
}
}
@@ -3597,7 +3597,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
}
/** {@inheritDoc} */
- @Override public boolean evictInternal(boolean swap, GridCacheVersion obsoleteVer,
+ @Override public boolean evictInternal(boolean swap, GridCacheVersion obsoleteVer, AffinityTopologyVersion topVer,
@Nullable CacheEntryPredicate[] filter) throws IgniteCheckedException {
boolean marked = false;
@@ -3606,7 +3606,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
synchronized (this) {
CacheObject prev = saveValueForIndexUnlocked();
- if (!hasReaders() && markObsolete0(obsoleteVer, false)) {
+ if (!hasReaders(topVer) && markObsolete0(obsoleteVer, false)) {
if (swap) {
if (!isStartVersion()) {
try {
@@ -3651,7 +3651,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
CacheObject prevVal = saveValueForIndexUnlocked();
- if (!hasReaders() && markObsolete0(obsoleteVer, false)) {
+ if (!hasReaders(topVer) && markObsolete0(obsoleteVer, false)) {
if (swap) {
if (!isStartVersion()) {
try {
@@ -3718,7 +3718,8 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
}
/** {@inheritDoc} */
- @Override public GridCacheBatchSwapEntry evictInBatchInternal(GridCacheVersion obsoleteVer)
+ @Override public GridCacheBatchSwapEntry evictInBatchInternal(GridCacheVersion obsoleteVer,
+ AffinityTopologyVersion topVer)
throws IgniteCheckedException {
assert Thread.holdsLock(this);
assert cctx.isSwapOrOffheapEnabled();
@@ -3726,7 +3727,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
GridCacheBatchSwapEntry ret = null;
try {
- if (!hasReaders() && markObsolete0(obsoleteVer, false)) {
+ if (!hasReaders(topVer) && markObsolete0(obsoleteVer, false)) {
if (!isStartVersion() && hasValueUnlocked()) {
IgniteUuid valClsLdrId = null;
IgniteUuid keyClsLdrId = null;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/31989a94/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
index 89b85c4..68dcd6a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
@@ -624,7 +624,11 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry {
}
/** {@inheritDoc} */
- @Override protected synchronized boolean hasReaders() throws GridCacheEntryRemovedException {
+ @Override protected synchronized boolean hasReaders(AffinityTopologyVersion topVer)
+ throws GridCacheEntryRemovedException {
+ if (!cctx.affinity().primary(partition(), topVer).id().equals(cctx.localNodeId()))
+ return false;
+
checkReadersLocked();
return rdrs.length > 0;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/31989a94/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java
index 4ac81f8..0ffa500 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java
@@ -274,7 +274,7 @@ public class GridNearTxRemote extends GridDistributedTxRemoteAdapter {
try {
CacheObject val = cached.peek(true, false, false, null);
- if (val == null && cached.evictInternal(false, xidVer, null)) {
+ if (val == null && cached.evictInternal(false, xidVer, topologyVersion(), null)) {
evicted.add(entry.txKey());
return false;
@@ -332,7 +332,7 @@ public class GridNearTxRemote extends GridDistributedTxRemoteAdapter {
CacheObject peek = cached.peek(true, false, false, null);
- if (peek == null && cached.evictInternal(false, xidVer, null)) {
+ if (peek == null && cached.evictInternal(false, xidVer, topologyVersion(), null)) {
cached.context().cache().removeIfObsolete(key.key());
evicted.add(key);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/31989a94/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
index eaa6e13..8741662 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
@@ -395,7 +395,7 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr
}
/** @inheritDoc */
- @Override public boolean evictInternal(boolean swap, GridCacheVersion obsoleteVer,
+ @Override public boolean evictInternal(boolean swap, GridCacheVersion obsoleteVer, AffinityTopologyVersion topVer,
@Nullable CacheEntryPredicate[] filter) {
assert false;
@@ -403,8 +403,8 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr
}
/** {@inheritDoc} */
- @Override public GridCacheBatchSwapEntry evictInBatchInternal(GridCacheVersion obsoleteVer)
- throws IgniteCheckedException {
+ @Override public GridCacheBatchSwapEntry evictInBatchInternal(GridCacheVersion obsoleteVer,
+ AffinityTopologyVersion topVer) throws IgniteCheckedException {
assert false;
return null;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/31989a94/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSynchronizedEvictionsSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSynchronizedEvictionsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSynchronizedEvictionsSelfTest.java
new file mode 100644
index 0000000..a06e6be
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSynchronizedEvictionsSelfTest.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cache.eviction.lru.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+/**
+ *
+ */
+public class IgniteCacheSynchronizedEvictionsSelfTest extends GridCommonAbstractTest {
+ /** */
+ private static final int NODES_CNT = 4;
+
+ /** */
+ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ startGrids(NODES_CNT);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ if (getTestGridName(NODES_CNT - 1).equals(gridName))
+ cfg.setClientMode(true);
+
+ TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+
+ discoSpi.setIpFinder(IP_FINDER);
+
+ cfg.setDiscoverySpi(discoSpi);
+
+ return cfg;
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testEvictSynchronizedPartitionedManual() throws Exception {
+ testEvictSynchronizedManual(CacheMode.PARTITIONED, true, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testEvictSynchronizedReplicatedManual() throws Exception {
+ testEvictSynchronizedManual(CacheMode.REPLICATED, true, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testEvictSynchronizedPartitionedPolicy() throws Exception {
+ testEvictSynchronizedManual(CacheMode.PARTITIONED, false, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testEvictSynchronizedReplicatedPolicy() throws Exception {
+ testEvictSynchronizedManual(CacheMode.REPLICATED, false, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testEvictSynchronizedPartitionedManualTopChange() throws Exception {
+ testEvictSynchronizedManual(CacheMode.PARTITIONED, true, true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testEvictSynchronizedReplicatedManualTopChange() throws Exception {
+ testEvictSynchronizedManual(CacheMode.REPLICATED, true, true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testEvictSynchronizedPartitionedPolicyTopChange() throws Exception {
+ testEvictSynchronizedManual(CacheMode.PARTITIONED, false, true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testEvictSynchronizedReplicatedPolicyTopChange() throws Exception {
+ testEvictSynchronizedManual(CacheMode.REPLICATED, false, true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ private void testEvictSynchronizedManual(CacheMode cacheMode, boolean manual, final boolean topChange) throws Exception {
+ for (int i = 0; i < NODES_CNT; i++)
+ info("Node [i=" + i + ", id=" + grid(i).localNode().id() + ']');
+
+ CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>();
+
+ ccfg.setEvictionPolicy(new LruEvictionPolicy(10));
+ ccfg.setCacheMode(cacheMode);
+ ccfg.setBackups(1);
+ ccfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
+ ccfg.setRebalanceMode(CacheRebalanceMode.SYNC);
+ ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+
+ ccfg.setEvictSynchronized(true);
+ ccfg.setEvictSynchronizedKeyBufferSize(1);
+
+ IgniteCache<Object, Object> cache = grid(0).createCache(ccfg);
+
+ try {
+ for (int i = 0; i < 10; i++)
+ cache.put(i, i);
+
+ final IgniteCache<Object, Object> nearCache = grid(3).createNearCache(null, new NearCacheConfiguration<>());
+
+ // Initialize readers.
+ for (int i = 0; i < 10; i++)
+ assertEquals(i, nearCache.get(i));
+
+ // Check readers updated.
+ for (int i = 0; i < 10; i++)
+ cache.put(i, i + 1);
+
+ for (int i = 0; i < 10; i++)
+ assertEquals(i + 1, nearCache.localPeek(i));
+
+ if (topChange) {
+ IgniteEx started = startGrid(NODES_CNT);
+
+ info("Started grid: " + started.localNode().id());
+ }
+
+ // Force server eviction.
+ if (manual) {
+ for (int i = 0; i < 10; i++) {
+ ClusterNode primary = grid(0).affinity(null).mapKeyToNode(i);
+
+ grid(primary).cache(null).localEvict(F.asList(i));
+ }
+ }
+ else {
+ for (int i = 100; i < 200; i++)
+ cache.put(i, i);
+ }
+
+ GridTestUtils.waitForCondition(new PA() {
+ /** {@inheritDoc} */
+ @Override public boolean apply() {
+ for (int i = 0; i < 10; i++) {
+ if (nearCache.localPeek(i) != null) {
+ info("Entry is still present on near cache: " + i);
+
+ return false;
+ }
+ }
+
+ for (int g = 0; g < (topChange ? NODES_CNT + 1 : NODES_CNT); g++) {
+ if (g == NODES_CNT - 1)
+ continue;
+
+ IgniteCache<Object, Object> cache0 = grid(g).cache(null);
+
+ for (int i = 0; i < 10; i++) {
+ if (cache0.localPeek(i) != null) {
+ info("Entry is still present on DHT node [key=" + i + ", node=" + g + ']');
+
+ return false;
+ }
+ }
+ }
+
+ return true;
+ }
+ }, getTestTimeout());
+ }
+ finally {
+ grid(0).destroyCache(null);
+
+ if (topChange)
+ stopGrid(NODES_CNT);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/31989a94/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtEntrySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtEntrySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtEntrySelfTest.java
index 170ff18..51c5101 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtEntrySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtEntrySelfTest.java
@@ -24,6 +24,7 @@ import org.apache.ignite.cache.affinity.rendezvous.*;
import org.apache.ignite.cluster.*;
import org.apache.ignite.configuration.*;
import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.cache.*;
import org.apache.ignite.internal.processors.cache.distributed.near.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.lang.*;
@@ -267,7 +268,9 @@ public class GridCacheDhtEntrySelfTest extends GridCommonAbstractTest {
assert e0.readers().contains(other.id());
assert e1 == null || e1.readers().isEmpty();
- assert !e0.evictInternal(false, dht0.context().versions().next(), null);
+ GridCacheContext<Integer, String> ctx0 = dht0.context();
+
+ assert !e0.evictInternal(false, ctx0.versions().next(), ctx0.affinity().affinityTopologyVersion(), null);
assertEquals(1, near0.localSize(CachePeekMode.ALL));
assertEquals(1, dht0.localSize(null));
@@ -275,7 +278,7 @@ public class GridCacheDhtEntrySelfTest extends GridCommonAbstractTest {
assertEquals(1, near1.localSize(CachePeekMode.ALL));
assertEquals(0, dht1.localSize(null));
- assert !e0.evictInternal(true, dht0.context().versions().next(), null);
+ assert !e0.evictInternal(true, ctx0.versions().next(), ctx0.affinity().affinityTopologyVersion(), null);
assertEquals(1, near0.localSize(CachePeekMode.ALL));
assertEquals(1, dht0.localSize(null));
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/31989a94/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
index 6a59826..97a7251 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
@@ -138,6 +138,8 @@ public class IgniteCacheTestSuite2 extends TestSuite {
suite.addTest(new TestSuite(IgniteCacheClientNodeChangingTopologyTest.class));
suite.addTest(new TestSuite(IgniteCacheClientNodeConcurrentStart.class));
+ suite.addTest(new TestSuite(IgniteCacheSynchronizedEvictionsSelfTest.class));
+
return suite;
}
}