You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2015/07/08 00:59:14 UTC

incubator-ignite git commit: IGNITE-1103 - Fixing synchronous evictions.

Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-1103 0f1b31a5b -> 31989a94c


IGNITE-1103 - Fixing synchronous evictions.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/31989a94
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/31989a94
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/31989a94

Branch: refs/heads/ignite-1103
Commit: 31989a94c4548a9a9c8a68324de50df5d7171eb1
Parents: 0f1b31a
Author: Alexey Goncharuk <ag...@gridgain.com>
Authored: Tue Jul 7 15:59:09 2015 -0700
Committer: Alexey Goncharuk <ag...@gridgain.com>
Committed: Tue Jul 7 15:59:09 2015 -0700

----------------------------------------------------------------------
 .../processors/cache/GridCacheEntryEx.java      |  13 +-
 .../cache/GridCacheEvictionManager.java         |  93 ++++----
 .../cache/GridCacheEvictionResponse.java        |  32 ++-
 .../processors/cache/GridCacheMapEntry.java     |  17 +-
 .../distributed/dht/GridDhtCacheEntry.java      |   6 +-
 .../distributed/near/GridNearTxRemote.java      |   4 +-
 .../processors/cache/GridCacheTestEntryEx.java  |   6 +-
 ...gniteCacheSynchronizedEvictionsSelfTest.java | 214 +++++++++++++++++++
 .../dht/GridCacheDhtEntrySelfTest.java          |   7 +-
 .../testsuites/IgniteCacheTestSuite2.java       |   2 +
 10 files changed, 334 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/31989a94/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
