You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/08/21 10:49:31 UTC

ignite git commit: ignite-5872 Replace standard java maps for partition counters with more effective data structures

Repository: ignite
Updated Branches:
  refs/heads/master 3084f00ee -> 918c409f5


ignite-5872 Replace standard java maps for partition counters with more effective data structures


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/918c409f
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/918c409f
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/918c409f

Branch: refs/heads/master
Commit: 918c409f51b5be9058590e0e7f1fbfd4c7f48573
Parents: 3084f00
Author: Alexey Goncharuk <ag...@apache.org>
Authored: Mon Aug 21 13:49:22 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Aug 21 13:49:22 2017 +0300

----------------------------------------------------------------------
 .../cache/CacheAffinitySharedManager.java       |   2 +-
 .../GridCachePartitionExchangeManager.java      |  45 +++---
 .../cache/IgniteCacheOffheapManager.java        |   2 +-
 .../cache/IgniteCacheOffheapManagerImpl.java    |   7 +-
 .../dht/GridClientPartitionTopology.java        |  59 +++----
 .../dht/GridDhtPartitionTopology.java           |  25 ++-
 .../dht/GridDhtPartitionTopologyImpl.java       | 145 ++++++++++-------
 .../CachePartitionFullCountersMap.java          |  99 ++++++++++++
 .../CachePartitionPartialCountersMap.java       | 161 +++++++++++++++++++
 .../GridDhtPartitionsAbstractMessage.java       |   8 -
 .../GridDhtPartitionsExchangeFuture.java        |  36 +++--
 .../preloader/GridDhtPartitionsFullMessage.java |  13 +-
 .../GridDhtPartitionsSingleMessage.java         |  21 +--
 .../GridDhtPartitionsSingleRequest.java         |   8 -
 .../IgniteDhtPartitionCountersMap.java          |  14 +-
 .../persistence/GridCacheOffheapManager.java    |   2 +-
 .../continuous/GridContinuousProcessor.java     |   7 +-
 ...ContinuousQueryFailoverAbstractSelfTest.java |  11 +-
 .../ignite/testsuites/IgniteBasicTestSuite.java |   3 +-
 19 files changed, 475 insertions(+), 193 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/918c409f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index 5d77c9e..39a5ea8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -459,7 +459,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                             if (clientTop != null) {
                                 grp.topology().update(grpHolder.affinity().lastVersion(),
                                     clientTop.partitionMap(true),
-                                    clientTop.updateCounters(false),
+                                    clientTop.fullUpdateCounters(),
                                     Collections.<Integer>emptySet(),
                                     null);
                             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/918c409f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 08eb1bf..200f677 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -723,23 +723,21 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
         if (top != null)
             return top;
 
-        Object affKey = null;
-
         CacheGroupDescriptor grpDesc = cctx.affinity().cacheGroups().get(grpId);
 
-        if (grpDesc != null) {
-            CacheConfiguration<?, ?> ccfg = grpDesc.config();
+        assert grpDesc != null : grpId;
 
-            AffinityFunction aff = ccfg.getAffinity();
+        CacheConfiguration<?, ?> ccfg = grpDesc.config();
 
-            affKey = cctx.kernalContext().affinity().similaryAffinityKey(aff,
-                ccfg.getNodeFilter(),
-                ccfg.getBackups(),
-                aff.partitions());
-        }
+        AffinityFunction aff = ccfg.getAffinity();
+
+        Object affKey = cctx.kernalContext().affinity().similaryAffinityKey(aff,
+            ccfg.getNodeFilter(),
+            ccfg.getBackups(),
+            aff.partitions());
 
         GridClientPartitionTopology old = clientTops.putIfAbsent(grpId,
-            top = new GridClientPartitionTopology(cctx, grpId, affKey));
+            top = new GridClientPartitionTopology(cctx, grpId, aff.partitions(), affKey));
 
         return old != null ? old : top;
     }
@@ -1004,8 +1002,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
      *     finishUnmarshall methods are called).
      * @param exchId Non-null exchange ID if message is created for exchange.
      * @param lastVer Last version.
-     * @param partHistSuppliers
-     * @param partsToReload
+     * @param partHistSuppliers Partition history suppliers map.
+     * @param partsToReload Partitions to reload map.
      * @return Message.
      */
     public GridDhtPartitionsFullMessage createPartitionsFullMessage(
@@ -1049,7 +1047,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                 }
 
                 if (exchId != null)
-                    m.addPartitionUpdateCounters(grp.groupId(), grp.topology().updateCounters(true));
+                    m.addPartitionUpdateCounters(grp.groupId(), grp.topology().fullUpdateCounters());
             }
         }
 
@@ -1067,7 +1065,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
             }
 
             if (exchId != null)
-                m.addPartitionUpdateCounters(top.groupId(), top.updateCounters(true));
+                m.addPartitionUpdateCounters(top.groupId(), top.fullUpdateCounters());
         }
 
         return m;
@@ -1120,7 +1118,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     private void sendLocalPartitions(ClusterNode node, @Nullable GridDhtPartitionExchangeId id) {
         GridDhtPartitionsSingleMessage m = createPartitionsSingleMessage(id,
             cctx.kernalContext().clientNode(),
-            false);
+            false,
+            null);
 
         if (log.isDebugEnabled())
             log.debug("Sending local partitions [nodeId=" + node.id() + ", msg=" + m + ']');
@@ -1144,10 +1143,12 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
      * @param sndCounters {@code True} if need send partition update counters.
      * @return Message.
      */
