You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2018/10/17 14:30:21 UTC

ignite git commit: IGNITE-9710 Exchange worker liveness checking improvements

Repository: ignite
Updated Branches:
  refs/heads/master e0e02abaa -> 2bdc89827


IGNITE-9710 Exchange worker liveness checking improvements

Signed-off-by: Andrey Gura <ag...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2bdc8982
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2bdc8982
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2bdc8982

Branch: refs/heads/master
Commit: 2bdc8982796626f41f39f7ae28af12966b04a24f
Parents: e0e02ab
Author: Andrey Kuznetsov <st...@gmail.com>
Authored: Wed Oct 17 17:25:54 2018 +0300
Committer: Andrey Gura <ag...@apache.org>
Committed: Wed Oct 17 17:25:54 2018 +0300

----------------------------------------------------------------------
 .../cache/CacheAffinitySharedManager.java       |  35 +-
 .../processors/cache/GridCacheMvccManager.java  |   2 +
 .../GridCachePartitionExchangeManager.java      |   7 +
 .../processors/cache/GridCacheProcessor.java    |   6 +
 .../GridDhtPartitionsExchangeFuture.java        | 366 +++++++++++++++----
 5 files changed, 341 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/2bdc8982/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index 9cbceb1..cedbde1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -41,7 +41,6 @@ import org.apache.ignite.configuration.NearCacheConfiguration;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.events.Event;
 import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
 import org.apache.ignite.internal.events.DiscoveryCustomEvent;
 import org.apache.ignite.internal.managers.discovery.DiscoCache;
@@ -729,6 +728,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                     return;
 
                 aff.clientEventTopologyChange(evts.lastEvent(), evts.topologyVersion());
+
+                cctx.exchange().exchangerUpdateHeartbeat();
             }
         });
     }
@@ -1154,6 +1155,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                 }
                 else
                     aff.clientEventTopologyChange(exchFut.firstEvent(), topVer);
+
+                cctx.exchange().exchangerUpdateHeartbeat();
             }
         });
     }
@@ -1174,6 +1177,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                     AffinityTopologyVersion topVer = fut.initialVersion();
 
                     aff.clientEventTopologyChange(fut.firstEvent(), topVer);
+
+                    cctx.exchange().exchangerUpdateHeartbeat();
                 }
             });
         }
@@ -1318,16 +1323,22 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                 @Override public void applyx(CacheGroupDescriptor desc) throws IgniteCheckedException {
                     CacheGroupHolder cache = groupHolder(fut.initialVersion(), desc);
 
-                    if (cache.affinity().lastVersion().equals(AffinityTopologyVersion.NONE))
+                    if (cache.affinity().lastVersion().equals(AffinityTopologyVersion.NONE)) {
                         calculateAndInit(fut.events(), cache.affinity(), fut.initialVersion());
+
+                        cctx.exchange().exchangerUpdateHeartbeat();
+                    }
                 }
             });
         }
         else {
             forAllCacheGroups(false, new IgniteInClosureX<GridAffinityAssignmentCache>() {
                 @Override public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException {
-                    if (aff.lastVersion().equals(AffinityTopologyVersion.NONE))
+                    if (aff.lastVersion().equals(AffinityTopologyVersion.NONE)) {
                         initAffinity(cachesRegistry.group(aff.groupId()), aff, fut);
+
+                        cctx.exchange().exchangerUpdateHeartbeat();
+                    }
                 }
             });
         }
@@ -1662,6 +1673,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                         CacheGroupHolder grpHolder = groupHolder(topVer, desc);
 
                         calculateAndInit(fut.events(), grpHolder.affinity(), topVer);
+
+                        cctx.exchange().exchangerUpdateHeartbeat();
                     }
                 });
             }
@@ -1791,6 +1804,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                         calculateAndInit(fut.events(), grp.affinity(), topVer);
                 }
             }
+
+            cctx.exchange().exchangerUpdateHeartbeat();
         }
 
         for (int i = 0; i < fetchFuts.size(); i++) {
@@ -1804,6 +1819,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                 fut.events().discoveryCache(),
                 cctx.cache().cacheGroup(grpId).affinity(),
                 fetchFut);