index 3857b35..9da4dba 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
@@ -221,8 +221,12 @@ public interface GridCacheEntryEx {
      * @return {@code True} if entry could be evicted.
      * @throws IgniteCheckedException In case of error.
      */
-    public boolean evictInternal(boolean swap, GridCacheVersion obsoleteVer,
-        @Nullable CacheEntryPredicate[] filter) throws IgniteCheckedException;
+    public boolean evictInternal(
+        boolean swap,
+        GridCacheVersion obsoleteVer,
+        AffinityTopologyVersion topVer,
+        @Nullable CacheEntryPredicate[] filter
+    ) throws IgniteCheckedException;
 
     /**
      * Evicts entry when batch evict is performed. When called, does not write entry data to swap, but instead
@@ -232,7 +236,10 @@ public interface GridCacheEntryEx {
      * @return Swap entry if this entry was marked obsolete, {@code null} if entry was not evicted.
      * @throws IgniteCheckedException If failed.
      */
-    public GridCacheBatchSwapEntry evictInBatchInternal(GridCacheVersion obsoleteVer) throws IgniteCheckedException;
+    public GridCacheBatchSwapEntry evictInBatchInternal(
+        GridCacheVersion obsoleteVer,
+        AffinityTopologyVersion topVer
+    ) throws IgniteCheckedException;
 
     /**
      * This method should be called each time entry is marked obsolete

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/31989a94/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
index d565af1..1493fb9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
@@ -147,7 +147,7 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter {
         if (!cctx.isLocal()) {
             evictSync = cfg.isEvictSynchronized() && !cctx.isNear() && !cctx.isSwapOrOffheapEnabled();
 
-            nearSync = isNearEnabled(cctx) && !cctx.isNear() && cfg.isEvictSynchronized();
+            nearSync = !cctx.isNear() && cfg.isEvictSynchronized();
         }
         else {
             if (cfg.isEvictSynchronized())
@@ -228,22 +228,13 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter {
      * Outputs warnings if potential configuration problems are detected.
      */
     private void reportConfigurationProblems() {
-        CacheMode mode = cctx.config().getCacheMode();
-
-        if (plcEnabled && !cctx.isNear() && mode == PARTITIONED) {
-            if (!evictSync) {
+        if (plcEnabled && !cctx.isNear()) {
+            if (!evictSync || !nearSync) {
                 U.warn(log, "Evictions are not synchronized with other nodes in topology " +
                     "which provides 2x-3x better performance but may cause data inconsistency if cache store " +
                     "is not configured (consider changing 'evictSynchronized' configuration property).",
                     "Evictions are not synchronized for cache: " + cctx.namexx());
             }
-
-            if (!nearSync && isNearEnabled(cctx)) {
-                U.warn(log, "Evictions on primary node are not synchronized with near caches on other nodes " +
-                    "which provides 2x-3x better performance but may cause data inconsistency (consider changing " +
-                    "'nearEvictSynchronized' configuration property).",
-                    "Evictions are not synchronized with near caches on other nodes for cache: " + cctx.namexx());
-            }
         }
     }
 
@@ -351,8 +342,11 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter {
                         log.debug("Topology version is different [locTopVer=" + topVer +
                             ", rmtTopVer=" + req.topologyVersion() + ']');
 
-                    sendEvictionResponse(nodeId,
-                        new GridCacheEvictionResponse(cctx.cacheId(), req.futureId(), true));
+                    GridCacheEvictionResponse res = new GridCacheEvictionResponse(cctx.cacheId(), req.futureId(), true);
+
+                    res.topologyVersion(topVer);
+
+                    sendEvictionResponse(nodeId, res);
 
                     return;
                 }
@@ -418,7 +412,7 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter {
 
                     assert !near;
 
-                    boolean evicted = evictLocally(key, ver, near, obsoleteVer);
+                    boolean evicted = evictLocally(key, ver, near, obsoleteVer, req.topologyVersion());
 
                     if (log.isDebugEnabled())
                         log.debug("Evicted key [key=" + key + ", ver=" + ver + ", near=" + near +
@@ -446,7 +440,7 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter {
 
             assert near;
 
-            boolean evicted = evictLocally(key, ver, near, obsoleteVer);
+            boolean evicted = evictLocally(key, ver, near, obsoleteVer, req.topologyVersion());
 
             if (log.isDebugEnabled())
                 log.debug("Evicted key [key=" + key + ", ver=" + ver + ", near=" + near +
@@ -607,8 +601,9 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter {
     private boolean evictLocally(KeyCacheObject key,
         final GridCacheVersion ver,
         boolean near,
-        GridCacheVersion obsoleteVer)
-    {
+        GridCacheVersion obsoleteVer,
+        AffinityTopologyVersion topVer
+    ) {
         assert key != null;
         assert ver != null;
         assert obsoleteVer != null;
@@ -631,7 +626,7 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter {
             // without any consistency risks. We don't use filter in this case.
             // If entry should be evicted from DHT cache, we do not compare versions
             // as well because versions may change outside the transaction.
-            return evict0(cache, entry, obsoleteVer, null, false);
+            return evict0(cache, entry, obsoleteVer, null, false, topVer);
         }
         catch (IgniteCheckedException e) {
             U.error(log, "Failed to evict entry on remote node [key=" + key + ", localNode=" + cctx.nodeId() + ']', e);
@@ -654,7 +649,8 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter {
         GridCacheEntryEx entry,
         GridCacheVersion obsoleteVer,
         @Nullable CacheEntryPredicate[] filter,
-        boolean explicit
+        boolean explicit,
+        AffinityTopologyVersion topVer
     ) throws IgniteCheckedException {
         assert cache != null;
         assert entry != null;
@@ -666,7 +662,7 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter {
 
         boolean hasVal = recordable && entry.hasValue();
 
-        boolean evicted = entry.evictInternal(cctx.isSwapOrOffheapEnabled(), obsoleteVer, filter);
+        boolean evicted = entry.evictInternal(cctx.isSwapOrOffheapEnabled(), obsoleteVer, topVer, filter);
 
         if (evicted) {
             // Remove manually evicted entry from policy.
@@ -721,7 +717,7 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter {
 
         if (memoryMode == OFFHEAP_TIERED) {
             try {
-                evict0(cctx.cache(), e, cctx.versions().next(), null, false);
+                evict0(cctx.cache(), e, cctx.versions().next(), null, false, cctx.affinity().affinityTopologyVersion());
             }
             catch (IgniteCheckedException ex) {
                 U.error(log, "Failed to evict entry from on heap memory: " + e, ex);
@@ -753,7 +749,7 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter {
 
         if (!cctx.isNear() && memoryMode == OFFHEAP_TIERED) {
             try {
-                evict0(cctx.cache(), e, cctx.versions().next(), null, false);
+                evict0(cctx.cache(), e, cctx.versions().next(), null, false, topVer);
             }
             catch (IgniteCheckedException ex) {
                 U.error(log, "Failed to evict entry from on heap memory: " + e, ex);
@@ -865,15 +861,16 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter {
         if (!cctx.isNear() && !explicit && !firstEvictWarn)
             warnFirstEvict();
 
+        AffinityTopologyVersion topVer = cctx.topology().topologyVersion();
+
         if (evictSyncAgr) {
             assert !cctx.isNear(); // Make sure cache is not NEAR.
 
-            if (cctx.affinity().backups(
-                    entry.key(),
-                    cctx.topology().topologyVersion()).contains(cctx.localNode()) &&
-                evictSync)
-                // Do not track backups if evicts are synchronized.
-                return !explicit;
+            // Do not track backups if evicts are synchronized.
+            if (evictSync) {
+                if (!cctx.affinity().primary(cctx.localNode(), entry.partition(), topVer))
+                    return !explicit;
+            }
 
             try {
                 if (!cctx.isAll(entry, filter))
@@ -898,7 +895,7 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter {
             // Do not touch entry if not evicted:
             // 1. If it is call from policy, policy tracks it on its own.
             // 2. If it is explicit call, entry is touched on tx commit.
-            return evict0(cctx.cache(), entry, obsoleteVer, filter, explicit);
+            return evict0(cctx.cache(), entry, obsoleteVer, filter, explicit, topVer);
         }
 
         return true;
@@ -915,8 +912,6 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter {
 
         List<GridCacheEntryEx> locked = new ArrayList<>(keys.size());
 
-        Set<GridCacheEntryEx> notRemove = null;
-
         Collection<GridCacheBatchSwapEntry> swapped = new ArrayList<>(keys.size());
 
         boolean recordable = cctx.events().isRecordable(EVT_CACHE_ENTRY_EVICTED);
@@ -935,6 +930,8 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter {
                 cached.put(k, e);
         }
 
+        Set<GridCacheEntryEx> notRemove = null;
+
         try {
             for (GridCacheEntryEx entry : cached.values()) {
                 // Do not evict internal entries.
@@ -956,7 +953,8 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter {
                 if (obsoleteVer == null)
                     obsoleteVer = cctx.versions().next();
 
-                GridCacheBatchSwapEntry swapEntry = entry.evictInBatchInternal(obsoleteVer);
+                GridCacheBatchSwapEntry swapEntry = entry.evictInBatchInternal(obsoleteVer,
+                    cctx.affinity().affinityTopologyVersion());
 
                 if (swapEntry != null) {
                     swapped.add(swapEntry);
@@ -1194,7 +1192,7 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter {
 
                     // Do not touch primary entries, if not evicted.
                     // They will be touched within updating transactions.
-                    evict0(cctx.cache(), entry, obsoleteVer, versionFilter(info), false);
+                    evict0(cctx.cache(), entry, obsoleteVer, versionFilter(info), false, topVer);
                 }
                 catch (IgniteCheckedException e) {
                     U.error(log, "Failed to evict entry [entry=" + entry +
@@ -1421,13 +1419,25 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter {
 
                 ClusterNode loc = cctx.localNode();
 
+                cctx.awaitStarted();
+
+                AffinityTopologyVersion startTopVer = cctx.startTopologyVersion();
+
+                if (startTopVer == null)
+                    startTopVer = cctx.affinity().affinityTopologyVersion();
+
                 // Initialize.
                 primaryParts.addAll(cctx.affinity().primaryPartitions(cctx.localNodeId(),
-                    cctx.affinity().affinityTopologyVersion()));
+                    startTopVer));
 
                 while (!isCancelled()) {
                     DiscoveryEvent evt = evts.take();
 
+                    AffinityTopologyVersion evtTopVer = new AffinityTopologyVersion(evt.topologyVersion());
+
+                    if (startTopVer.compareTo(evtTopVer) >= 0)
+                        continue;
+
                     if (log.isDebugEnabled())
                         log.debug("Processing event: " + evt);
 
@@ -1436,7 +1446,7 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter {
                         if (!evts.isEmpty())
                             break;
 
-                        if (!cctx.affinity().primary(loc, it.next(), new AffinityTopologyVersion(evt.topologyVersion())))
+                        if (!cctx.affinity().primary(loc, it.next(), evtTopVer))
                             it.remove();
                     }
 
@@ -1448,8 +1458,7 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter {
                         if (!evts.isEmpty())
                             break;
 
-                        if (part.primary(new AffinityTopologyVersion(evt.topologyVersion()))
-                            && primaryParts.add(part.id())) {
+                        if (part.primary(evtTopVer) && primaryParts.add(part.id())) {
                             if (log.isDebugEnabled())
                                 log.debug("Touching partition entries: " + part);
 
@@ -1461,6 +1470,10 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter {
             catch (InterruptedException ignored) {
                 // No-op.
             }
+            catch (IgniteCheckedException e) {
+                if (!e.hasCause(InterruptedException.class))
+                    throw new IgniteException(e);
+            }
             catch (IgniteException e) {
                 if (!e.hasCause(InterruptedException.class))
                     throw e;
@@ -1471,7 +1484,7 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter {
     /**
      * Wrapper around an entry to be put into queue.
      */
-    private class EvictionInfo {
+    private static class EvictionInfo {
         /** Cache entry. */
         private GridCacheEntryEx entry;
 
@@ -1716,7 +1729,7 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter {
                         try {
                             // Touch primary entry (without backup nodes) if not evicted
                             // to keep tracking.
-                            if (!evict0(cctx.cache(), info.entry(), obsoleteVer, versionFilter(info), false) &&
+                            if (!evict0(cctx.cache(), info.entry(), obsoleteVer, versionFilter(info), false, topVer) &&
                                 plcEnabled)
                                 touch0(info.entry());
                         }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/31989a94/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionResponse.java
index cd10e11..52008f2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionResponse.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache;
 
 import org.apache.ignite.*;
 import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.util.tostring.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.plugin.extensions.communication.*;
@@ -45,6 +46,9 @@ public class GridCacheEvictionResponse extends GridCacheMessage {
     /** Flag to indicate whether request processing has finished with error. */
     private boolean err;
 
+    /** */
+    private AffinityTopologyVersion topVer = AffinityTopologyVersion.NONE;
+
     /**
      * Required by {@link Externalizable}.
      */
@@ -112,6 +116,18 @@ public class GridCacheEvictionResponse extends GridCacheMessage {
     }
 
     /**
+     * @param topVer Topology version.
+     */
+    public void topologyVersion(AffinityTopologyVersion topVer) {
+        this.topVer = topVer;
+    }
+
+    /** {@inheritDoc} */
+    @Override public AffinityTopologyVersion topologyVersion() {
+        return topVer == null ? AffinityTopologyVersion.NONE : topVer;
+    }
+
+    /**
      * @return {@code True} if request processing has finished with error.
      */
     boolean error() {
@@ -156,6 +172,12 @@ public class GridCacheEvictionResponse extends GridCacheMessage {
 
                 writer.incrementState();
 
+            case 6:
+                if (!writer.writeMessage("topVer", topVer))
+                    return false;
+
+                writer.incrementState();
+
         }
 
         return true;
@@ -196,6 +218,14 @@ public class GridCacheEvictionResponse extends GridCacheMessage {
 
                 reader.incrementState();
 
+            case 6:
+                topVer = reader.readMessage("topVer");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
         }
 
         return true;
@@ -208,7 +238,7 @@ public class GridCacheEvictionResponse extends GridCacheMessage {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 6;
+        return 7;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/31989a94/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 4680994..da56a48 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -2296,7 +2296,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
      * @return {@code true} if entry has readers. It makes sense only for dht entry.
      * @throws GridCacheEntryRemovedException If removed.
      */
-    protected boolean hasReaders() throws GridCacheEntryRemovedException {
+    protected boolean hasReaders(AffinityTopologyVersion topVer) throws GridCacheEntryRemovedException {
         return false;
     }
 
@@ -2347,7 +2347,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
                 CacheObject val = saveValueForIndexUnlocked();
 
                 try {
-                    if ((!hasReaders() || readers)) {
+                    if ((!hasReaders(cctx.affinity().affinityTopologyVersion()) || readers)) {
                         // markObsolete will clear the value.
                         if (!(marked = markObsolete0(ver, true))) {
                             if (log.isDebugEnabled())
@@ -2654,7 +2654,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
 
             expiryPlc.ttlUpdated(key(),
                 version(),
-                hasReaders() ? ((GridDhtCacheEntry)this).readers() : null);
+                hasReaders(cctx.affinity().affinityTopologyVersion()) ? ((GridDhtCacheEntry)this).readers() : null);
         }
     }
 
@@ -3597,7 +3597,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
     }
 
     /** {@inheritDoc} */
-    @Override public boolean evictInternal(boolean swap, GridCacheVersion obsoleteVer,
+    @Override public boolean evictInternal(boolean swap, GridCacheVersion obsoleteVer, AffinityTopologyVersion topVer,
         @Nullable CacheEntryPredicate[] filter) throws IgniteCheckedException {
         boolean marked = false;
 
@@ -3606,7 +3606,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
                 synchronized (this) {
                     CacheObject prev = saveValueForIndexUnlocked();
 
-                    if (!hasReaders() && markObsolete0(obsoleteVer, false)) {
+                    if (!hasReaders(topVer) && markObsolete0(obsoleteVer, false)) {
                         if (swap) {
                             if (!isStartVersion()) {
                                 try {
@@ -3651,7 +3651,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
 
                         CacheObject prevVal = saveValueForIndexUnlocked();
 
-                        if (!hasReaders() && markObsolete0(obsoleteVer, false)) {
+                        if (!hasReaders(topVer) && markObsolete0(obsoleteVer, false)) {
                             if (swap) {
                                 if (!isStartVersion()) {
                                     try {
@@ -3718,7 +3718,8 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
     }
 
     /** {@inheritDoc} */
-    @Override public GridCacheBatchSwapEntry evictInBatchInternal(GridCacheVersion obsoleteVer)
+    @Override public GridCacheBatchSwapEntry evictInBatchInternal(GridCacheVersion obsoleteVer,
+        AffinityTopologyVersion topVer)
         throws IgniteCheckedException {
         assert Thread.holdsLock(this);
         assert cctx.isSwapOrOffheapEnabled();
@@ -3726,7 +3727,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
         GridCacheBatchSwapEntry ret = null;
 
         try {
-            if (!hasReaders() && markObsolete0(obsoleteVer, false)) {
+            if (!hasReaders(topVer) && markObsolete0(obsoleteVer, false)) {
                 if (!isStartVersion() && hasValueUnlocked()) {
                     IgniteUuid valClsLdrId = null;
                     IgniteUuid keyClsLdrId = null;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/31989a94/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
index 89b85c4..68dcd6a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
@@ -624,7 +624,11 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry {
     }
 
     /** {@inheritDoc} */
-    @Override protected synchronized boolean hasReaders() throws GridCacheEntryRemovedException {
+    @Override protected synchronized boolean hasReaders(AffinityTopologyVersion topVer)
+        throws GridCacheEntryRemovedException {
+        if (!cctx.affinity().primary(partition(), topVer).id().equals(cctx.localNodeId()))
+            return false;
+
         checkReadersLocked();
 
         return rdrs.length > 0;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/31989a94/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java
index 4ac81f8..0ffa500 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java
@@ -274,7 +274,7 @@ public class GridNearTxRemote extends GridDistributedTxRemoteAdapter {
             try {
                 CacheObject val = cached.peek(true, false, false, null);
 
-                if (val == null && cached.evictInternal(false, xidVer, null)) {
+                if (val == null && cached.evictInternal(false, xidVer, topologyVersion(), null)) {
                     evicted.add(entry.txKey());
 
                     return false;
@@ -332,7 +332,7 @@ public class GridNearTxRemote extends GridDistributedTxRemoteAdapter {
 
                 CacheObject peek = cached.peek(true, false, false, null);
 
-                if (peek == null && cached.evictInternal(false, xidVer, null)) {
+                if (peek == null && cached.evictInternal(false, xidVer, topologyVersion(), null)) {
                     cached.context().cache().removeIfObsolete(key.key());
 
                     evicted.add(key);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/31989a94/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
index eaa6e13..8741662 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
@@ -395,7 +395,7 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr
     }
 
     /** @inheritDoc */
-    @Override public boolean evictInternal(boolean swap, GridCacheVersion obsoleteVer,
+    @Override public boolean evictInternal(boolean swap, GridCacheVersion obsoleteVer, AffinityTopologyVersion topVer,
         @Nullable CacheEntryPredicate[] filter) {
         assert false;
 
@@ -403,8 +403,8 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr
     }
 
     /** {@inheritDoc} */
-    @Override public GridCacheBatchSwapEntry evictInBatchInternal(GridCacheVersion obsoleteVer)
-        throws IgniteCheckedException {
+    @Override public GridCacheBatchSwapEntry evictInBatchInternal(GridCacheVersion obsoleteVer,
+        AffinityTopologyVersion topVer) throws IgniteCheckedException {
         assert false;
 
         return null;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/31989a94/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSynchronizedEvictionsSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSynchronizedEvictionsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSynchronizedEvictionsSelfTest.java
new file mode 100644
index 0000000..a06e6be
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSynchronizedEvictionsSelfTest.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cache.eviction.lru.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+/**
+ * Tests that synchronized evictions remove evicted entries from all server nodes and from the client near cache.
+ */
+public class IgniteCacheSynchronizedEvictionsSelfTest extends GridCommonAbstractTest {
+    /** Number of nodes started before the tests; the last one is a client. */
+    private static final int NODES_CNT = 4;
+
+    /** IP finder shared by all nodes. */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        startGrids(NODES_CNT);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        if (getTestGridName(NODES_CNT - 1).equals(gridName))
+            cfg.setClientMode(true);
+
+        TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+
+        discoSpi.setIpFinder(IP_FINDER);
+
+        cfg.setDiscoverySpi(discoSpi);
+
+        return cfg;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testEvictSynchronizedPartitionedManual() throws Exception {
+        testEvictSynchronizedManual(CacheMode.PARTITIONED, true, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testEvictSynchronizedReplicatedManual() throws Exception {
+        testEvictSynchronizedManual(CacheMode.REPLICATED, true, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testEvictSynchronizedPartitionedPolicy() throws Exception {
+        testEvictSynchronizedManual(CacheMode.PARTITIONED, false, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testEvictSynchronizedReplicatedPolicy() throws Exception {
+        testEvictSynchronizedManual(CacheMode.REPLICATED, false, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testEvictSynchronizedPartitionedManualTopChange() throws Exception {
+        testEvictSynchronizedManual(CacheMode.PARTITIONED, true, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testEvictSynchronizedReplicatedManualTopChange() throws Exception {
+        testEvictSynchronizedManual(CacheMode.REPLICATED, true, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testEvictSynchronizedPartitionedPolicyTopChange() throws Exception {
+        testEvictSynchronizedManual(CacheMode.PARTITIONED, false, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testEvictSynchronizedReplicatedPolicyTopChange() throws Exception {
+        testEvictSynchronizedManual(CacheMode.REPLICATED, false, true);
+    }
+
+    /**
+     * @param cacheMode Cache mode.
+     * @param manual If {@code true} keys are evicted via {@code localEvict}, otherwise by overflowing the LRU policy.
+     * @param topChange If {@code true} one more node is started before eviction and stopped afterwards.
+     * @throws Exception If failed.
+     */
+    private void testEvictSynchronizedManual(CacheMode cacheMode, boolean manual, final boolean topChange) throws Exception {
+        for (int i = 0; i < NODES_CNT; i++)
+            info("Node [i=" + i + ", id=" + grid(i).localNode().id() + ']');
+
+        CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>();
+
+        ccfg.setEvictionPolicy(new LruEvictionPolicy(10));
+        ccfg.setCacheMode(cacheMode);
+        ccfg.setBackups(1);
+        ccfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
+        ccfg.setRebalanceMode(CacheRebalanceMode.SYNC);
+        ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+
+        ccfg.setEvictSynchronized(true);
+        ccfg.setEvictSynchronizedKeyBufferSize(1);
+
+        IgniteCache<Object, Object> cache = grid(0).createCache(ccfg);
+
+        try {
+            for (int i = 0; i < 10; i++)
+                cache.put(i, i);
+
+            final IgniteCache<Object, Object> nearCache = grid(3).createNearCache(null, new NearCacheConfiguration<>());
+
+            // Initialize readers: read every key through the near cache.
+            for (int i = 0; i < 10; i++)
+                assertEquals(i, nearCache.get(i));
+
+            // Update the values and check that the near cache observes the updates.
+            for (int i = 0; i < 10; i++)
+                cache.put(i, i + 1);
+
+            for (int i = 0; i < 10; i++)
+                assertEquals(i + 1, nearCache.localPeek(i));
+
+            if (topChange)
+                info("Started grid: " + startGrid(NODES_CNT).localNode().id());
+
+            // Force server eviction: explicit local eviction on primaries or LRU overflow.
+            if (manual) {
+                for (int i = 0; i < 10; i++) {
+                    ClusterNode primary = grid(0).affinity(null).mapKeyToNode(i);
+
+                    grid(primary).cache(null).localEvict(F.asList(i));
+                }
+            }
+            else {
+                for (int i = 100; i < 200; i++)
+                    cache.put(i, i);
+            }
+
+            assertTrue("Entries were not evicted in time.", GridTestUtils.waitForCondition(new PA() {
+                /** {@inheritDoc} */
+                @Override public boolean apply() {
+                    for (int i = 0; i < 10; i++) {
+                        if (nearCache.localPeek(i) != null) {
+                            info("Entry is still present on near cache: " + i);
+
+                            return false;
+                        }
+                    }
+
+                    for (int g = 0; g < (topChange ? NODES_CNT + 1 : NODES_CNT); g++) {
+                        if (g == NODES_CNT - 1)
+                            continue;
+
+                        IgniteCache<Object, Object> cache0 = grid(g).cache(null);
+
+                        for (int i = 0; i < 10; i++) {
+                            if (cache0.localPeek(i) != null) {
+                                info("Entry is still present on DHT node [key=" + i + ", node=" + g + ']');
+
+                                return false;
+                            }
+                        }
+                    }
+
+                    return true;
+                }
+            }, getTestTimeout()));
+        }
+        finally {
+            grid(0).destroyCache(null);
+
+            if (topChange)
+                stopGrid(NODES_CNT);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/31989a94/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtEntrySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtEntrySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtEntrySelfTest.java
index 170ff18..51c5101 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtEntrySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtEntrySelfTest.java
@@ -24,6 +24,7 @@ import org.apache.ignite.cache.affinity.rendezvous.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.distributed.near.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.lang.*;
@@ -267,7 +268,9 @@ public class GridCacheDhtEntrySelfTest extends GridCommonAbstractTest {
         assert e0.readers().contains(other.id());
         assert e1 == null || e1.readers().isEmpty();
 
-        assert !e0.evictInternal(false, dht0.context().versions().next(), null);
+        GridCacheContext<Integer, String> ctx0 = dht0.context();
+
+        assert !e0.evictInternal(false, ctx0.versions().next(), ctx0.affinity().affinityTopologyVersion(), null);
 
         assertEquals(1, near0.localSize(CachePeekMode.ALL));
         assertEquals(1, dht0.localSize(null));
@@ -275,7 +278,7 @@ public class GridCacheDhtEntrySelfTest extends GridCommonAbstractTest {
         assertEquals(1, near1.localSize(CachePeekMode.ALL));
         assertEquals(0, dht1.localSize(null));
 
-        assert !e0.evictInternal(true, dht0.context().versions().next(), null);
+        assert !e0.evictInternal(true, ctx0.versions().next(), ctx0.affinity().affinityTopologyVersion(), null);
 
         assertEquals(1, near0.localSize(CachePeekMode.ALL));
         assertEquals(1, dht0.localSize(null));

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/31989a94/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
index 6a59826..97a7251 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
@@ -138,6 +138,8 @@ public class IgniteCacheTestSuite2 extends TestSuite {
         suite.addTest(new TestSuite(IgniteCacheClientNodeChangingTopologyTest.class));
         suite.addTest(new TestSuite(IgniteCacheClientNodeConcurrentStart.class));
 
+        suite.addTest(new TestSuite(IgniteCacheSynchronizedEvictionsSelfTest.class));
+
         return suite;
     }
 }