-    public GridDhtPartitionsSingleMessage createPartitionsSingleMessage(@Nullable GridDhtPartitionExchangeId exchangeId,
+    public GridDhtPartitionsSingleMessage createPartitionsSingleMessage(
+        @Nullable GridDhtPartitionExchangeId exchangeId,
         boolean clientOnlyExchange,
-        boolean sndCounters)
-    {
+        boolean sndCounters,
+        ExchangeActions exchActions
+    ) {
         GridDhtPartitionsSingleMessage m = new GridDhtPartitionsSingleMessage(exchangeId,
             clientOnlyExchange,
             cctx.versions().last(),
@@ -1156,7 +1157,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
         Map<Object, T2<Integer, GridPartitionStateMap>> dupData = new HashMap<>();
 
         for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
-            if (!grp.isLocal()) {
+            if (!grp.isLocal() && (exchActions == null || !exchActions.cacheGroupStopping(grp.groupId()))) {
                 GridDhtPartitionMap locMap = grp.topology().localPartitionMap();
 
                 addPartitionMap(m,
@@ -1167,7 +1168,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                     grp.affinity().similarAffinityKey());
 
                 if (sndCounters)
-                    m.partitionUpdateCounters(grp.groupId(), grp.topology().updateCounters(true));
+                    m.partitionUpdateCounters(grp.groupId(), grp.topology().localUpdateCounters(true));
             }
         }
 
@@ -1185,7 +1186,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                 top.similarAffinityKey());
 
             if (sndCounters)
-                m.partitionUpdateCounters(top.groupId(), top.updateCounters(true));
+                m.partitionUpdateCounters(top.groupId(), top.localUpdateCounters(true));
         }
 
         return m;

http://git-wip-us.apache.org/repos/asf/ignite/blob/918c409f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
index 001848e..4531802 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
@@ -414,7 +414,7 @@ public interface IgniteCacheOffheapManager {
         /**
          * @return Initial update counter.
          */
-        public Long initialUpdateCounter();
+        public long initialUpdateCounter();
 
         /**
          * @param cctx Cache context.

http://git-wip-us.apache.org/repos/asf/ignite/blob/918c409f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index ba6c89d..9e48d45 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -1054,8 +1054,8 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
         /** */
         private final ConcurrentMap<Integer, AtomicLong> cacheSizes = new ConcurrentHashMap<>();
 
-        /** Initialized update counter. */
-        protected Long initCntr = 0L;
+        /** Initial update counter. */
+        protected long initCntr;
 
         /**
          * @param partId Partition number.
@@ -1600,7 +1600,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
         }
 
         /** {@inheritDoc} */
-        @Override public Long initialUpdateCounter() {
+        @Override public long initialUpdateCounter() {
             return initCntr;
         }
 
@@ -1629,7 +1629,6 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
          * @param key Key.
          * @param oldVal Old value.
          * @param newVal New value.
-         * @throws IgniteCheckedException If failed.
          */
         private void updateIgfsMetrics(
             GridCacheContext cctx,

http://git-wip-us.apache.org/repos/asf/ignite/blob/918c409f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index 745e7d7..77792c7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -39,6 +39,8 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
 import org.apache.ignite.internal.processors.cache.ExchangeDiscoveryEvents;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionFullCountersMap;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap;
@@ -48,7 +50,6 @@ import org.apache.ignite.internal.util.GridAtomicLong;
 import org.apache.ignite.internal.util.GridPartitionStateMap;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.jetbrains.annotations.Nullable;
@@ -106,7 +107,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
     private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
 
     /** Partition update counters. */
-    private Map<Integer, T2<Long, Long>> cntrMap = new HashMap<>();
+    private CachePartitionFullCountersMap cntrMap;
 
     /** */
     private final Object similarAffKey;
@@ -117,11 +118,13 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
     /**
      * @param cctx Context.
      * @param grpId Group ID.
+     * @param parts Number of partitions in the group.
      * @param similarAffKey Key to find caches with similar affinity.
      */
     public GridClientPartitionTopology(
         GridCacheSharedContext<?, ?> cctx,
         int grpId,
+        int parts,
         Object similarAffKey
     ) {
         this.cctx = cctx;
@@ -135,6 +138,8 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
         node2part = new GridDhtPartitionFullMap(cctx.localNode().id(),
             cctx.localNode().order(),
             updateSeq.get());
+
+        cntrMap = new CachePartitionFullCountersMap(parts);
     }
 
     /**
@@ -641,7 +646,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
     @Override public boolean update(
         @Nullable AffinityTopologyVersion exchangeVer,
         GridDhtPartitionFullMap partMap,
-        Map<Integer, T2<Long, Long>> cntrMap,
+        @Nullable CachePartitionFullCountersMap cntrMap,
         Set<Integer> partsToReload,
         @Nullable AffinityTopologyVersion msgTopVer) {
         if (log.isDebugEnabled())
@@ -744,7 +749,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
             }
 
             if (cntrMap != null)
-                this.cntrMap = new HashMap<>(cntrMap);
+                this.cntrMap = new CachePartitionFullCountersMap(cntrMap);
 
             consistencyCheck();
 
@@ -759,17 +764,22 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
-    @Override public void applyUpdateCounters(Map<Integer, T2<Long, Long>> cntrMap) {
+    @Override public void collectUpdateCounters(CachePartitionPartialCountersMap cntrMap) {
         assert cntrMap != null;
 
         lock.writeLock().lock();
 
         try {
-            for (Map.Entry<Integer, T2<Long, Long>> e : cntrMap.entrySet()) {
-                T2<Long, Long> cntr = this.cntrMap.get(e.getKey());
+            for (int i = 0; i < cntrMap.size(); i++) {
+                int pId = cntrMap.partitionAt(i);
+
+                long initialUpdateCntr = cntrMap.initialUpdateCounterAt(i);
+                long updateCntr = cntrMap.updateCounterAt(i);
 
-                if (cntr == null || cntr.get2() < e.getValue().get2())
-                    this.cntrMap.put(e.getKey(), e.getValue());
+                if (this.cntrMap.updateCounter(pId) < updateCntr) {
+                    this.cntrMap.initialUpdateCounter(pId, initialUpdateCntr);
+                    this.cntrMap.updateCounter(pId, updateCntr);
+                }
             }
         }
         finally {
@@ -777,6 +787,11 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
         }
     }
 
+    /** {@inheritDoc} */
+    @Override public void applyUpdateCounters() {
+        // No-op on client topology.
+    }
+
     /**
      * Method checks is new partition map more stale than current partition map
      * New partition map is stale if topology version or update sequence are less than of current map
@@ -1097,33 +1112,23 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
-    @Override public Map<Integer, T2<Long, Long>> updateCounters(boolean skipZeros) {
+    @Override public CachePartitionFullCountersMap fullUpdateCounters() {
         lock.readLock().lock();
 
         try {
-            if (skipZeros) {
-                Map<Integer, T2<Long, Long>> res = U.newHashMap(cntrMap.size());
-
-                for (Map.Entry<Integer, T2<Long, Long>> e : cntrMap.entrySet()) {
-                    T2<Long, Long> val = e.getValue();
-
-                    if (val.get1() == 0L && val.get2() == 0L)
-                        continue;
-
-                    res.put(e.getKey(), e.getValue());
-                }
-
-                return res;
-            }
-            else
-                return new HashMap<>(cntrMap);
-}
+            return new CachePartitionFullCountersMap(cntrMap);
+        }
         finally {
             lock.readLock().unlock();
         }
     }
 
     /** {@inheritDoc} */
+    @Override public CachePartitionPartialCountersMap localUpdateCounters(boolean skipZeros) {
+        return CachePartitionPartialCountersMap.EMPTY;
+    }
+
+    /** {@inheritDoc} */
     @Override public boolean rebalanceFinished(AffinityTopologyVersion topVer) {
         assert false : "Should not be called on non-affinity node";
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/918c409f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
index 0dea5e4..22205ea 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.cache.distributed.dht;
 
 import java.util.Collection;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import org.apache.ignite.IgniteCheckedException;
@@ -29,12 +28,13 @@ import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.managers.discovery.DiscoCache;
 import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionFullCountersMap;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
-import org.apache.ignite.internal.util.typedef.T2;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -257,7 +257,7 @@ public interface GridDhtPartitionTopology {
      *      means full map received is not related to exchange
      * @param partMap Update partition map.
      * @param cntrMap Partition update counters.
-     * @param partsToReload
+     * @param partsToReload Set of partitions that need to be reloaded.
      * @param msgTopVer Topology version from incoming message. This value is not null only for case message is not
      *      related to exchange. Value should be not less than previous 'Topology version from exchange'.
      * @return {@code True} if local state was changed.
@@ -265,7 +265,7 @@ public interface GridDhtPartitionTopology {
     public boolean update(
         @Nullable AffinityTopologyVersion exchangeResVer,
         GridDhtPartitionFullMap partMap,
-        @Nullable Map<Integer, T2<Long, Long>> cntrMap,
+        @Nullable CachePartitionFullCountersMap cntrMap,
         Set<Integer> partsToReload,
         @Nullable AffinityTopologyVersion msgTopVer);
 
@@ -280,9 +280,16 @@ public interface GridDhtPartitionTopology {
         boolean force);
 
     /**
+     * Collects update counters collected during exchange. Called on coordinator.
+     *
      * @param cntrMap Counters map.
      */
-    public void applyUpdateCounters(Map<Integer, T2<Long, Long>> cntrMap);
+    public void collectUpdateCounters(CachePartitionPartialCountersMap cntrMap);
+
+    /**
+     * Applies update counters collected during exchange on coordinator. Called on coordinator.
+     */
+    public void applyUpdateCounters();
 
     /**
      * Checks if there is at least one owner for each partition in the cache topology.
@@ -309,10 +316,14 @@ public interface GridDhtPartitionTopology {
     public Collection<Integer> lostPartitions();
 
     /**
-     * @param skipZeros If {@code true} then filters out zero counters.
      * @return Partition update counters.
      */
-    public Map<Integer, T2<Long, Long>> updateCounters(boolean skipZeros);
+    public CachePartitionFullCountersMap fullUpdateCounters();
+
+    /**
+     * @return Partition update counters.
+     */
+    public CachePartitionPartialCountersMap localUpdateCounters(boolean skipZeros);
 
     /**
      * @param part Partition to own.

http://git-wip-us.apache.org/repos/asf/ignite/blob/918c409f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index d1e4fa9..16fe012 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -43,6 +43,8 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheGroupContext;
 import org.apache.ignite.internal.processors.cache.ExchangeDiscoveryEvents;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionFullCountersMap;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap;
@@ -53,7 +55,6 @@ import org.apache.ignite.internal.util.GridPartitionStateMap;
 import org.apache.ignite.internal.util.StripedCompositeReadWriteLock;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.LT;
@@ -131,7 +132,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     private final StripedCompositeReadWriteLock lock = new StripedCompositeReadWriteLock(16);
 
     /** Partition update counter. */
-    private Map<Integer, T2<Long, Long>> cntrMap = new HashMap<>();
+    private final CachePartitionFullCountersMap cntrMap;
 
     /** */
     private volatile AffinityTopologyVersion rebalancedTopVer = AffinityTopologyVersion.NONE;
@@ -140,8 +141,10 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
      * @param ctx Cache shared context.
      * @param grp Cache group.
      */
-    public GridDhtPartitionTopologyImpl(GridCacheSharedContext ctx,
-        CacheGroupContext grp) {
+    public GridDhtPartitionTopologyImpl(
+        GridCacheSharedContext ctx,
+        CacheGroupContext grp
+    ) {
         assert ctx != null;
         assert grp != null;
 
@@ -153,6 +156,8 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
         timeLog = ctx.logger(GridDhtPartitionsExchangeFuture.EXCHANGE_LOG);
 
         locParts = new AtomicReferenceArray<>(grp.affinityFunction().partitions());
+
+        cntrMap = new CachePartitionFullCountersMap(locParts.length());
     }
 
     /** {@inheritDoc} */
@@ -713,10 +718,10 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
             locParts.set(p, loc = new GridDhtLocalPartition(ctx, grp, p));
 
-            T2<Long, Long> cntr = cntrMap.get(p);
+            long updCntr = cntrMap.updateCounter(p);
 
-            if (cntr != null)
-                loc.updateCounter(cntr.get2());
+            if (updCntr != 0)
+                loc.updateCounter(updCntr);
 
             if (ctx.pageStore() != null) {
                 try {
@@ -1165,7 +1170,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     @Override public boolean update(
         @Nullable AffinityTopologyVersion exchangeVer,
         GridDhtPartitionFullMap partMap,
-        @Nullable Map<Integer, T2<Long, Long>> incomeCntrMap,
+        @Nullable CachePartitionFullCountersMap incomeCntrMap,
         Set<Integer> partsToReload,
         @Nullable AffinityTopologyVersion msgTopVer) {
         if (log.isDebugEnabled())
@@ -1180,14 +1185,6 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                 return false;
 
             if (incomeCntrMap != null) {
-                // update local map partition counters
-                for (Map.Entry<Integer, T2<Long, Long>> e : incomeCntrMap.entrySet()) {
-                    T2<Long, Long> existCntr = this.cntrMap.get(e.getKey());
-
-                    if (existCntr == null || existCntr.get2() < e.getValue().get2())
-                        this.cntrMap.put(e.getKey(), e.getValue());
-                }
-
                 // update local counters in partitions
                 for (int i = 0; i < locParts.length(); i++) {
                     GridDhtLocalPartition part = locParts.get(i);
@@ -1195,10 +1192,12 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                     if (part == null)
                         continue;
 
-                    T2<Long, Long> cntr = incomeCntrMap.get(part.id());
+                    if (part.state() == OWNING || part.state() == MOVING) {
+                        long updCntr = incomeCntrMap.updateCounter(part.id());
 
-                    if (cntr != null)
-                        part.updateCounter(cntr.get2());
+                        if (updCntr != 0 && updCntr > part.updateCounter())
+                            part.updateCounter(updCntr);
+                    }
                 }
             }
 
@@ -1330,13 +1329,6 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
                         assert locPart != null : grp.cacheOrGroupName();
 
-                        if (incomeCntrMap != null) {
-                            T2<Long, Long> cntr = incomeCntrMap.get(p);
-
-                            if (cntr != null && cntr.get2() > locPart.updateCounter())
-                                locPart.updateCounter(cntr.get2());
-                        }
-
                         if (locPart.state() == MOVING) {
                             boolean success = locPart.own();
 
@@ -1356,13 +1348,6 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
                             changed = true;
                         }
-
-                        if (incomeCntrMap != null) {
-                            T2<Long, Long> cntr = incomeCntrMap.get(p);
-
-                            if (cntr != null && cntr.get2() > locPart.updateCounter())
-                                locPart.updateCounter(cntr.get2());
-                        }
                     }
                     else if (state == RENTING && partsToReload.contains(p)) {
                         GridDhtLocalPartition locPart = locParts.get(p);
@@ -1412,7 +1397,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
-    @Override public void applyUpdateCounters(Map<Integer, T2<Long, Long>> cntrMap) {
+    @Override public void collectUpdateCounters(CachePartitionPartialCountersMap cntrMap) {
         assert cntrMap != null;
 
         long now = U.currentTimeMillis();
@@ -1431,12 +1416,40 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
             if (stopping)
                 return;
 
-            for (Map.Entry<Integer, T2<Long, Long>> e : cntrMap.entrySet()) {
-                T2<Long, Long> cntr = this.cntrMap.get(e.getKey());
+            for (int i = 0; i < cntrMap.size(); i++) {
+                int pId = cntrMap.partitionAt(i);
+
+                long initialUpdateCntr = cntrMap.initialUpdateCounterAt(i);
+                long updateCntr = cntrMap.updateCounterAt(i);
 
-                if (cntr == null || cntr.get2() < e.getValue().get2())
-                    this.cntrMap.put(e.getKey(), e.getValue());
+                if (this.cntrMap.updateCounter(pId) < updateCntr) {
+                    this.cntrMap.initialUpdateCounter(pId, initialUpdateCntr);
+                    this.cntrMap.updateCounter(pId, updateCntr);
+                }
             }
+        }
+        finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void applyUpdateCounters() {
+        long now = U.currentTimeMillis();
+
+        lock.writeLock().lock();
+
+        try {
+            long acquired = U.currentTimeMillis();
+
+            if (acquired - now >= 100) {
+                if (timeLog.isInfoEnabled())
+                    timeLog.info("Waited too long to acquire topology write lock " +
+                        "[cache=" + grp.groupId() + ", waitTime=" + (acquired - now) + ']');
+            }
+
+            if (stopping)
+                return;
 
             for (int i = 0; i < locParts.length(); i++) {
                 GridDhtLocalPartition part = locParts.get(i);
@@ -1444,12 +1457,14 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                 if (part == null)
                     continue;
 
-                T2<Long, Long> cntr = cntrMap.get(part.id());
+                long updCntr = cntrMap.updateCounter(part.id());
 
-                if (cntr != null && cntr.get2() > part.updateCounter())
-                    part.updateCounter(cntr.get2());
-                else if (part.updateCounter() > 0)
-                    this.cntrMap.put(part.id(), new T2<>(part.initialUpdateCounter(), part.updateCounter()));
+                if (updCntr > part.updateCounter())
+                    part.updateCounter(updCntr);
+                else if (part.updateCounter() > 0) {
+                    cntrMap.initialUpdateCounter(part.id(), part.initialUpdateCounter());
+                    cntrMap.updateCounter(part.id(), part.updateCounter());
+                }
             }
         }
         finally {
@@ -2178,26 +2193,32 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
-    @Override public Map<Integer, T2<Long, Long>> updateCounters(boolean skipZeros) {
+    @Override public CachePartitionFullCountersMap fullUpdateCounters() {
         lock.readLock().lock();
 
         try {
-            Map<Integer, T2<Long, Long>> res;
+            return new CachePartitionFullCountersMap(cntrMap);
+        }
+        finally {
+            lock.readLock().unlock();
+        }
+    }
 
-            if (skipZeros) {
-                res = U.newHashMap(cntrMap.size());
+    /** {@inheritDoc} */
+    @Override public CachePartitionPartialCountersMap localUpdateCounters(boolean skipZeros) {
+        lock.readLock().lock();
 
-                for (Map.Entry<Integer, T2<Long, Long>> e : cntrMap.entrySet()) {
-                    Long cntr = e.getValue().get2();
+        try {
+            int locPartCnt = 0;
 
-                    if (ZERO.equals(cntr))
-                        continue;
+            for (int i = 0; i < locParts.length(); i++) {
+                GridDhtLocalPartition part = locParts.get(i);
 
-                    res.put(e.getKey(), e.getValue());
-                }
+                if (part != null)
+                    locPartCnt++;
             }
-            else
-                res = new HashMap<>(cntrMap);
+
+            CachePartitionPartialCountersMap res = new CachePartitionPartialCountersMap(locPartCnt);
 
             for (int i = 0; i < locParts.length(); i++) {
                 GridDhtLocalPartition part = locParts.get(i);
@@ -2205,17 +2226,17 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                 if (part == null)
                     continue;
 
-                T2<Long, Long> cntr0 = res.get(part.id());
-                Long initCntr = part.initialUpdateCounter();
+                long updCntr = part.updateCounter();
+                long initCntr = part.initialUpdateCounter();
 
-                if (cntr0 == null || initCntr >= cntr0.get1()) {
-                    if (skipZeros && initCntr == 0L && part.updateCounter() == 0L)
-                        continue;
+                if (skipZeros && initCntr == 0L && updCntr == 0L)
+                    continue;
 
-                    res.put(part.id(), new T2<>(initCntr, part.updateCounter()));
-                }
+                res.add(part.id(), initCntr, updCntr);
             }
 
+            res.trim();
+
             return res;
         }
         finally {

http://git-wip-us.apache.org/repos/asf/ignite/blob/918c409f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionFullCountersMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionFullCountersMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionFullCountersMap.java
new file mode 100644
index 0000000..1384a55
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionFullCountersMap.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
+
+import java.io.Serializable;
+import java.util.Arrays;
+
+/**
+ *
+ */
+public class CachePartitionFullCountersMap implements Serializable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    private long[] initialUpdCntrs;
+
+    /** */
+    private long[] updCntrs;
+
+    /**
+     * @param other Map to copy.
+     */
+    public CachePartitionFullCountersMap(CachePartitionFullCountersMap other) {
+        initialUpdCntrs = Arrays.copyOf(other.initialUpdCntrs, other.initialUpdCntrs.length);
+        updCntrs = Arrays.copyOf(other.updCntrs, other.updCntrs.length);
+    }
+
+    /**
+     * @param partsCnt Total number of partitions.
+     */
+    public CachePartitionFullCountersMap(int partsCnt) {
+        initialUpdCntrs = new long[partsCnt];
+        updCntrs = new long[partsCnt];
+    }
+
+    /**
+     * Gets an initial update counter by the partition ID.
+     *
+     * @param p Partition ID.
+     * @return Initial update counter for the partition with the given ID.
+     */
+    public long initialUpdateCounter(int p) {
+        return initialUpdCntrs[p];
+    }
+
+    /**
+     * Gets an update counter by the partition ID.
+     *
+     * @param p Partition ID.
+     * @return Update counter for the partition with the given ID.
+     */
+    public long updateCounter(int p) {
+        return updCntrs[p];
+    }
+
+    /**
+     * Sets an initial update counter by the partition ID.
+     *
+     * @param p Partition ID.
+     * @param initialUpdCntr Initial update counter to set.
+     */
+    public void initialUpdateCounter(int p, long initialUpdCntr) {
+        initialUpdCntrs[p] = initialUpdCntr;
+    }
+
+    /**
+     * Sets an update counter by the partition ID.
+     *
+     * @param p Partition ID.
+     * @param updCntr Update counter to set.
+     */
+    public void updateCounter(int p, long updCntr) {
+        updCntrs[p] = updCntr;
+    }
+
+    /**
+     * Clears full counters map.
+     */
+    public void clear() {
+        Arrays.fill(initialUpdCntrs, 0);
+        Arrays.fill(updCntrs, 0);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/918c409f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionPartialCountersMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionPartialCountersMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionPartialCountersMap.java
new file mode 100644
index 0000000..851ffed
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionPartialCountersMap.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ *
+ */
+public class CachePartitionPartialCountersMap implements Serializable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    public static final CachePartitionPartialCountersMap EMPTY = new CachePartitionPartialCountersMap();
+
+    /** */
+    private int[] partIds;
+
+    /** */
+    private long[] initialUpdCntrs;
+
+    /** */
+    private long[] updCntrs;
+
+    /** */
+    private int curIdx;
+
+    /** */
+    private CachePartitionPartialCountersMap() {
+        // Empty map.
+    }
+
+    /**
+     * @param partsCnt Total number of partitions will be stored in the partial map.
+     */
+    public CachePartitionPartialCountersMap(int partsCnt) {
+        partIds = new int[partsCnt];
+        initialUpdCntrs = new long[partsCnt];
+        updCntrs = new long[partsCnt];
+    }
+
+    /**
+     * @return Total number of partitions added to the map.
+     */
+    public int size() {
+        return curIdx;
+    }
+
+    /**
+     * Adds partition counters for a partition with the given ID.
+     *
+     * @param partId Partition ID to add.
+     * @param initialUpdCntr Partition initial update counter.
+     * @param updCntr Partition update counter.
+     */
+    public void add(int partId, long initialUpdCntr, long updCntr) {
+        if (curIdx > 0) {
+            if (partIds[curIdx - 1] >= partId)
+                throw new IllegalArgumentException("Adding a partition in the wrong order " +
+                    "[prevPart=" + partIds[curIdx - 1] + ", partId=" + partId + ']');
+        }
+
+        if (curIdx == partIds.length)
+            throw new IllegalStateException("Adding more partitions than reserved: " + partIds.length);
+
+        partIds[curIdx] = partId;
+        initialUpdCntrs[curIdx] = initialUpdCntr;
+        updCntrs[curIdx] = updCntr;
+
+        curIdx++;
+    }
+
+    /**
+     * Cuts the array sizes according to curIdx. No more entries can be added to this map
+     * after this method is called.
+     */
+    public void trim() {
+        if (curIdx < partIds.length) {
+            partIds = Arrays.copyOf(partIds, curIdx);
+            initialUpdCntrs = Arrays.copyOf(initialUpdCntrs, curIdx);
+            updCntrs = Arrays.copyOf(updCntrs, curIdx);
+        }
+    }
+
+    /**
+     * @param partId Partition ID to search.
+     * @return Partition index in the array.
+     */
+    public int partitionIndex(int partId) {
+        return Arrays.binarySearch(partIds, 0, curIdx, partId);
+    }
+
+    /**
+     * Gets partition ID saved at the given index.
+     *
+     * @param idx Index to get value from.
+     * @return Partition ID.
+     */
+    public int partitionAt(int idx) {
+        return partIds[idx];
+    }
+
+    /**
+     * Gets initial update counter saved at the given index.
+     *
+     * @param idx Index to get value from.
+     * @return Initial update counter.
+     */
+    public long initialUpdateCounterAt(int idx) {
+        return initialUpdCntrs[idx];
+    }
+
+    /**
+     * Gets update counter saved at the given index.
+     *
+     * @param idx Index to get value from.
+     * @return Update counter.
+     */
+    public long updateCounterAt(int idx) {
+        return updCntrs[idx];
+    }
+
+
+    /**
+     * @param cntrsMap Partial local counters map.
+     * @return Partition ID to partition counters map.
+     */
+    public static Map<Integer, T2<Long, Long>> toCountersMap(CachePartitionPartialCountersMap cntrsMap) {
+        if (cntrsMap.size() == 0)
+            return Collections.emptyMap();
+
+        Map<Integer, T2<Long, Long>> res = U.newHashMap(cntrsMap.size());
+
+        for (int idx = 0; idx < cntrsMap.size(); idx++)
+            res.put(cntrsMap.partitionAt(idx),
+                new T2<>(cntrsMap.initialUpdateCounterAt(idx), cntrsMap.updateCounterAt(idx)));
+
+        return res;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/918c409f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
index 95c1a4f..84cc792 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
@@ -19,11 +19,9 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
 
 import java.io.Externalizable;
 import java.nio.ByteBuffer;
-import java.util.Map;
 import org.apache.ignite.internal.managers.communication.GridIoMessage;
 import org.apache.ignite.internal.processors.cache.GridCacheMessage;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
@@ -111,12 +109,6 @@ public abstract class GridDhtPartitionsAbstractMessage extends GridCacheMessage
     }
 
     /**
-     * @param grpId Cache group ID.
-     * @return Parition update counters.
-     */
-    public abstract Map<Integer, T2<Long, Long>> partitionUpdateCounters(int grpId);
-
-    /**
      * @return Last used version among all nodes.
      */
     @Nullable public GridCacheVersion lastVersion() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/918c409f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index ca6ee5e..d64adc5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -764,7 +764,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                 if (updateTop && clientTop != null) {
                     top.update(null,
                         clientTop.partitionMap(true),
-                        clientTop.updateCounters(false),
+                        clientTop.fullUpdateCounters(),
                         Collections.<Integer>emptySet(),
                         null);
                 }
@@ -1213,7 +1213,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
         GridDhtPartitionsSingleMessage msg;
 
-        // Reset lost partition before send local partition to coordinator.
+        // Reset lost partitions before sending local partitions to coordinator.
         if (exchActions != null) {
             Set<String> caches = exchActions.cachesToResetLostPartitions();
 
@@ -1230,7 +1230,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
         else {
             msg = cctx.exchange().createPartitionsSingleMessage(exchangeId(),
                 false,
-                true);
+                true,
+                exchActions);
 
             Map<Integer, Map<Integer, Long>> partHistReserved0 = partHistReserved;
 
@@ -1958,10 +1959,12 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
         Map<Integer, Long> minCntrs = new HashMap<>();
 
         for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> e : msgs.entrySet()) {
-            assert e.getValue().partitionUpdateCounters(top.groupId()) != null;
+            CachePartitionPartialCountersMap nodeCntrs = e.getValue().partitionUpdateCounters(top.groupId());
+
+            assert nodeCntrs != null;
 
-            for (Map.Entry<Integer, T2<Long, Long>> e0 : e.getValue().partitionUpdateCounters(top.groupId()).entrySet()) {
-                int p = e0.getKey();
+            for (int i = 0; i < nodeCntrs.size(); i++) {
+                int p = nodeCntrs.partitionAt(i);
 
                 UUID uuid = e.getKey();
 
@@ -1970,10 +1973,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                 if (state != GridDhtPartitionState.OWNING && state != GridDhtPartitionState.MOVING)
                     continue;
 
-                Long cntr = state == GridDhtPartitionState.MOVING ? e0.getValue().get1() : e0.getValue().get2();
-
-                if (cntr == null)
-                    cntr = 0L;
+                long cntr = state == GridDhtPartitionState.MOVING ?
+                    nodeCntrs.initialUpdateCounterAt(i) :
+                    nodeCntrs.updateCounterAt(i);
 
                 Long minCntr = minCntrs.get(p);
 
@@ -2233,10 +2235,10 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                     GridDhtPartitionTopology top = grp != null ? grp.topology() :
                         cctx.exchange().clientTopology(grpId);
 
-                    Map<Integer, T2<Long, Long>> cntrs = msg.partitionUpdateCounters(grpId);
+                    CachePartitionPartialCountersMap cntrs = msg.partitionUpdateCounters(grpId);
 
                     if (cntrs != null)
-                        top.applyUpdateCounters(cntrs);
+                        top.collectUpdateCounters(cntrs);
                 }
 
                 Collection<Integer> affReq = msg.cacheGroupsAffinityRequest();
@@ -2249,6 +2251,11 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                 }
             }
 
+            for (CacheGroupContext grpCtx : cctx.cache().cacheGroups()) {
+                if (!grpCtx.isLocal())
+                    grpCtx.topology().applyUpdateCounters();
+            }
+
             if (firstDiscoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT) {
                 assert firstDiscoEvt instanceof DiscoveryCustomEvent;
 
@@ -2563,7 +2570,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                 GridDhtPartitionsSingleMessage res = cctx.exchange().createPartitionsSingleMessage(
                     msg.restoreExchangeId(),
                     cctx.kernalContext().clientNode(),
-                    true);
+                    true,
+                    exchActions);
 
                 if (localJoinExchange() && finishState0 == null)
                     res.cacheGroupsAffinityRequest(exchCtx.groupsAffinityRequestOnJoin());
@@ -2737,7 +2745,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
         for (Map.Entry<Integer, GridDhtPartitionFullMap> entry : msg.partitions().entrySet()) {
             Integer grpId = entry.getKey();
 
-            Map<Integer, T2<Long, Long>> cntrMap = msg.partitionUpdateCounters(grpId);
+            CachePartitionFullCountersMap cntrMap = msg.partitionUpdateCounters(grpId);
 
             CacheGroupContext grp = cctx.cache().cacheGroup(grpId);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/918c409f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
index a164e85..2bb19cd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
@@ -33,7 +33,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartit
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
@@ -276,7 +275,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
      * @param grpId Cache group ID.
      * @param cntrMap Partition update counters.
      */
-    public void addPartitionUpdateCounters(int grpId, Map<Integer, T2<Long, Long>> cntrMap) {
+    public void addPartitionUpdateCounters(int grpId, CachePartitionFullCountersMap cntrMap) {
         if (partCntrs == null)
             partCntrs = new IgniteDhtPartitionCountersMap();
 
@@ -287,14 +286,8 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
      * @param grpId Cache group ID.
      * @return Partition update counters.
      */
-    @Override public Map<Integer, T2<Long, Long>> partitionUpdateCounters(int grpId) {
-        if (partCntrs != null) {
-            Map<Integer, T2<Long, Long>> res = partCntrs.get(grpId);
-
-            return res != null ? res : Collections.<Integer, T2<Long, Long>>emptyMap();
-        }
-
-        return Collections.emptyMap();
+    public CachePartitionFullCountersMap partitionUpdateCounters(int grpId) {
+        return partCntrs == null ? null : partCntrs.get(grpId);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/918c409f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
index bc7d314..44815ca 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
@@ -18,11 +18,11 @@
 package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
 
 import java.util.Collection;
-import java.util.Map;
-import java.util.HashMap;
+import java.io.Externalizable;
 import java.nio.ByteBuffer;
 import java.util.Collections;
-import java.io.Externalizable;
+import java.util.HashMap;
+import java.util.Map;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.GridDirectCollection;
 import org.apache.ignite.internal.GridDirectMap;
@@ -32,7 +32,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartit
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
@@ -62,7 +61,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
     /** Partitions update counters. */
     @GridToStringInclude
     @GridDirectTransient
-    private Map<Integer, Map<Integer, T2<Long, Long>>> partCntrs;
+    private Map<Integer, CachePartitionPartialCountersMap> partCntrs;
 
     /** Serialized partitions counters. */
     private byte[] partCntrsBytes;
@@ -190,7 +189,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
      * @param grpId Cache group ID.
      * @param cntrMap Partition update counters.
      */
-    public void partitionUpdateCounters(int grpId, Map<Integer, T2<Long, Long>> cntrMap) {
+    public void partitionUpdateCounters(int grpId, CachePartitionPartialCountersMap cntrMap) {
         if (partCntrs == null)
             partCntrs = new HashMap<>();
 
@@ -201,14 +200,10 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
      * @param grpId Cache group ID.
      * @return Partition update counters.
      */
-    @Override public Map<Integer, T2<Long, Long>> partitionUpdateCounters(int grpId) {
-        if (partCntrs != null) {
-            Map<Integer, T2<Long, Long>> res = partCntrs.get(grpId);
-
-            return res != null ? res : Collections.<Integer, T2<Long, Long>>emptyMap();
-        }
+    public CachePartitionPartialCountersMap partitionUpdateCounters(int grpId) {
+        CachePartitionPartialCountersMap res = partCntrs == null ? null : partCntrs.get(grpId);
 
-        return Collections.emptyMap();
+        return res == null ? CachePartitionPartialCountersMap.EMPTY : res;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/918c409f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java
index 6317fbc..0be0f37 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java
@@ -19,9 +19,6 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
 
 import java.io.Externalizable;
 import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.Map;
-import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
@@ -78,11 +75,6 @@ public class GridDhtPartitionsSingleRequest extends GridDhtPartitionsAbstractMes
     }
 
     /** {@inheritDoc} */
-    @Override public Map<Integer, T2<Long, Long>> partitionUpdateCounters(int cacheId) {
-        return Collections.emptyMap();
-    }
-
-    /** {@inheritDoc} */
     @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
         writer.setBuffer(buf);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/918c409f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersMap.java
index dc2fbf8..e7954d9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersMap.java
@@ -19,10 +19,8 @@
 package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
 
 import java.io.Serializable;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
-import org.apache.ignite.internal.util.typedef.T2;
 
 /**
  * Partition counters map.
@@ -32,7 +30,7 @@ public class IgniteDhtPartitionCountersMap implements Serializable {
     private static final long serialVersionUID = 0L;
 
     /** */
-    private Map<Integer, Map<Integer, T2<Long, Long>>> map;
+    private Map<Integer, CachePartitionFullCountersMap> map;
 
     /**
      * @return {@code True} if map is empty.
@@ -45,7 +43,7 @@ public class IgniteDhtPartitionCountersMap implements Serializable {
      * @param cacheId Cache ID.
      * @param cntrMap Counters map.
      */
-    public synchronized void putIfAbsent(int cacheId, Map<Integer, T2<Long, Long>> cntrMap) {
+    public synchronized void putIfAbsent(int cacheId, CachePartitionFullCountersMap cntrMap) {
         if (map == null)
             map = new HashMap<>();
 
@@ -57,14 +55,14 @@ public class IgniteDhtPartitionCountersMap implements Serializable {
      * @param cacheId Cache ID.
      * @return Counters map.
      */
-    public synchronized Map<Integer, T2<Long, Long>> get(int cacheId) {
+    public synchronized CachePartitionFullCountersMap get(int cacheId) {
         if (map == null)
-            map = new HashMap<>();
+            return null;
 
-        Map<Integer, T2<Long, Long>> cntrMap = map.get(cacheId);
+        CachePartitionFullCountersMap cntrMap = map.get(cacheId);
 
         if (cntrMap == null)
-            return Collections.emptyMap();
+            return null;
 
         return cntrMap;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/918c409f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
index ed6eee2..83a9f55 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
@@ -1167,7 +1167,7 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
         }
 
         /** {@inheritDoc} */
-        @Override public Long initialUpdateCounter() {
+        @Override public long initialUpdateCounter() {
             try {
                 CacheDataStore delegate0 = init0(true);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/918c409f/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index 7062353..fa52be2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -93,6 +93,7 @@ import static org.apache.ignite.internal.GridComponent.DiscoveryDataExchangeType
 import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE;
 import static org.apache.ignite.internal.GridTopic.TOPIC_CONTINUOUS;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap.toCountersMap;
 import static org.apache.ignite.internal.processors.continuous.GridContinuousMessageType.MSG_EVT_ACK;
 import static org.apache.ignite.internal.processors.continuous.GridContinuousMessageType.MSG_EVT_NOTIFICATION;
 
@@ -202,7 +203,8 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
                                 GridCacheContext cctx = interCache != null ? interCache.context() : null;
 
                                 if (cctx != null && cntrsPerNode != null && !cctx.isLocal() && cctx.affinityNode())
-                                    cntrsPerNode.put(ctx.localNodeId(), cctx.topology().updateCounters(false));
+                                    cntrsPerNode.put(ctx.localNodeId(),
+                                        toCountersMap(cctx.topology().localUpdateCounters(false)));
 
                                 routine.handler().updateCounters(topVer, cntrsPerNode, cntrs);
                             }
@@ -1070,7 +1072,8 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
                 GridCacheAdapter cache = ctx.cache().internalCache(hnd0.cacheName());
 
                 if (cache != null && !cache.isLocal() && cache.context().userCache())
-                    req.addUpdateCounters(ctx.localNodeId(), cache.context().topology().updateCounters(false));
+                    req.addUpdateCounters(ctx.localNodeId(),
+                        toCountersMap(cache.context().topology().localUpdateCounters(false)));
             }
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/918c409f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
index 43069cd..f91c689 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
@@ -75,6 +75,7 @@ import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.managers.communication.GridIoMessage;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap;
 import org.apache.ignite.internal.processors.continuous.GridContinuousHandler;
 import org.apache.ignite.internal.processors.continuous.GridContinuousMessage;
 import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor;
@@ -519,11 +520,15 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
 
             Affinity<Object> aff = grid(i).affinity(DEFAULT_CACHE_NAME);
 
-            Map<Integer, T2<Long, Long>> act = grid(i).cachex(DEFAULT_CACHE_NAME).context().topology().updateCounters(false);
+            CachePartitionPartialCountersMap act = grid(i).cachex(DEFAULT_CACHE_NAME).context().topology()
+                .localUpdateCounters(false);
 
             for (Map.Entry<Integer, Long> e : updCntrs.entrySet()) {
-                if (aff.mapPartitionToPrimaryAndBackups(e.getKey()).contains(grid(i).localNode()))
-                    assertEquals(e.getValue(), act.get(e.getKey()).get2());
+                if (aff.mapPartitionToPrimaryAndBackups(e.getKey()).contains(grid(i).localNode())) {
+                    int partIdx = act.partitionIndex(e.getKey());
+
+                    assertEquals(e.getValue(), (Long)act.updateCounterAt(partIdx));
+                }
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/918c409f/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
index 2ec2c74..202486c 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
@@ -20,8 +20,6 @@ package org.apache.ignite.testsuites;
 import java.util.Set;
 import junit.framework.TestSuite;
 import org.apache.ignite.GridSuppressedExceptionSelfTest;
-import org.apache.ignite.internal.processors.database.SwapPathConstructionSelfTest;
-import org.apache.ignite.util.AttributeNodeFilterSelfTest;
 import org.apache.ignite.internal.ClusterGroupHostsSelfTest;
 import org.apache.ignite.internal.ClusterGroupSelfTest;
 import org.apache.ignite.internal.GridFailFastNodeFailureDetectionSelfTest;
@@ -57,6 +55,7 @@ import org.apache.ignite.internal.processors.database.BPlusTreeSelfTest;
 import org.apache.ignite.internal.processors.database.FreeListImplSelfTest;
 import org.apache.ignite.internal.processors.database.MemoryMetricsSelfTest;
 import org.apache.ignite.internal.processors.database.MetadataStorageSelfTest;
+import org.apache.ignite.internal.processors.database.SwapPathConstructionSelfTest;
 import org.apache.ignite.internal.processors.odbc.OdbcConfigurationValidationSelfTest;
 import org.apache.ignite.internal.processors.odbc.OdbcEscapeSequenceSelfTest;
 import org.apache.ignite.internal.processors.service.ClosureServiceClientsNodesTest;