+
+            cctx.exchange().exchangerUpdateHeartbeat();
         }
     }
 
@@ -1874,6 +1891,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                     CacheGroupHolder cache = groupHolder(fut.initialVersion(), desc);
 
                     cache.aff.calculate(fut.initialVersion(), fut.events(), fut.events().discoveryCache());
+
+                    cctx.exchange().exchangerUpdateHeartbeat();
                 }
             });
         }
@@ -1881,6 +1900,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
             forAllCacheGroups(false, new IgniteInClosureX<GridAffinityAssignmentCache>() {
                 @Override public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException {
                     aff.calculate(fut.initialVersion(), fut.events(), fut.events().discoveryCache());
+
+                    cctx.exchange().exchangerUpdateHeartbeat();
                 }
             });
         }
@@ -1977,6 +1998,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                                 aff.calculate(topVer, fut.events(), fut.events().discoveryCache());
 
                                 affFut.onDone(topVer);
+
+                                cctx.exchange().exchangerUpdateHeartbeat();
                             }
                         });
 
@@ -1999,6 +2022,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                 CacheGroupHolder old = grpHolders.put(grpHolder.groupId(), grpHolder);
 
                 assert old == null : old;
+
+                cctx.exchange().exchangerUpdateHeartbeat();
             }
         });
 
@@ -2075,6 +2100,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                         null,
                         grp.rebalanceEnabled(),
                         affCache);
+
+                    cctx.exchange().exchangerUpdateHeartbeat();
                 }
             });
 
@@ -2111,6 +2138,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                         for (GridDhtPartitionMap map0 : map.values())
                             cache.topology(fut.context().events().discoveryCache()).update(fut.exchangeId(), map0, true);
                     }
+
+                    cctx.exchange().exchangerUpdateHeartbeat();
                 }
             });
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/2bdc8982/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
index 690b15a..16324de 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
@@ -367,6 +367,8 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
                     log.debug("Attempted to remove node locks from removed entry in mvcc manager " +
                         "disco callback (will ignore): " + entry);
             }
