You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/05/02 15:08:24 UTC

[23/26] ignite git commit: ignite-5075

ignite-5075


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3e6113bb
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3e6113bb
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3e6113bb

Branch: refs/heads/ignite-5075
Commit: 3e6113bb0c2e9c4fece2e58627c064896ba2d314
Parents: 98b4378
Author: sboikov <sb...@gridgain.com>
Authored: Tue May 2 15:36:41 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue May 2 16:38:54 2017 +0300

----------------------------------------------------------------------
 .../cache/CacheAffinitySharedManager.java       | 34 +++++----
 .../processors/cache/ClusterCachesInfo.java     | 56 +++++++++++----
 .../processors/cache/ExchangeActions.java       | 13 +---
 .../processors/cache/GridCacheProcessor.java    | 74 ++++++++++----------
 .../processors/cache/local/GridLocalCache.java  | 31 +-------
 .../cache/local/GridLocalLockFuture.java        | 41 ++++++++++-
 ...gniteCacheP2pUnmarshallingNearErrorTest.java |  6 +-
 7 files changed, 141 insertions(+), 114 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/3e6113bb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index 48bc6da..45f463b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -323,12 +323,10 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
      * @param exchActions Cache change requests to execte on exchange.
      */
     private void updateCachesInfo(ExchangeActions exchActions) {
-        for (DynamicCacheChangeRequest req : exchActions.stopRequests()) {
-            Integer cacheId = CU.cacheId(req.cacheName());
-
-            DynamicCacheDescriptor desc = registeredCaches.remove(cacheId);
+        for (ExchangeActions.ActionData action : exchActions.stopRequests()) {
+            DynamicCacheDescriptor desc = registeredCaches.remove(action.descriptor().cacheId());
 
-            assert desc != null : req.cacheName();
+            assert desc != null : action.request().cacheName();
         }
 
         for (ExchangeActions.ActionData action : exchActions.newAndClientCachesStartRequests()) {
@@ -380,9 +378,9 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
         });
 
         for (ExchangeActions.ActionData action : exchActions.newAndClientCachesStartRequests()) {
-            DynamicCacheChangeRequest req = action.request();
+            DynamicCacheDescriptor cacheDesc = action.descriptor();
 
-            Integer cacheId = CU.cacheId(req.cacheName());
+            DynamicCacheChangeRequest req = action.request();
 
             boolean startCache;
 
@@ -399,15 +397,15 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
             }
 
             if (startCache)
-                cctx.cache().prepareCacheStart(req, nearCfg, action.descriptor(), fut.topologyVersion());
+                cctx.cache().prepareCacheStart(cacheDesc, nearCfg, fut.topologyVersion());
 
-            if (fut.isCacheAdded(cacheId, fut.topologyVersion())) {
+            if (fut.isCacheAdded(cacheDesc.cacheId(), fut.topologyVersion())) {
                 if (fut.discoCache().cacheAffinityNodes(req.cacheName()).isEmpty())
                     U.quietAndWarn(log, "No server nodes found for cache client: " + req.cacheName());
             }
 
             if (!crd || !lateAffAssign) {
-                GridCacheContext cacheCtx = cctx.cacheContext(cacheId);
+                GridCacheContext cacheCtx = cctx.cacheContext(cacheDesc.cacheId());
 
                 if (cacheCtx != null && !cacheCtx.isLocal()) {
                     boolean clientCacheStarted =
@@ -430,7 +428,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                 }
             }
             else
-                initStartedCacheOnCoordinator(fut, cacheId);
+                initStartedCacheOnCoordinator(fut, cacheDesc.cacheId());
         }
 
         for (DynamicCacheChangeRequest req : exchActions.closeRequests(cctx.localNodeId())) {
@@ -459,22 +457,22 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
 
         Set<Integer> stoppedCaches = null;
 
-        for (DynamicCacheChangeRequest req : exchActions.stopRequests()) {
-            Integer cacheId = CU.cacheId(req.cacheName());
+        for (ExchangeActions.ActionData action : exchActions.stopRequests()) {
+            DynamicCacheDescriptor desc = action.descriptor();
 
-            cctx.cache().blockGateway(req);
+            cctx.cache().blockGateway(action.request());
 
-            if (crd) {
-                CacheHolder cache = caches.remove(cacheId);
+            if (crd && desc.cacheConfiguration().getCacheMode() != LOCAL) {
+                CacheHolder cache = caches.remove(desc.cacheId());
 
-                assert cache != null : req;
+                assert cache != null : action.request();
 
                 if (stoppedCaches == null)
                     stoppedCaches = new HashSet<>();
 
                 stoppedCaches.add(cache.cacheId());
 
-                cctx.io().removeHandler(cacheId, GridDhtAffinityAssignmentResponse.class);
+                cctx.io().removeHandler(desc.cacheId(), GridDhtAffinityAssignmentResponse.class);
             }
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/3e6113bb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
index e75f93c..1064de3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
@@ -84,6 +84,9 @@ class ClusterCachesInfo {
         this.ctx = ctx;
     }
 
+    /**
+     * @param joinDiscoData Information about configured caches and templates.
+     */
     void onStart(CacheJoinNodeDiscoveryData joinDiscoData) {
         this.joinDiscoData = joinDiscoData;
     }
@@ -317,9 +320,12 @@ class ClusterCachesInfo {
         return incMinorTopVer;
     }
 
+    /**
+     * @param dataBag Discovery data bag.
+     */
     void collectJoiningNodeData(DiscoveryDataBag dataBag) {
         if (!ctx.isDaemon())
-        dataBag.addJoiningNodeData(CACHE_PROC.ordinal(), joinDiscoveryData());
+            dataBag.addJoiningNodeData(CACHE_PROC.ordinal(), joinDiscoveryData());
     }
 
     /**
@@ -411,26 +417,27 @@ class ClusterCachesInfo {
                     assert joinDiscoData != null;
                 }
 
-                processJoiningNode(joinDiscoData, node.id());
-
                 assert locJoinStartCaches == null;
 
                 locJoinStartCaches = new ArrayList<>();
 
-                for (DynamicCacheDescriptor desc : registeredCaches.values()) {
-                    CacheConfiguration cfg = desc.cacheConfiguration();
+                if (!disconnectedState()) {
+                    processJoiningNode(joinDiscoData, node.id());
 
-                    CacheJoinNodeDiscoveryData.CacheInfo locCfg = joinDiscoData.caches().get(cfg.getName());
+                    for (DynamicCacheDescriptor desc : registeredCaches.values()) {
+                        CacheConfiguration cfg = desc.cacheConfiguration();
 
-                    boolean affNode = CU.affinityNode(ctx.discovery().localNode(), cfg.getNodeFilter());
+                        CacheJoinNodeDiscoveryData.CacheInfo locCfg = joinDiscoData.caches().get(cfg.getName());
 
-                    NearCacheConfiguration nearCfg = (!affNode && locCfg != null) ? locCfg.config().getNearConfiguration() : null;
+                        NearCacheConfiguration nearCfg = locCfg != null ? locCfg.config().getNearConfiguration() :
+                            null;
 
-                    if (locCfg != null || CU.affinityNode(ctx.discovery().localNode(), cfg.getNodeFilter()))
-                        locJoinStartCaches.add(new T2<>(desc, nearCfg));
-                }
+                        if (locCfg != null || CU.affinityNode(ctx.discovery().localNode(), cfg.getNodeFilter()))
+                            locJoinStartCaches.add(new T2<>(desc, nearCfg));
+                    }
 
-                joinDiscoData = null;
+                    joinDiscoData = null;
+                }
             }
             else {
                 CacheJoinNodeDiscoveryData discoData = joiningNodesDiscoData.remove(node.id());
@@ -459,6 +466,9 @@ class ClusterCachesInfo {
         }
     }
 
+    /**
+     * @param dataBag Discovery data bag.
+     */
     void collectGridNodeData(DiscoveryDataBag dataBag) {
         if (ctx.isDaemon())
             return;
@@ -515,7 +525,7 @@ class ClusterCachesInfo {
         if (ctx.isDaemon() || data.commonData() == null)
             return;
 
-        assert joinDiscoData != null;
+        assert joinDiscoData != null || disconnectedState();
         assert data.commonData() instanceof CacheNodeCommonDiscoveryData : data;
 
         CacheNodeCommonDiscoveryData cachesData = (CacheNodeCommonDiscoveryData)data.commonData();
@@ -576,6 +586,9 @@ class ClusterCachesInfo {
         gridData = cachesData;
     }
 
+    /**
+     * @param data Joining node data.
+     */
     void onJoiningNodeDataReceived(DiscoveryDataBag.JoiningNodeDiscoveryData data) {
         if (data.hasJoiningNodeData()) {
             Serializable joiningNodeData = data.joiningNodeData();
@@ -610,6 +623,10 @@ class ClusterCachesInfo {
         }
     }
 
+    /**
+     * @param joinData Joined node discovery data.
+     * @param nodeId Joined node ID.
+     */
     private void processJoiningNode(CacheJoinNodeDiscoveryData joinData, UUID nodeId) {
         for (CacheJoinNodeDiscoveryData.CacheInfo cacheInfo : joinData.templates().values()) {
             CacheConfiguration cfg = cacheInfo.config();
@@ -688,7 +705,7 @@ class ClusterCachesInfo {
      * @return Stopped caches names.
      */
     Set<String> onReconnected() {
-        assert cachesOnDisconnect != null;
+        assert disconnectedState();
 
         Set<String> stoppedCaches = new HashSet<>();
 
@@ -716,6 +733,17 @@ class ClusterCachesInfo {
         return stoppedCaches;
     }
 
+    /**
+     * @return {@code True} if client node is currently in disconnected state.
+     */
+    private boolean disconnectedState() {
+        return cachesOnDisconnect != null;
+    }
+
+    /**
+     * @param cacheName Cache name.
+     * @return {@code True} if cache with given name if system cache which should always survive client node disconnect.
+     */
     private boolean surviveReconnect(String cacheName) {
         return CU.isUtilityCache(cacheName) || CU.isAtomicsCache(cacheName);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/3e6113bb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
index 9be4b6a..bcc77f6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
@@ -104,17 +104,8 @@ public class ExchangeActions {
     /**
      * @return Stop cache requests.
      */
-    public List<DynamicCacheChangeRequest> stopRequests() {
-        List<DynamicCacheChangeRequest> res = null;
-
-        if (cachesToStop != null) {
-            res = new ArrayList<>(cachesToStop.size());
-
-            for (ActionData req : cachesToStop.values())
-                res.add(req.req);
-        }
-
-        return res != null ? res : Collections.<DynamicCacheChangeRequest>emptyList();
+    Collection<ActionData> stopRequests() {
+        return cachesToStop != null ? cachesToStop.values() : Collections.EMPTY_LIST;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/3e6113bb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index b4e4b14..2ed20e4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -1338,6 +1338,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         AffinityTopologyVersion cacheStartTopVer,
         AffinityTopologyVersion locStartTopVer,
         CacheObjectContext cacheObjCtx,
+        boolean affNode,
         boolean updatesAllowed)
         throws IgniteCheckedException {
         assert cfg != null;
@@ -1398,8 +1399,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
         storeMgr.initialize(cfgStore, sesHolders);
 
-        boolean affNode = cfg.getCacheMode() == LOCAL || CU.affinityNode(ctx.discovery().localNode(), cfg.getNodeFilter());
-
         String memPlcName = cfg.getMemoryPolicyName();
 
         MemoryPolicy memPlc = sharedCtx.database().memoryPolicy(memPlcName);
@@ -1710,42 +1709,34 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     }
 
     /**
-     * @param req Cache start request.
+     * @param cacheDesc Cache start request.
      * @param nearCfg Near cache configuration.
-     * @param desc Cache descriptor.
      * @param exchTopVer Current exchange version.
      * @throws IgniteCheckedException If failed.
      */
-    void prepareCacheStart(DynamicCacheChangeRequest req,
+    void prepareCacheStart(DynamicCacheDescriptor cacheDesc,
         @Nullable NearCacheConfiguration nearCfg,
-        DynamicCacheDescriptor desc,
         AffinityTopologyVersion exchTopVer)
         throws IgniteCheckedException {
-        assert req.start() : req;
-        assert req.cacheType() != null : req;
-
         prepareCacheStart(
-            req.startCacheConfiguration(),
+            cacheDesc.cacheConfiguration(),
             nearCfg,
-            req.cacheType(),
-            req.deploymentId(),
-            desc.startTopologyVersion(),
+            cacheDesc.cacheType(),
+            cacheDesc.deploymentId(),
+            cacheDesc.startTopologyVersion(),
             exchTopVer,
-            desc.schema()
+            cacheDesc.schema()
         );
     }
 
     /**
      * @param exchTopVer Current exchange version.
      * @throws IgniteCheckedException If failed.
-     * @return Collection of started caches.
      */
-    public List<DynamicCacheDescriptor> startCachesOnLocalJoin(AffinityTopologyVersion exchTopVer) throws IgniteCheckedException {
+    public void startCachesOnLocalJoin(AffinityTopologyVersion exchTopVer) throws IgniteCheckedException {
         List<T2<DynamicCacheDescriptor, NearCacheConfiguration>> caches = cachesInfo.cachesToStartOnLocalJoin();
 
         if (!F.isEmpty(caches)) {
-            List<DynamicCacheDescriptor> started = new ArrayList<>(caches.size());
-
             for (T2<DynamicCacheDescriptor, NearCacheConfiguration> t : caches) {
                 DynamicCacheDescriptor desc = t.get1();
 
@@ -1758,14 +1749,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                     exchTopVer,
                     desc.schema()
                 );
-
-                started.add(desc);
             }
-
-            return started;
         }
-        else
-            return Collections.emptyList();
     }
 
     /**
@@ -1802,8 +1787,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     }
 
     /**
-     * @param cfg Start configuration.
-     * @param nearCfg Near configuration.
+     * @param startCfg Start configuration.
+     * @param reqNearCfg Near configuration if specified for client cache start request.
      * @param cacheType Cache type.
      * @param deploymentId Deployment ID.
      * @param cacheStartTopVer Cache start topology version.
@@ -1812,29 +1797,42 @@ public class GridCacheProcessor extends GridProcessorAdapter {
      * @throws IgniteCheckedException If failed.
      */
     private void prepareCacheStart(
-        CacheConfiguration cfg,
-        NearCacheConfiguration nearCfg,
+        CacheConfiguration startCfg,
+        @Nullable NearCacheConfiguration reqNearCfg,
         CacheType cacheType,
         IgniteUuid deploymentId,
         AffinityTopologyVersion cacheStartTopVer,
         AffinityTopologyVersion exchTopVer,
         @Nullable QuerySchema schema
     ) throws IgniteCheckedException {
-        assert !caches.containsKey(cfg.getName()) : cfg.getName();
-
-        CacheConfiguration ccfg = new CacheConfiguration(cfg);
+        assert !caches.containsKey(startCfg.getName()) : startCfg.getName();
 
-        if (nearCfg != null)
-            ccfg.setNearConfiguration(nearCfg);
+        CacheConfiguration ccfg = new CacheConfiguration(startCfg);
 
         CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(ccfg);
 
+        boolean affNode;
+
+        if (ccfg.getCacheMode() == LOCAL) {
+            affNode = true;
+
+            ccfg.setNearConfiguration(null);
+        }
+        else if (CU.affinityNode(ctx.discovery().localNode(), ccfg.getNodeFilter()))
+            affNode = true;
+        else {
+            affNode = false;
+
+            ccfg.setNearConfiguration(reqNearCfg);
+        }
+
         GridCacheContext cacheCtx = createCache(ccfg,
             null,
             cacheType,
             cacheStartTopVer,
             exchTopVer,
             cacheObjCtx,
+            affNode,
             true);
 
         cacheCtx.dynamicDeploymentId(deploymentId);
@@ -1931,10 +1929,10 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         }
 
         if (exchActions != null && err == null) {
-            for (DynamicCacheChangeRequest req : exchActions.stopRequests()) {
-                stopGateway(req);
+            for (ExchangeActions.ActionData action : exchActions.stopRequests()) {
+                stopGateway(action.request());
 
-                prepareCacheStop(req);
+                prepareCacheStop(action.request());
             }
 
             for (DynamicCacheChangeRequest req : exchActions.closeRequests(ctx.localNodeId())) {
@@ -1961,7 +1959,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
     /**
      * @param cacheName Cache name.
-     * @param deploymentId
+     * @param deploymentId Future deployment ID.
      */
     void completeTemplateAddFuture(String cacheName, IgniteUuid deploymentId) {
         GridCacheProcessor.TemplateConfigurationFuture fut =
@@ -1975,7 +1973,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
      * @param req Request to complete future for.
      * @param err Error if any.
      */
-    public void completeCacheStartFuture(DynamicCacheChangeRequest req, @Nullable Exception err) {
+    void completeCacheStartFuture(DynamicCacheChangeRequest req, @Nullable Exception err) {
         if (req.initiatingNodeId().equals(ctx.localNodeId())) {
             DynamicCacheStartFuture fut = (DynamicCacheStartFuture)pendingFuts.get(req.requestId());
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/3e6113bb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java
index 94f618a..88499c3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java
@@ -148,35 +148,8 @@ public class GridLocalCache<K, V> extends GridCacheAdapter<K, V> {
         GridLocalLockFuture<K, V> fut = new GridLocalLockFuture<>(ctx, keys, tx, this, timeout, filter);
 
         try {
-            for (KeyCacheObject key : keys) {
-                while (true) {
-                    GridLocalCacheEntry entry = null;
-
-                    try {
-                        entry = entryExx(key);
-
-                        entry.unswap(false);
-
-                        if (!ctx.isAll(entry, filter)) {
-                            fut.onFailed();
-
-                            return fut;
-                        }
-
-                        // Removed exception may be thrown here.
-                        GridCacheMvccCandidate cand = fut.addEntry(entry);
-
-                        if (cand == null && fut.isDone())
-                            return fut;
-
-                        break;
-                    }
-                    catch (GridCacheEntryRemovedException ignored) {
-                        if (log().isDebugEnabled())
-                            log().debug("Got removed entry in lockAsync(..) method (will retry): " + entry);
-                    }
-                }
-            }
+            if (!fut.addEntries(keys))
+                return fut;
 
             if (!ctx.mvcc().addFuture(fut))
                 fut.onError(new IgniteCheckedException("Duplicate future ID (internal error): " + fut));

http://git-wip-us.apache.org/repos/asf/ignite/blob/3e6113bb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java
index 59d0adb..9641533 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java
@@ -144,12 +144,51 @@ public final class GridLocalLockFuture<K, V> extends GridCacheFutureAdapter<Bool
 
         if (log == null)
             log = U.logger(cctx.kernalContext(), logRef, GridLocalLockFuture.class);
+    }
+
+    /**
+     * @param keys Keys.
+     * @return {@code False} in case of error.
+     * @throws IgniteCheckedException If failed.
+     */
+    public boolean addEntries(Collection<KeyCacheObject> keys) throws IgniteCheckedException {
+        for (KeyCacheObject key : keys) {
+            while (true) {
+                GridLocalCacheEntry entry = null;
+
+                try {
+                    entry = cache.entryExx(key);
+
+                    entry.unswap(false);
+
+                    if (!cctx.isAll(entry, filter)) {
+                        onFailed();
+
+                        return false;
+                    }
+
+                    // Removed exception may be thrown here.
+                    GridCacheMvccCandidate cand = addEntry(entry);
+
+                    if (cand == null && isDone())
+                        return false;
+
+                    break;
+                }
+                catch (GridCacheEntryRemovedException ignored) {
+                    if (log.isDebugEnabled())
+                        log.debug("Got removed entry in lockAsync(..) method (will retry): " + entry);
+                }
+            }
+        }
 
         if (timeout > 0) {
             timeoutObj = new LockTimeoutObject();
 
             cctx.time().addTimeoutObject(timeoutObj);
         }
+
+        return true;
     }
 
     /** {@inheritDoc} */
@@ -216,7 +255,7 @@ public final class GridLocalLockFuture<K, V> extends GridCacheFutureAdapter<Bool
      * @return Lock candidate.
      * @throws GridCacheEntryRemovedException If entry was removed.
      */
-    @Nullable GridCacheMvccCandidate addEntry(GridLocalCacheEntry entry)
+    private @Nullable GridCacheMvccCandidate addEntry(GridLocalCacheEntry entry)
         throws GridCacheEntryRemovedException {
         // Add local lock first, as it may throw GridCacheEntryRemovedException.
         GridCacheMvccCandidate c = entry.addLocal(

http://git-wip-us.apache.org/repos/asf/ignite/blob/3e6113bb/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingNearErrorTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingNearErrorTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingNearErrorTest.java
index a4a831f..546ec06 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingNearErrorTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingNearErrorTest.java
@@ -44,13 +44,13 @@ public class IgniteCacheP2pUnmarshallingNearErrorTest extends IgniteCacheP2pUnma
 
     /** {@inheritDoc} */
     @Override public void testResponseMessageOnUnmarshallingFailed() throws InterruptedException {
-        //GridCacheEvictionRequest unmarshalling failed test
-        readCnt.set(5); //2 for each put
+        //GridCacheEvictionRequest unmarshalling failed test.
+        readCnt.set(5); //2 for each put.
 
         jcache(0).put(new TestKey(String.valueOf(++key)), "");
         jcache(0).put(new TestKey(String.valueOf(++key)), "");
 
-        //Eviction request unmarshalling failed but ioManager does not hangs up.
+        // Eviction request unmarshalling failed but ioManager does not hangs up.
 
         // Wait for eviction complete.
         Thread.sleep(1000);