+
+            cctx.exchange().exchangerUpdateHeartbeat();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/2bdc8982/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index dbfc3e4..0baf5a3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -2258,6 +2258,13 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     }
 
     /**
+     * Invokes {@link GridWorker#updateHeartbeat()} for exchange worker.
+     */
+    public void exchangerUpdateHeartbeat() {
+        exchWorker.updateHeartbeat();
+    }
+
+    /**
      * Invokes {@link GridWorker#blockingSectionBegin()} for exchange worker.
      * Should be called from exchange worker thread.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/2bdc8982/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 68698ec..ec88a93 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -2026,6 +2026,8 @@ public class GridCacheProcessor extends GridProcessorAdapter implements Metastor
                 t.get2(),
                 exchTopVer,
                 false);
+
+            context().exchange().exchangerUpdateHeartbeat();
         }
 
         if (log.isInfoEnabled())
@@ -2064,6 +2066,8 @@ public class GridCacheProcessor extends GridProcessorAdapter implements Metastor
                     null,
                     exchTopVer,
                     false);
+
+                context().exchange().exchangerUpdateHeartbeat();
             }
         }
 
@@ -5030,6 +5034,8 @@ public class GridCacheProcessor extends GridProcessorAdapter implements Metastor
             // Make sure to remove future before completion.
             pendingFuts.remove(id, this);
 
+            context().exchange().exchangerUpdateHeartbeat();
+
             return super.onDone(res, err);
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/2bdc8982/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 32cd0d4..e550a8b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -86,13 +86,13 @@ import org.apache.ignite.internal.processors.cache.GridCacheUtils;
 import org.apache.ignite.internal.processors.cache.LocalJoinCachesContext;
 import org.apache.ignite.internal.processors.cache.StateChangeRequest;
 import org.apache.ignite.internal.processors.cache.WalStateAbstractMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFutureAdapter;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch.Latch;
 import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridClientPartitionTopology;
 import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
 import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
 import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
 import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionsStateValidator;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFutureAdapter;
-import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch.Latch;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator;
 import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotDiscoveryMessage;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
@@ -633,10 +633,15 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
         if (newCrd) {
             IgniteInternalFuture<?> fut = cctx.affinity().initCoordinatorCaches(this, false);
 
-            if (fut != null)
+            if (fut != null) {
                 fut.get();
 
+                cctx.exchange().exchangerUpdateHeartbeat();
+            }
+
             cctx.exchange().onCoordinatorInitialized();
+
+            cctx.exchange().exchangerUpdateHeartbeat();
         }
     }
 
@@ -654,7 +659,14 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
         initTs = U.currentTimeMillis();
 
-        U.await(evtLatch);
+        cctx.exchange().exchangerBlockingSectionBegin();
+
+        try {
+            U.await(evtLatch);
+        }
+        finally {
+            cctx.exchange().exchangerBlockingSectionEnd();
+        }
 
         assert firstDiscoEvt != null : this;
         assert exchId.nodeId().equals(firstDiscoEvt.eventNode().id()) : this;
@@ -680,7 +692,14 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
             exchCtx = new ExchangeContext(crdNode, mvccCrdChange, this);
 
-            cctx.kernalContext().coordinators().onExchangeStart(mvccCrd, exchCtx, crd);
+            cctx.exchange().exchangerBlockingSectionBegin();
+
+            try {
+                cctx.kernalContext().coordinators().onExchangeStart(mvccCrd, exchCtx, crd);
+            }
+            finally {
+                cctx.exchange().exchangerBlockingSectionEnd();
+            }
 
             assert state == null : state;
 
@@ -812,8 +831,16 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                     assert false;
             }
 
-            if (cctx.localNode().isClient())
-                tryToPerformLocalSnapshotOperation();
+            if (cctx.localNode().isClient()) {
+                cctx.exchange().exchangerBlockingSectionBegin();
+
+                try {
+                    tryToPerformLocalSnapshotOperation();
+                }
+                finally {
+                    cctx.exchange().exchangerBlockingSectionEnd();
+                }
+            }
 
             if (exchLog.isInfoEnabled())
                 exchLog.info("Finished exchange init [topVer=" + topVer + ", crd=" + crdNode + ']');
@@ -845,15 +872,29 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
      */
     private IgniteInternalFuture<?> initCachesOnLocalJoin() throws IgniteCheckedException {
         if (isLocalNodeNotInBaseline()) {
-            cctx.cache().cleanupCachesDirectories();
+            cctx.exchange().exchangerBlockingSectionBegin();
 
-            cctx.database().cleanupCheckpointDirectory();
+            try {
+                cctx.cache().cleanupCachesDirectories();
+
+                cctx.database().cleanupCheckpointDirectory();
 
-            if (cctx.wal() != null)
-                cctx.wal().cleanupWalDirectories();
+                if (cctx.wal() != null)
+                    cctx.wal().cleanupWalDirectories();
+            }
+            finally {
+                cctx.exchange().exchangerBlockingSectionEnd();
+            }
         }
 
-        cctx.activate();
+        cctx.exchange().exchangerBlockingSectionBegin();
+
+        try {
+            cctx.activate();
+        }
+        finally {
+            cctx.exchange().exchangerBlockingSectionEnd();
+        }
 
         LocalJoinCachesContext locJoinCtx = exchActions == null ? null : exchActions.localJoinContext();
 
@@ -872,7 +913,14 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                 }
             }
 
-            cctx.database().readCheckpointAndRestoreMemory(startDescs);
+            cctx.exchange().exchangerBlockingSectionBegin();
+
+            try {
+                cctx.database().readCheckpointAndRestoreMemory(startDescs);
+            }
+            finally {
+                cctx.exchange().exchangerBlockingSectionEnd();
+            }
         }
 
         IgniteInternalFuture<?> cachesRegistrationFut = cctx.cache().startCachesOnLocalJoin(initialVersion(), locJoinCtx);
@@ -923,6 +971,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                         continue;
 
                     grp.topology().beforeExchange(this, !centralizedAff && !forceAffReassignment, false);
+
+                    cctx.exchange().exchangerUpdateHeartbeat();
                 }
             }
         }
@@ -953,29 +1003,50 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                 boolean updateTop = exchId.topologyVersion().equals(grp.localStartVersion());
 
                 if (updateTop && clientTop != null) {
-                    top.update(null,
-                        clientTop.partitionMap(true),
-                        clientTop.fullUpdateCounters(),
-                        Collections.emptySet(),
-                        null,
-                        null);
+                    cctx.exchange().exchangerBlockingSectionBegin();
+
+                    try {
+                        top.update(null,
+                            clientTop.partitionMap(true),
+                            clientTop.fullUpdateCounters(),
+                            Collections.emptySet(),
+                            null,
+                            null);
+                    }
+                    finally {
+                        cctx.exchange().exchangerBlockingSectionEnd();
+                    }
                 }
             }
 
-            top.updateTopologyVersion(
-                this,
-                events().discoveryCache(),
-                mvccCrd,
-                updSeq,
-                cacheGroupStopping(grp.groupId()));
+            cctx.exchange().exchangerBlockingSectionBegin();
+
+            try {
+                top.updateTopologyVersion(
+                    this,
+                    events().discoveryCache(),
+                    mvccCrd,
+                    updSeq,
+                    cacheGroupStopping(grp.groupId()));
+            }
+            finally {
+                cctx.exchange().exchangerBlockingSectionEnd();
+            }
         }
 
-        for (GridClientPartitionTopology top : cctx.exchange().clientTopologies()) {
-            top.updateTopologyVersion(this,
-                events().discoveryCache(),
-                mvccCrd,
-                -1,
-                cacheGroupStopping(top.groupId()));
+        cctx.exchange().exchangerBlockingSectionBegin();
+
+        try {
+            for (GridClientPartitionTopology top : cctx.exchange().clientTopologies()) {
+                top.updateTopologyVersion(this,
+                    events().discoveryCache(),
+                    mvccCrd,
+                    -1,
+                    cacheGroupStopping(top.groupId()));
+            }
+        }
+        finally {
+            cctx.exchange().exchangerBlockingSectionEnd();
         }
     }
 
@@ -1004,7 +1075,14 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                 }
 
                 try {
-                    cctx.activate();
+                    cctx.exchange().exchangerBlockingSectionBegin();
+
+                    try {
+                        cctx.activate();
+                    }
+                    finally {
+                        cctx.exchange().exchangerBlockingSectionEnd();
+                    }
 
                     if (!cctx.kernalContext().clientNode()) {
                         List<DynamicCacheDescriptor> startDescs = new ArrayList<>();
@@ -1017,12 +1095,26 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                                 startDescs.add(desc);
                         }
 
-                        cctx.database().readCheckpointAndRestoreMemory(startDescs);
+                        cctx.exchange().exchangerBlockingSectionBegin();
+
+                        try {
+                            cctx.database().readCheckpointAndRestoreMemory(startDescs);
+                        }
+                        finally {
+                            cctx.exchange().exchangerBlockingSectionEnd();
+                        }
                     }
 
                     assert registerCachesFuture == null : "No caches registration should be scheduled before new caches have started.";
 
-                    registerCachesFuture = cctx.affinity().onCacheChangeRequest(this, crd, exchActions);
+                    cctx.exchange().exchangerBlockingSectionBegin();
+
+                    try {
+                        registerCachesFuture = cctx.affinity().onCacheChangeRequest(this, crd, exchActions);
+                    }
+                    finally {
+                        cctx.exchange().exchangerBlockingSectionEnd();
+                    }
 
                     if (log.isInfoEnabled()) {
                         log.info("Successfully activated caches [nodeId=" + cctx.localNodeId() +
@@ -1038,8 +1130,15 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                     exchangeLocE = e;
 
                     if (crd) {
-                        synchronized (mux) {
-                            exchangeGlobalExceptions.put(cctx.localNodeId(), e);
+                        cctx.exchange().exchangerBlockingSectionBegin();
+
+                        try {
+                            synchronized (mux) {
+                                exchangeGlobalExceptions.put(cctx.localNodeId(), e);
+                            }
+                        }
+                        finally {
+                            cctx.exchange().exchangerBlockingSectionEnd();
                         }
                     }
                 }
@@ -1051,6 +1150,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                         ", topVer=" + initialVersion() + "]");
                 }
 
+                cctx.exchange().exchangerBlockingSectionBegin();
+
                 try {
                     cctx.kernalContext().dataStructures().onDeActivate(cctx.kernalContext());
 
@@ -1076,9 +1177,14 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
                     exchangeLocE = e;
                 }
+                finally {
+                    cctx.exchange().exchangerBlockingSectionEnd();
+                }
             }
         }
         else if (req.activate()) {
+            cctx.exchange().exchangerBlockingSectionBegin();
+
             // TODO: BLT changes on inactive cluster can't be handled easily because persistent storage hasn't been initialized yet.
             try {
                 if (!forceAffReassignment) {
@@ -1101,6 +1207,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
                 exchangeLocE = e;
             }
+            finally {
+                cctx.exchange().exchangerBlockingSectionEnd();
+            }
         }
 
         return cctx.kernalContext().clientNode() ? ExchangeType.CLIENT : ExchangeType.ALL;
@@ -1116,6 +1225,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
         assert !exchActions.clientOnlyExchange() : exchActions;
 
+        cctx.exchange().exchangerBlockingSectionBegin();
+
         try {
             assert registerCachesFuture == null : "No caches registration should be scheduled before new caches have started.";
 
@@ -1133,6 +1244,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
             exchangeGlobalExceptions.put(cctx.localNodeId(), exchangeLocE);
         }
+        finally {
+            cctx.exchange().exchangerBlockingSectionEnd();
+        }
 
         return cctx.kernalContext().clientNode() ? ExchangeType.CLIENT : ExchangeType.ALL;
     }
@@ -1213,10 +1327,17 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
         if (crd != null) {
             assert !crd.isLocal() : crd;
 
-            if (!centralizedAff)
-                sendLocalPartitions(crd);
+            cctx.exchange().exchangerBlockingSectionBegin();
 
-            initDone();
+            try {
+                if (!centralizedAff)
+                    sendLocalPartitions(crd);
+
+                initDone();
+            }
+            finally {
+                cctx.exchange().exchangerBlockingSectionEnd();
+            }
 
             return;
         }
@@ -1226,13 +1347,22 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                     GridAffinityAssignmentCache aff = grp.affinity();
 
                     aff.initialize(initialVersion(), aff.idealAssignment());
+
+                    cctx.exchange().exchangerUpdateHeartbeat();
                 }
             }
             else
                 onAllServersLeft();
         }
 
-        onDone(initialVersion());
+        cctx.exchange().exchangerBlockingSectionBegin();
+
+        try {
+            onDone(initialVersion());
+        }
+        finally {
+            cctx.exchange().exchangerBlockingSectionEnd();
+        }
     }
 
     /**
@@ -1247,13 +1377,27 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
             if (grp.isLocal())
                 continue;
 
-            grp.preloader().onTopologyChanged(this);
+            cctx.exchange().exchangerBlockingSectionBegin();
+
+            try {
+                grp.preloader().onTopologyChanged(this);
+            }
+            finally {
+                cctx.exchange().exchangerBlockingSectionEnd();
+            }
         }
 
-        cctx.database().releaseHistoryForPreloading();
+        cctx.exchange().exchangerBlockingSectionBegin();
+
+        try {
+            cctx.database().releaseHistoryForPreloading();
 
-        // To correctly rebalance when persistence is enabled, it is necessary to reserve history within exchange.
-        partHistReserved = cctx.database().reserveHistoryForExchange();
+            // To correctly rebalance when persistence is enabled, it is necessary to reserve history within exchange.
+            partHistReserved = cctx.database().reserveHistoryForExchange();
+        }
+        finally {
+            cctx.exchange().exchangerBlockingSectionEnd();
+        }
 
         // Skipping wait on local join is available when all cluster nodes have the same protocol.
         boolean skipWaitOnLocalJoin = cctx.exchange().latch().canSkipJoiningNodes(initialVersion())
@@ -1288,7 +1432,14 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
             if (topChanged) {
                 // Partition release future is done so we can flush the write-behind store.
-                cacheCtx.store().forceFlush();
+                cctx.exchange().exchangerBlockingSectionBegin();
+
+                try {
+                    cacheCtx.store().forceFlush();
+                }
+                finally {
+                    cctx.exchange().exchangerBlockingSectionEnd();
+                }
             }
         }
 
@@ -1296,7 +1447,16 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
            In case of persistent store is enabled we first restore partitions presented on disk.
            We need to guarantee that there are no partition state changes logged to WAL before this callback
            to make sure that we correctly restored last actual states. */
-        boolean restored = cctx.database().beforeExchange(this);
+        boolean restored;
+
+        cctx.exchange().exchangerBlockingSectionBegin();
+
+        try {
+            restored = cctx.database().beforeExchange(this);
+        }
+        finally {
+            cctx.exchange().exchangerBlockingSectionEnd();
+        }
 
         // Pre-create missing partitions using current affinity.
         if (!exchCtx.mergeExchanges()) {
@@ -1305,25 +1465,48 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                     continue;
 
                 // It is possible affinity is not initialized yet if node joins to cluster.
-                if (grp.affinity().lastVersion().topologyVersion() > 0)
-                    grp.topology().beforeExchange(this, !centralizedAff && !forceAffReassignment, false);
+                if (grp.affinity().lastVersion().topologyVersion() > 0) {
+                    cctx.exchange().exchangerBlockingSectionBegin();
+
+                    try {
+                        grp.topology().beforeExchange(this, !centralizedAff && !forceAffReassignment, false);
+                    }
+                    finally {
+                        cctx.exchange().exchangerBlockingSectionEnd();
+                    }
+                }
             }
         }
 
         // After all partitions have been restored and pre-created it's safe to make first checkpoint.
-        if (restored)
-            cctx.database().onStateRestored();
+        if (restored) {
+            cctx.exchange().exchangerBlockingSectionBegin();
+
+            try {
+                cctx.database().onStateRestored();
+            }
+            finally {
+                cctx.exchange().exchangerBlockingSectionEnd();
+            }
+        }
 
         changeWalModeIfNeeded();
 
-        if (crd.isLocal()) {
-            if (remaining.isEmpty())
-                onAllReceived(null);
-        }
-        else
-            sendPartitions(crd);
+        cctx.exchange().exchangerBlockingSectionBegin();
 
-        initDone();
+        try {
+            if (crd.isLocal()) {
+                if (remaining.isEmpty())
+                    onAllReceived(null);
+            }
+            else
+                sendPartitions(crd);
+
+            initDone();
+        }
+        finally {
+            cctx.exchange().exchangerBlockingSectionEnd();
+        }
     }
 
     /**
@@ -1356,8 +1539,16 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     private void changeWalModeIfNeeded() {
         WalStateAbstractMessage msg = firstWalMessage();
 
-        if (msg != null)
-            cctx.walState().onProposeExchange(msg.exchangeMessage());
+        if (msg != null) {
+            cctx.exchange().exchangerBlockingSectionBegin();
+
+            try {
+                cctx.walState().onProposeExchange(msg.exchangeMessage());
+            }
+            finally {
+                cctx.exchange().exchangerBlockingSectionEnd();
+            }
+        }
     }
 
     /**
@@ -1397,17 +1588,26 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     private void waitPartitionRelease(boolean distributed, boolean doRollback) throws IgniteCheckedException {
         Latch releaseLatch = null;
 
-        // Wait for other nodes only on first phase.
-        if (distributed)
-            releaseLatch = cctx.exchange().latch().getOrCreate(DISTRIBUTED_LATCH_ID, initialVersion());
+        IgniteInternalFuture<?> partReleaseFut;
 
-        IgniteInternalFuture<?> partReleaseFut = cctx.partitionReleaseFuture(initialVersion());
+        cctx.exchange().exchangerBlockingSectionBegin();
 
-        // Assign to class variable so it will be included into toString() method.
-        this.partReleaseFut = partReleaseFut;
+        try {
+            // Wait for other nodes only on first phase.
+            if (distributed)
+                releaseLatch = cctx.exchange().latch().getOrCreate(DISTRIBUTED_LATCH_ID, initialVersion());
+
+            partReleaseFut = cctx.partitionReleaseFuture(initialVersion());
 
-        if (exchId.isLeft())
-            cctx.mvcc().removeExplicitNodeLocks(exchId.nodeId(), exchId.topologyVersion());
+            // Assign to class variable so it will be included into toString() method.
+            this.partReleaseFut = partReleaseFut;
+
+            if (exchId.isLeft())
+                cctx.mvcc().removeExplicitNodeLocks(exchId.nodeId(), exchId.topologyVersion());
+        }
+        finally {
+            cctx.exchange().exchangerBlockingSectionEnd();
+        }
 
         if (log.isTraceEnabled())
             log.trace("Before waiting for partition release future: " + this);
@@ -1428,6 +1628,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
             // Read txTimeoutOnPME from configuration after every iteration.
             long curTimeout = cfg.getTransactionConfiguration().getTxTimeoutOnPartitionMapExchange();
 
+            cctx.exchange().exchangerBlockingSectionBegin();
+
             try {
                 // This avoids unnessesary waiting for rollback.
                 partReleaseFut.get(curTimeout > 0 && !txRolledBack ?
@@ -1454,6 +1656,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
                 throw e;
             }
+            finally {
+                cctx.exchange().exchangerBlockingSectionEnd();
+            }
         }
 
         long waitEnd = U.currentTimeMillis();
@@ -1477,6 +1682,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
         dumpCnt = 0;
 
         while (true) {
+            cctx.exchange().exchangerBlockingSectionBegin();
+
             try {
                 locksFut.get(waitTimeout, TimeUnit.MILLISECONDS);
 
@@ -1507,6 +1714,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                         U.dumpThreads(log);
                 }
             }
+            finally {
+                cctx.exchange().exchangerBlockingSectionEnd();
+            }
         }
 
         if (releaseLatch == null) {
@@ -1560,6 +1770,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                 continue;
 
             grp.preloader().unwindUndeploys();
+
+            cctx.exchange().exchangerUpdateHeartbeat();
         }
 
         cctx.mvcc().removeExplicitNodeLocks(exchId.nodeId(), exchId.topologyVersion());
@@ -1972,12 +2184,15 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
         if (super.onDone(res, err)) {
             afterLsnrCompleteFut.onDone();
 
-            if (log.isDebugEnabled())
+            if (log.isDebugEnabled()) {
                 log.debug("Completed partition exchange [localNode=" + cctx.localNodeId() + ", exchange= " + this +
-                    ", durationFromInit=" + (U.currentTimeMillis() - initTs) + ']');
-            else if(log.isInfoEnabled())
-                log.info("Completed partition exchange [localNode=" + cctx.localNodeId() + ", exchange=" + shortInfo() +
-                     ", topVer=" + topologyVersion() + ", durationFromInit=" + (U.currentTimeMillis() - initTs) + ']');
+                        ", durationFromInit=" + (U.currentTimeMillis() - initTs) + ']');
+            }
+            else if (log.isInfoEnabled()) {
+                log.info("Completed partition exchange [localNode=" + cctx.localNodeId() +
+                        ", exchange=" + shortInfo() + ", topVer=" + topologyVersion() +
+                        ", durationFromInit=" + (U.currentTimeMillis() - initTs) + ']');
+            }
 
             initFut.onDone(err == null);
 
@@ -2041,6 +2256,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                     (int)(cctx.kernalContext().config().getFailureDetectionTimeout() / 2));
 
                 for (;;) {
+                    cctx.exchange().exchangerBlockingSectionBegin();
+
                     try {
                         registerCachesFut.get(timeout, TimeUnit.SECONDS);
 
@@ -2055,6 +2272,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                             "Probably disk is too busy or slow." +
                             "[caches=" + cacheNames + "]");
                     }
+                    finally {
+                        cctx.exchange().exchangerBlockingSectionEnd();
+                    }
                 }
             }
         }
@@ -4010,6 +4230,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
             grp.affinity().idealAssignment(affAssignment);
 
             grp.affinity().initialize(initialVersion(), affAssignment);
+
+            cctx.exchange().exchangerUpdateHeartbeat();
         }
     }