You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2017/04/11 10:15:54 UTC

[1/2] ignite git commit: IGNITE-4941: Removed old GridDhtPartitionSupplyMessage.

Repository: ignite
Updated Branches:
  refs/heads/ignite-3477-master 8122099f0 -> 3eb05de5e


IGNITE-4941: Removed old GridDhtPartitionSupplyMessage.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/aeacad6b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/aeacad6b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/aeacad6b

Branch: refs/heads/ignite-3477-master
Commit: aeacad6b87ac95dd2f5da525573d6fa58f4e51db
Parents: edfa353
Author: devozerov <vo...@gridgain.com>
Authored: Tue Apr 11 12:18:52 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Tue Apr 11 12:18:52 2017 +0300

----------------------------------------------------------------------
 .../communication/GridIoMessageFactory.java     |   8 +-
 .../GridCachePartitionExchangeManager.java      |   7 +-
 .../processors/cache/GridCachePreloader.java    |   4 +-
 .../cache/GridCachePreloaderAdapter.java        |   4 +-
 .../dht/preloader/GridDhtPartitionDemander.java |   2 +-
 .../dht/preloader/GridDhtPartitionSupplier.java | 310 +--------------
 .../GridDhtPartitionSupplyMessage.java          |  99 ++---
 .../GridDhtPartitionSupplyMessageV2.java        | 384 -------------------
 .../dht/preloader/GridDhtPreloader.java         |   3 +-
 .../resources/META-INF/classnames.properties    |   1 -
 .../CacheLateAffinityAssignmentTest.java        |   6 +-
 .../IgniteCacheReadFromBackupTest.java          |   6 +-
 .../atomic/IgniteCacheAtomicProtocolTest.java   |   3 +-
 13 files changed, 60 insertions(+), 777 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/aeacad6b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 737d047..8aac56d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -86,7 +86,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
-import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessageV2;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleRequest;
@@ -498,11 +497,6 @@ public class GridIoMessageFactory implements MessageFactory {
 
                 break;
 
-            case 45:
-                msg = new GridDhtPartitionSupplyMessage();
-
-                break;
-
             case 46:
                 msg = new GridDhtPartitionsFullMessage();
 
@@ -824,7 +818,7 @@ public class GridIoMessageFactory implements MessageFactory {
                 break;
 
             case 114:
-                msg = new GridDhtPartitionSupplyMessageV2();
+                msg = new GridDhtPartitionSupplyMessage();
 
                 break;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/aeacad6b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 231dff8..885106d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -67,8 +67,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap;
-import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessageV2;
-import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsAbstractMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
@@ -394,9 +393,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                             GridCacheContext cacheCtx = cctx.cacheContext(m.cacheId);
 
                             if (cacheCtx != null) {
-                                if (m instanceof GridDhtPartitionSupplyMessageV2)
+                                if (m instanceof GridDhtPartitionSupplyMessage)
                                     cacheCtx.preloader().handleSupplyMessage(
-                                        idx, id, (GridDhtPartitionSupplyMessageV2)m);
+                                        idx, id, (GridDhtPartitionSupplyMessage)m);
                                 else if (m instanceof GridDhtPartitionDemandMessage)
                                     cacheCtx.preloader().handleDemandMessage(
                                         idx, id, (GridDhtPartitionDemandMessage)m);

http://git-wip-us.apache.org/repos/asf/ignite/blob/aeacad6b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
index 0c28691..df0d71d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
@@ -25,7 +25,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
-import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessageV2;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloaderAssignments;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -168,7 +168,7 @@ public interface GridCachePreloader {
      * @param id Node Id.
      * @param s Supply message.
      */
-    public void handleSupplyMessage(int idx, UUID id, final GridDhtPartitionSupplyMessageV2 s);
+    public void handleSupplyMessage(int idx, UUID id, final GridDhtPartitionSupplyMessage s);
 
     /**
      * Handles Demand message.

http://git-wip-us.apache.org/repos/asf/ignite/blob/aeacad6b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
index d7ec288..ac3b6cc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
@@ -27,7 +27,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
-import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessageV2;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloaderAssignments;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
@@ -134,7 +134,7 @@ public class GridCachePreloaderAdapter implements GridCachePreloader {
     }
 
     /** {@inheritDoc} */
-    @Override public void handleSupplyMessage(int idx, UUID id, GridDhtPartitionSupplyMessageV2 s) {
+    @Override public void handleSupplyMessage(int idx, UUID id, GridDhtPartitionSupplyMessage s) {
         // No-op.
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/aeacad6b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index d5f2246..f8114cb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -541,7 +541,7 @@ public class GridDhtPartitionDemander {
     public void handleSupplyMessage(
         int idx,
         final UUID id,
-        final GridDhtPartitionSupplyMessageV2 supply
+        final GridDhtPartitionSupplyMessage supply
     ) {
         AffinityTopologyVersion topVer = supply.topologyVersion();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/aeacad6b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
index 9942423..7c2599a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
@@ -38,7 +38,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalP
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
 import org.apache.ignite.internal.util.lang.GridCloseableIterator;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
-import org.apache.ignite.internal.util.typedef.CI2;
 import org.apache.ignite.internal.util.typedef.T3;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -87,13 +86,6 @@ class GridDhtPartitionSupplier {
     /**
      *
      */
-    void start() {
-        startOldListeners();
-    }
-
-    /**
-     *
-     */
     void stop() {
         synchronized (scMap) {
             Iterator<T3<UUID, Integer, AffinityTopologyVersion>> it = scMap.keySet().iterator();
@@ -106,8 +98,6 @@ class GridDhtPartitionSupplier {
                 it.remove();
             }
         }
-
-        stopOldListeners();
     }
 
     /**
@@ -146,6 +136,7 @@ class GridDhtPartitionSupplier {
      *
      * @param topVer Topology version.
      */
+    @SuppressWarnings("ConstantConditions")
     public void onTopologyChanged(AffinityTopologyVersion topVer) {
         synchronized (scMap) {
             Iterator<T3<UUID, Integer, AffinityTopologyVersion>> it = scMap.keySet().iterator();
@@ -179,6 +170,7 @@ class GridDhtPartitionSupplier {
      * @param idx Index.
      * @param id Node uuid.
      */
+    @SuppressWarnings("unchecked")
     public void handleDemandMessage(int idx, UUID id, GridDhtPartitionDemandMessage d) {
         assert d != null;
         assert id != null;
@@ -208,7 +200,7 @@ class GridDhtPartitionSupplier {
             log.debug("Demand request accepted [current=" + cutTop + ", demanded=" + demTop +
                 ", from=" + id + ", idx=" + idx + "]");
 
-        GridDhtPartitionSupplyMessageV2 s = new GridDhtPartitionSupplyMessageV2(
+        GridDhtPartitionSupplyMessage s = new GridDhtPartitionSupplyMessage(
             d.updateSequence(), cctx.cacheId(), d.topologyVersion(), cctx.deploymentEnabled());
 
         ClusterNode node = cctx.discovery().node(id);
@@ -338,7 +330,7 @@ class GridDhtPartitionSupplier {
                                     if (!reply(node, d, s, scId))
                                         return;
 
-                                    s = new GridDhtPartitionSupplyMessageV2(d.updateSequence(),
+                                    s = new GridDhtPartitionSupplyMessage(d.updateSequence(),
                                         cctx.cacheId(), d.topologyVersion(), cctx.deploymentEnabled());
                                 }
                             }
@@ -424,7 +416,7 @@ class GridDhtPartitionSupplier {
                                         if (!reply(node, d, s, scId))
                                             return;
 
-                                        s = new GridDhtPartitionSupplyMessageV2(d.updateSequence(),
+                                        s = new GridDhtPartitionSupplyMessage(d.updateSequence(),
                                             cctx.cacheId(), d.topologyVersion(), cctx.deploymentEnabled());
                                     }
                                 }
@@ -545,7 +537,7 @@ class GridDhtPartitionSupplier {
                                     if (!reply(node, d, s, scId))
                                         return;
 
-                                    s = new GridDhtPartitionSupplyMessageV2(d.updateSequence(),
+                                    s = new GridDhtPartitionSupplyMessage(d.updateSequence(),
                                         cctx.cacheId(), d.topologyVersion(), cctx.deploymentEnabled());
                                 }
                             }
@@ -605,7 +597,7 @@ class GridDhtPartitionSupplier {
      */
     private boolean reply(ClusterNode n,
         GridDhtPartitionDemandMessage d,
-        GridDhtPartitionSupplyMessageV2 s,
+        GridDhtPartitionSupplyMessage s,
         T3<UUID, Integer, AffinityTopologyVersion> scId)
         throws IgniteCheckedException {
 
@@ -744,294 +736,6 @@ class GridDhtPartitionSupplier {
         }
     }
 
-    @Deprecated//Backward compatibility. To be removed in future.
-    public void startOldListeners() {
-        if (!cctx.kernalContext().clientNode() && cctx.rebalanceEnabled()) {
-            cctx.io().addHandler(cctx.cacheId(), GridDhtPartitionDemandMessage.class, new CI2<UUID, GridDhtPartitionDemandMessage>() {
-                @Override public void apply(UUID id, GridDhtPartitionDemandMessage m) {
-                    processOldDemandMessage(m, id);
-                }
-            });
-        }
-    }
-
-    @Deprecated//Backward compatibility. To be removed in future.
-    public void stopOldListeners() {
-        if (!cctx.kernalContext().clientNode() && cctx.rebalanceEnabled())
-            cctx.io().removeHandler(cctx.cacheId(), GridDhtPartitionDemandMessage.class);
-    }
-
-    /**
-     * @param d D.
-     * @param id Id.
-     */
-    @Deprecated//Backward compatibility. To be removed in future.
-    private void processOldDemandMessage(GridDhtPartitionDemandMessage d, UUID id) {
-        GridDhtPartitionSupplyMessage s = new GridDhtPartitionSupplyMessage(d.workerId(),
-            d.updateSequence(), cctx.cacheId(), cctx.deploymentEnabled());
-
-        ClusterNode node = cctx.node(id);
-
-        if (node == null)
-            return;
-
-        long preloadThrottle = cctx.config().getRebalanceThrottle();
-
-        boolean ack = false;
-
-        try {
-            for (int part : d.partitions()) {
-                GridDhtLocalPartition loc = top.localPartition(part, d.topologyVersion(), false);
-
-                if (loc == null || loc.state() != OWNING || !loc.reserve()) {
-                    // Reply with partition of "-1" to let sender know that
-                    // this node is no longer an owner.
-                    s.missed(part);
-
-                    if (log.isDebugEnabled())
-                        log.debug("Requested partition is not owned by local node [part=" + part +
-                            ", demander=" + id + ']');
-
-                    continue;
-                }
-
-                GridCacheEntryInfoCollectSwapListener swapLsnr = null;
-
-                try {
-                    if (cctx.isSwapOrOffheapEnabled()) {
-                        swapLsnr = new GridCacheEntryInfoCollectSwapListener(log);
-
-                        cctx.swap().addOffHeapListener(part, swapLsnr);
-                        cctx.swap().addSwapListener(part, swapLsnr);
-                    }
-
-                    boolean partMissing = false;
-
-                    for (GridCacheEntryEx e : loc.allEntries()) {
-                        if (!cctx.affinity().partitionBelongs(node, part, d.topologyVersion())) {
-                            // Demander no longer needs this partition, so we send '-1' partition and move on.
-                            s.missed(part);
-
-                            if (log.isDebugEnabled())
-                                log.debug("Demanding node does not need requested partition [part=" + part +
-                                    ", nodeId=" + id + ']');
-
-                            partMissing = true;
-
-                            break;
-                        }
-
-                        if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
-                            ack = true;
-
-                            if (!replyOld(node, d, s))
-                                return;
-
-                            // Throttle preloading.
-                            if (preloadThrottle > 0)
-                                U.sleep(preloadThrottle);
-
-                            s = new GridDhtPartitionSupplyMessage(d.workerId(), d.updateSequence(),
-                                cctx.cacheId(), cctx.deploymentEnabled());
-                        }
-
-                        GridCacheEntryInfo info = e.info();
-
-                        if (info != null && !info.isNew()) {
-                            if (preloadPred == null || preloadPred.apply(info))
-                                s.addEntry(part, info, cctx);
-                            else if (log.isDebugEnabled())
-                                log.debug("Rebalance predicate evaluated to false (will not sender cache entry): " +
-                                    info);
-                        }
-                    }
-
-                    if (partMissing)
-                        continue;
-
-                    if (cctx.isSwapOrOffheapEnabled()) {
-                        GridCloseableIterator<Map.Entry<byte[], GridCacheSwapEntry>> iter =
-                            cctx.swap().iterator(part);
-
-                        // Iterator may be null if space does not exist.
-                        if (iter != null) {
-                            try {
-                                boolean prepared = false;
-
-                                for (Map.Entry<byte[], GridCacheSwapEntry> e : iter) {
-                                    if (!cctx.affinity().partitionBelongs(node, part, d.topologyVersion())) {
-                                        // Demander no longer needs this partition,
-                                        // so we send '-1' partition and move on.
-                                        s.missed(part);
-
-                                        if (log.isDebugEnabled())
-                                            log.debug("Demanding node does not need requested partition " +
-                                                "[part=" + part + ", nodeId=" + id + ']');
-
-                                        partMissing = true;
-
-                                        break; // For.
-                                    }
-
-                                    if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
-                                        ack = true;
-
-                                        if (!replyOld(node, d, s))
-                                            return;
-
-                                        // Throttle preloading.
-                                        if (preloadThrottle > 0)
-                                            U.sleep(preloadThrottle);
-
-                                        s = new GridDhtPartitionSupplyMessage(d.workerId(),
-                                            d.updateSequence(), cctx.cacheId(), cctx.deploymentEnabled());
-                                    }
-
-                                    GridCacheSwapEntry swapEntry = e.getValue();
-
-                                    GridCacheEntryInfo info = new GridCacheEntryInfo();
-
-                                    info.keyBytes(e.getKey());
-                                    info.ttl(swapEntry.ttl());
-                                    info.expireTime(swapEntry.expireTime());
-                                    info.version(swapEntry.version());
-                                    info.value(swapEntry.value());
-
-                                    if (preloadPred == null || preloadPred.apply(info))
-                                        s.addEntry0(part, info, cctx);
-                                    else {
-                                        if (log.isDebugEnabled())
-                                            log.debug("Rebalance predicate evaluated to false (will not send " +
-                                                "cache entry): " + info);
-
-                                        continue;
-                                    }
-
-                                    // Need to manually prepare cache message.
-                                    if (depEnabled && !prepared) {
-                                        ClassLoader ldr = swapEntry.keyClassLoaderId() != null ?
-                                            cctx.deploy().getClassLoader(swapEntry.keyClassLoaderId()) :
-                                            swapEntry.valueClassLoaderId() != null ?
-                                                cctx.deploy().getClassLoader(swapEntry.valueClassLoaderId()) :
-                                                null;
-
-                                        if (ldr == null)
-                                            continue;
-
-                                        if (ldr instanceof GridDeploymentInfo) {
-                                            s.prepare((GridDeploymentInfo)ldr);
-
-                                            prepared = true;
-                                        }
-                                    }
-                                }
-
-                                if (partMissing)
-                                    continue;
-                            }
-                            finally {
-                                iter.close();
-                            }
-                        }
-                    }
-
-                    // Stop receiving promote notifications.
-                    if (swapLsnr != null) {
-                        cctx.swap().removeOffHeapListener(part, swapLsnr);
-                        cctx.swap().removeSwapListener(part, swapLsnr);
-                    }
-
-                    if (swapLsnr != null) {
-                        Collection<GridCacheEntryInfo> entries = swapLsnr.entries();
-
-                        swapLsnr = null;
-
-                        for (GridCacheEntryInfo info : entries) {
-                            if (!cctx.affinity().partitionBelongs(node, part, d.topologyVersion())) {
-                                // Demander no longer needs this partition,
-                                // so we send '-1' partition and move on.
-                                s.missed(part);
-
-                                if (log.isDebugEnabled())
-                                    log.debug("Demanding node does not need requested partition " +
-                                        "[part=" + part + ", nodeId=" + id + ']');
-
-                                // No need to continue iteration over swap entries.
-                                break;
-                            }
-
-                            if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
-                                ack = true;
-
-                                if (!replyOld(node, d, s))
-                                    return;
-
-                                s = new GridDhtPartitionSupplyMessage(d.workerId(),
-                                    d.updateSequence(),
-                                    cctx.cacheId(),
-                                    cctx.deploymentEnabled());
-                            }
-
-                            if (preloadPred == null || preloadPred.apply(info))
-                                s.addEntry(part, info, cctx);
-                            else if (log.isDebugEnabled())
-                                log.debug("Rebalance predicate evaluated to false (will not sender cache entry): " +
-                                    info);
-                        }
-                    }
-
-                    // Mark as last supply message.
-                    s.last(part);
-
-                    if (ack) {
-                        s.markAck();
-
-                        break; // Partition for loop.
-                    }
-                }
-                finally {
-                    loc.release();
-
-                    if (swapLsnr != null) {
-                        cctx.swap().removeOffHeapListener(part, swapLsnr);
-                        cctx.swap().removeSwapListener(part, swapLsnr);
-                    }
-                }
-            }
-
-            replyOld(node, d, s);
-        }
-        catch (IgniteCheckedException e) {
-            U.error(log, "Failed to send partition supply message to node: " + node.id(), e);
-        }
-    }
-
-    /**
-     * @param n Node.
-     * @param d Demand message.
-     * @param s Supply message.
-     * @return {@code True} if message was sent, {@code false} if recipient left grid.
-     * @throws IgniteCheckedException If failed.
-     */
-    @Deprecated//Backward compatibility. To be removed in future.
-    private boolean replyOld(ClusterNode n, GridDhtPartitionDemandMessage d, GridDhtPartitionSupplyMessage s)
-        throws IgniteCheckedException {
-        try {
-            if (log.isDebugEnabled())
-                log.debug("Replying to partition demand [node=" + n.id() + ", demand=" + d + ", supply=" + s + ']');
-
-            cctx.io().sendOrderedMessage(n, d.topic(), s, cctx.ioPolicy(), d.timeout());
-
-            return true;
-        }
-        catch (ClusterTopologyCheckedException ignore) {
-            if (log.isDebugEnabled())
-                log.debug("Failed to send partition supply message because node left grid: " + n.id());
-
-            return false;
-        }
-    }
-
     /**
      * Dumps debug information.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/aeacad6b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
index cc30321..a01be28 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
@@ -29,6 +29,7 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.GridDirectCollection;
 import org.apache.ignite.internal.GridDirectMap;
 import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheEntryInfoCollection;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
@@ -48,14 +49,11 @@ public class GridDhtPartitionSupplyMessage extends GridCacheMessage implements G
     /** */
     private static final long serialVersionUID = 0L;
 
-    /** Worker ID. */
-    private int workerId = -1;
-
     /** Update sequence. */
     private long updateSeq;
 
-    /** Acknowledgement flag. */
-    private boolean ack;
+    /** Topology version. */
+    private AffinityTopologyVersion topVer;
 
     /** Partitions that have been fully sent. */
     @GridDirectCollection(int.class)
@@ -68,27 +66,26 @@ public class GridDhtPartitionSupplyMessage extends GridCacheMessage implements G
 
     /** Entries. */
     @GridDirectMap(keyType = int.class, valueType = CacheEntryInfoCollection.class)
-    private Map<Integer, CacheEntryInfoCollection> infos = new HashMap<>();
+    private Map<Integer, CacheEntryInfoCollection> infos;
 
     /** Message size. */
     @GridDirectTransient
     private int msgSize;
 
     /**
-     * @param workerId Worker ID.
      * @param updateSeq Update sequence for this node.
      * @param cacheId Cache ID.
+     * @param topVer Topology version.
      * @param addDepInfo Deployment info flag.
      */
-    GridDhtPartitionSupplyMessage(int workerId, long updateSeq, int cacheId, boolean addDepInfo) {
-        assert workerId >= 0;
-        assert updateSeq > 0;
-
+    GridDhtPartitionSupplyMessage(long updateSeq,
+        int cacheId,
+        AffinityTopologyVersion topVer,
+        boolean addDepInfo) {
         this.cacheId = cacheId;
         this.updateSeq = updateSeq;
-        this.workerId = workerId;
-        this.addDepInfo = addDepInfo;
-    }
+        this.topVer = topVer;
+        this.addDepInfo = addDepInfo;    }
 
     /**
      * Empty constructor required for {@link Externalizable}.
@@ -103,13 +100,6 @@ public class GridDhtPartitionSupplyMessage extends GridCacheMessage implements G
     }
 
     /**
-     * @return Worker ID.
-     */
-    int workerId() {
-        return workerId;
-    }
-
-    /**
      * @return Update sequence.
      */
     long updateSequence() {
@@ -117,17 +107,10 @@ public class GridDhtPartitionSupplyMessage extends GridCacheMessage implements G
     }
 
     /**
-     * Marks this message for acknowledgment.
+     * @return Topology version for which demand message is sent.
      */
-    void markAck() {
-        ack = true;
-    }
-
-    /**
-     * @return Acknowledgement flag.
-     */
-    boolean ack() {
-        return ack;
+    @Override public AffinityTopologyVersion topologyVersion() {
+        return topVer;
     }
 
     /**
@@ -148,12 +131,12 @@ public class GridDhtPartitionSupplyMessage extends GridCacheMessage implements G
             msgSize += 4;
 
             // If partition is empty, we need to add it.
-            if (!infos.containsKey(p)) {
+            if (!infos().containsKey(p)) {
                 CacheEntryInfoCollection infoCol = new CacheEntryInfoCollection();
 
                 infoCol.init();
 
-                infos.put(p, infoCol);
+                infos().put(p, infoCol);
             }
         }
     }
@@ -180,6 +163,9 @@ public class GridDhtPartitionSupplyMessage extends GridCacheMessage implements G
      * @return Entries.
      */
     Map<Integer, CacheEntryInfoCollection> infos() {
+        if (infos == null)
+            infos = new HashMap<>();
+
         return infos;
     }
 
@@ -203,12 +189,12 @@ public class GridDhtPartitionSupplyMessage extends GridCacheMessage implements G
 
         msgSize += info.marshalledSize(ctx);
 
-        CacheEntryInfoCollection infoCol = infos.get(p);
+        CacheEntryInfoCollection infoCol = infos().get(p);
 
         if (infoCol == null) {
             msgSize += 4;
 
-            infos.put(p, infoCol = new CacheEntryInfoCollection());
+            infos().put(p, infoCol = new CacheEntryInfoCollection());
 
             infoCol.init();
         }
@@ -232,12 +218,12 @@ public class GridDhtPartitionSupplyMessage extends GridCacheMessage implements G
 
         msgSize += info.marshalledSize(ctx);
 
-        CacheEntryInfoCollection infoCol = infos.get(p);
+        CacheEntryInfoCollection infoCol = infos().get(p);
 
         if (infoCol == null) {
             msgSize += 4;
 
-            infos.put(p, infoCol = new CacheEntryInfoCollection());
+            infos().put(p, infoCol = new CacheEntryInfoCollection());
 
             infoCol.init();
         }
@@ -253,7 +239,7 @@ public class GridDhtPartitionSupplyMessage extends GridCacheMessage implements G
         GridCacheContext cacheCtx = ctx.cacheContext(cacheId);
 
         for (CacheEntryInfoCollection col : infos().values()) {
-            List<GridCacheEntryInfo>  entries = col.infos();
+            List<GridCacheEntryInfo> entries = col.infos();
 
             for (int i = 0; i < entries.size(); i++)
                 entries.get(i).unmarshal(cacheCtx, ldr);
@@ -269,7 +255,7 @@ public class GridDhtPartitionSupplyMessage extends GridCacheMessage implements G
      * @return Number of entries in message.
      */
     public int size() {
-        return infos.size();
+        return infos().size();
     }
 
     /** {@inheritDoc} */
@@ -288,25 +274,25 @@ public class GridDhtPartitionSupplyMessage extends GridCacheMessage implements G
 
         switch (writer.state()) {
             case 3:
-                if (!writer.writeBoolean("ack", ack))
+                if (!writer.writeMap("infos", infos, MessageCollectionItemType.INT, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
 
             case 4:
-                if (!writer.writeMap("infos", infos, MessageCollectionItemType.INT, MessageCollectionItemType.MSG))
+                if (!writer.writeCollection("last", last, MessageCollectionItemType.INT))
                     return false;
 
                 writer.incrementState();
 
             case 5:
-                if (!writer.writeCollection("last", last, MessageCollectionItemType.INT))
+                if (!writer.writeCollection("missed", missed, MessageCollectionItemType.INT))
                     return false;
 
                 writer.incrementState();
 
             case 6:
-                if (!writer.writeCollection("missed", missed, MessageCollectionItemType.INT))
+                if (!writer.writeMessage("topVer", topVer))
                     return false;
 
                 writer.incrementState();
@@ -317,12 +303,6 @@ public class GridDhtPartitionSupplyMessage extends GridCacheMessage implements G
 
                 writer.incrementState();
 
-            case 8:
-                if (!writer.writeInt("workerId", workerId))
-                    return false;
-
-                writer.incrementState();
-
         }
 
         return true;
@@ -340,7 +320,7 @@ public class GridDhtPartitionSupplyMessage extends GridCacheMessage implements G
 
         switch (reader.state()) {
             case 3:
-                ack = reader.readBoolean("ack");
+                infos = reader.readMap("infos", MessageCollectionItemType.INT, MessageCollectionItemType.MSG, false);
 
                 if (!reader.isLastRead())
                     return false;
@@ -348,7 +328,7 @@ public class GridDhtPartitionSupplyMessage extends GridCacheMessage implements G
                 reader.incrementState();
 
             case 4:
-                infos = reader.readMap("infos", MessageCollectionItemType.INT, MessageCollectionItemType.MSG, false);
+                last = reader.readCollection("last", MessageCollectionItemType.INT);
 
                 if (!reader.isLastRead())
                     return false;
@@ -356,7 +336,7 @@ public class GridDhtPartitionSupplyMessage extends GridCacheMessage implements G
                 reader.incrementState();
 
             case 5:
-                last = reader.readCollection("last", MessageCollectionItemType.INT);
+                missed = reader.readCollection("missed", MessageCollectionItemType.INT);
 
                 if (!reader.isLastRead())
                     return false;
@@ -364,7 +344,7 @@ public class GridDhtPartitionSupplyMessage extends GridCacheMessage implements G
                 reader.incrementState();
 
             case 6:
-                missed = reader.readCollection("missed", MessageCollectionItemType.INT);
+                topVer = reader.readMessage("topVer");
 
                 if (!reader.isLastRead())
                     return false;
@@ -379,14 +359,6 @@ public class GridDhtPartitionSupplyMessage extends GridCacheMessage implements G
 
                 reader.incrementState();
 
-            case 8:
-                workerId = reader.readInt("workerId");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
         }
 
         return reader.afterMessageRead(GridDhtPartitionSupplyMessage.class);
@@ -394,18 +366,19 @@ public class GridDhtPartitionSupplyMessage extends GridCacheMessage implements G
 
     /** {@inheritDoc} */
     @Override public short directType() {
-        return 45;
+        return 114;
     }
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 9;
+        return 8;
     }
 
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(GridDhtPartitionSupplyMessage.class, this,
             "size", size(),
+            "parts", infos().keySet(),
             "super", super.toString());
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/aeacad6b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java
deleted file mode 100644
index 2294582..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java
+++ /dev/null
@@ -1,384 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one or more
- *  contributor license agreements.  See the NOTICE file distributed with
- *  this work for additional information regarding copyright ownership.
- *  The ASF licenses this file to You under the Apache License, Version 2.0
- *  (the "License"); you may not use this file except in compliance with
- *  the License.  You may obtain a copy of the License at
- *
- *       http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
-
-import java.io.Externalizable;
-import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.GridDirectCollection;
-import org.apache.ignite.internal.GridDirectMap;
-import org.apache.ignite.internal.GridDirectTransient;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.CacheEntryInfoCollection;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
-import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
-import org.apache.ignite.internal.processors.cache.GridCacheMessage;
-import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
-import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
-
-/**
- * Partition supply message.
- */
-public class GridDhtPartitionSupplyMessageV2 extends GridCacheMessage implements GridCacheDeployable {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** Update sequence. */
-    private long updateSeq;
-
-    /** Topology version. */
-    private AffinityTopologyVersion topVer;
-
-    /** Partitions that have been fully sent. */
-    @GridDirectCollection(int.class)
-    private Collection<Integer> last;
-
-    /** Partitions which were not found. */
-    @GridToStringInclude
-    @GridDirectCollection(int.class)
-    private Collection<Integer> missed;
-
-    /** Entries. */
-    @GridDirectMap(keyType = int.class, valueType = CacheEntryInfoCollection.class)
-    private Map<Integer, CacheEntryInfoCollection> infos;
-
-    /** Message size. */
-    @GridDirectTransient
-    private int msgSize;
-
-    /**
-     * @param updateSeq Update sequence for this node.
-     * @param cacheId Cache ID.
-     * @param topVer Topology version.
-     * @param addDepInfo Deployment info flag.
-     */
-    GridDhtPartitionSupplyMessageV2(long updateSeq,
-        int cacheId,
-        AffinityTopologyVersion topVer,
-        boolean addDepInfo) {
-        this.cacheId = cacheId;
-        this.updateSeq = updateSeq;
-        this.topVer = topVer;
-        this.addDepInfo = addDepInfo;    }
-
-    /**
-     * Empty constructor required for {@link Externalizable}.
-     */
-    public GridDhtPartitionSupplyMessageV2() {
-        // No-op.
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean ignoreClassErrors() {
-        return true;
-    }
-
-    /**
-     * @return Update sequence.
-     */
-    long updateSequence() {
-        return updateSeq;
-    }
-
-    /**
-     * @return Topology version for which demand message is sent.
-     */
-    @Override public AffinityTopologyVersion topologyVersion() {
-        return topVer;
-    }
-
-    /**
-     * @return Flag to indicate last message for partition.
-     */
-    Collection<Integer> last() {
-        return last == null ? Collections.<Integer>emptySet() : last;
-    }
-
-    /**
-     * @param p Partition which was fully sent.
-     */
-    void last(int p) {
-        if (last == null)
-            last = new HashSet<>();
-
-        if (last.add(p)) {
-            msgSize += 4;
-
-            // If partition is empty, we need to add it.
-            if (!infos().containsKey(p)) {
-                CacheEntryInfoCollection infoCol = new CacheEntryInfoCollection();
-
-                infoCol.init();
-
-                infos().put(p, infoCol);
-            }
-        }
-    }
-
-    /**
-     * @param p Missed partition.
-     */
-    void missed(int p) {
-        if (missed == null)
-            missed = new HashSet<>();
-
-        if (missed.add(p))
-            msgSize += 4;
-    }
-
-    /**
-     * @return Missed partitions.
-     */
-    Collection<Integer> missed() {
-        return missed == null ? Collections.<Integer>emptySet() : missed;
-    }
-
-    /**
-     * @return Entries.
-     */
-    Map<Integer, CacheEntryInfoCollection> infos() {
-        if (infos == null)
-            infos = new HashMap<>();
-
-        return infos;
-    }
-
-    /**
-     * @return Message size.
-     */
-    int messageSize() {
-        return msgSize;
-    }
-
-    /**
-     * @param p Partition.
-     * @param info Entry to add.
-     * @param ctx Cache context.
-     * @throws IgniteCheckedException If failed.
-     */
-    void addEntry(int p, GridCacheEntryInfo info, GridCacheContext ctx) throws IgniteCheckedException {
-        assert info != null;
-
-        marshalInfo(info, ctx);
-
-        msgSize += info.marshalledSize(ctx);
-
-        CacheEntryInfoCollection infoCol = infos().get(p);
-
-        if (infoCol == null) {
-            msgSize += 4;
-
-            infos().put(p, infoCol = new CacheEntryInfoCollection());
-
-            infoCol.init();
-        }
-
-        infoCol.add(info);
-    }
-
-    /**
-     * @param p Partition.
-     * @param info Entry to add.
-     * @param ctx Cache context.
-     * @throws IgniteCheckedException If failed.
-     */
-    void addEntry0(int p, GridCacheEntryInfo info, GridCacheContext ctx) throws IgniteCheckedException {
-        assert info != null;
-        assert (info.key() != null || info.keyBytes() != null);
-        assert info.value() != null;
-
-        // Need to call this method to initialize info properly.
-        marshalInfo(info, ctx);
-
-        msgSize += info.marshalledSize(ctx);
-
-        CacheEntryInfoCollection infoCol = infos().get(p);
-
-        if (infoCol == null) {
-            msgSize += 4;
-
-            infos().put(p, infoCol = new CacheEntryInfoCollection());
-
-            infoCol.init();
-        }
-
-        infoCol.add(info);
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings("ForLoopReplaceableByForEach")
-    @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
-        super.finishUnmarshal(ctx, ldr);
-
-        GridCacheContext cacheCtx = ctx.cacheContext(cacheId);
-
-        for (CacheEntryInfoCollection col : infos().values()) {
-            List<GridCacheEntryInfo> entries = col.infos();
-
-            for (int i = 0; i < entries.size(); i++)
-                entries.get(i).unmarshal(cacheCtx, ldr);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean addDeploymentInfo() {
-        return addDepInfo;
-    }
-
-    /**
-     * @return Number of entries in message.
-     */
-    public int size() {
-        return infos().size();
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
-        writer.setBuffer(buf);
-
-        if (!super.writeTo(buf, writer))
-            return false;
-
-        if (!writer.isHeaderWritten()) {
-            if (!writer.writeHeader(directType(), fieldsCount()))
-                return false;
-
-            writer.onHeaderWritten();
-        }
-
-        switch (writer.state()) {
-            case 3:
-                if (!writer.writeMap("infos", infos, MessageCollectionItemType.INT, MessageCollectionItemType.MSG))
-                    return false;
-
-                writer.incrementState();
-
-            case 4:
-                if (!writer.writeCollection("last", last, MessageCollectionItemType.INT))
-                    return false;
-
-                writer.incrementState();
-
-            case 5:
-                if (!writer.writeCollection("missed", missed, MessageCollectionItemType.INT))
-                    return false;
-
-                writer.incrementState();
-
-            case 6:
-                if (!writer.writeMessage("topVer", topVer))
-                    return false;
-
-                writer.incrementState();
-
-            case 7:
-                if (!writer.writeLong("updateSeq", updateSeq))
-                    return false;
-
-                writer.incrementState();
-
-        }
-
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
-        reader.setBuffer(buf);
-
-        if (!reader.beforeMessageRead())
-            return false;
-
-        if (!super.readFrom(buf, reader))
-            return false;
-
-        switch (reader.state()) {
-            case 3:
-                infos = reader.readMap("infos", MessageCollectionItemType.INT, MessageCollectionItemType.MSG, false);
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 4:
-                last = reader.readCollection("last", MessageCollectionItemType.INT);
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 5:
-                missed = reader.readCollection("missed", MessageCollectionItemType.INT);
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 6:
-                topVer = reader.readMessage("topVer");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 7:
-                updateSeq = reader.readLong("updateSeq");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-        }
-
-        return reader.afterMessageRead(GridDhtPartitionSupplyMessageV2.class);
-    }
-
-    /** {@inheritDoc} */
-    @Override public short directType() {
-        return 114;
-    }
-
-    /** {@inheritDoc} */
-    @Override public byte fieldsCount() {
-        return 8;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(GridDhtPartitionSupplyMessageV2.class, this,
-            "size", size(),
-            "parts", infos().keySet(),
-            "super", super.toString());
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/aeacad6b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index dc988bd..a5dcd8c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -187,7 +187,6 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
         supplier = new GridDhtPartitionSupplier(cctx);
         demander = new GridDhtPartitionDemander(cctx);
 
-        supplier.start();
         demander.start();
 
         cctx.events().addListener(discoLsnr, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED);
@@ -380,7 +379,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
     }
 
     /** {@inheritDoc} */
-    public void handleSupplyMessage(int idx, UUID id, final GridDhtPartitionSupplyMessageV2 s) {
+    public void handleSupplyMessage(int idx, UUID id, final GridDhtPartitionSupplyMessage s) {
         if (!enterBusy())
             return;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/aeacad6b/modules/core/src/main/resources/META-INF/classnames.properties
----------------------------------------------------------------------
diff --git a/modules/core/src/main/resources/META-INF/classnames.properties b/modules/core/src/main/resources/META-INF/classnames.properties
index 9cce826..8c5a72e 100644
--- a/modules/core/src/main/resources/META-INF/classnames.properties
+++ b/modules/core/src/main/resources/META-INF/classnames.properties
@@ -743,7 +743,6 @@ org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPar
 org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplier$1
 org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplier$SupplyContextPhase
 org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage
-org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessageV2
 org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsAbstractMessage
 org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture$1
 org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture$2

http://git-wip-us.apache.org/repos/asf/ignite/blob/aeacad6b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
index e482a93..5582fdd 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
@@ -65,7 +65,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysResponse;
-import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessageV2;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
@@ -2016,10 +2016,10 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest {
     private void blockSupplySend(TestRecordingCommunicationSpi spi, final String cacheName) {
         spi.blockMessages(new IgnitePredicate<GridIoMessage>() {
             @Override public boolean apply(GridIoMessage ioMsg) {
-                if (!ioMsg.message().getClass().equals(GridDhtPartitionSupplyMessageV2.class))
+                if (!ioMsg.message().getClass().equals(GridDhtPartitionSupplyMessage.class))
                     return false;
 
-                GridDhtPartitionSupplyMessageV2 msg = (GridDhtPartitionSupplyMessageV2)ioMsg.message();
+                GridDhtPartitionSupplyMessage msg = (GridDhtPartitionSupplyMessage)ioMsg.message();
 
                 return msg.cacheId() == CU.cacheId(cacheName);
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/aeacad6b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheReadFromBackupTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheReadFromBackupTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheReadFromBackupTest.java
index 89fcf6b..29c2af6 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheReadFromBackupTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheReadFromBackupTest.java
@@ -38,7 +38,7 @@ import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.configuration.NearCacheConfiguration;
 import org.apache.ignite.internal.TestRecordingCommunicationSpi;
 import org.apache.ignite.internal.managers.communication.GridIoMessage;
-import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessageV2;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetRequest;
 import org.apache.ignite.internal.util.typedef.internal.CU;
@@ -197,10 +197,10 @@ public class IgniteCacheReadFromBackupTest extends GridCommonAbstractTest {
 
                     spi.blockMessages(new IgnitePredicate<GridIoMessage>() {
                         @Override public boolean apply(GridIoMessage ioMsg) {
-                            if (!ioMsg.message().getClass().equals(GridDhtPartitionSupplyMessageV2.class))
+                            if (!ioMsg.message().getClass().equals(GridDhtPartitionSupplyMessage.class))
                                 return false;
 
-                            GridDhtPartitionSupplyMessageV2 msg = (GridDhtPartitionSupplyMessageV2)ioMsg.message();
+                            GridDhtPartitionSupplyMessage msg = (GridDhtPartitionSupplyMessage)ioMsg.message();
 
                             return msg.cacheId() == CU.cacheId(ccfg.getName());
                         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/aeacad6b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java
index eda030c..dfb3f65 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java
@@ -37,7 +37,6 @@ import org.apache.ignite.internal.TestRecordingCommunicationSpi;
 import org.apache.ignite.internal.managers.communication.GridIoMessage;
 import org.apache.ignite.internal.processors.cache.GridCacheMessage;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
-import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessageV2;
 import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.G;
@@ -106,7 +105,7 @@ public class IgniteCacheAtomicProtocolTest extends GridCommonAbstractTest {
                 @Override public boolean apply(GridIoMessage msg) {
                     Object msg0 = msg.message();
 
-                    return (msg0 instanceof GridDhtPartitionSupplyMessage || msg0 instanceof GridDhtPartitionSupplyMessageV2)
+                    return (msg0 instanceof GridDhtPartitionSupplyMessage)
                         && ((GridCacheMessage)msg0).cacheId() == CU.cacheId(TEST_CACHE);
                 }
             });


[2/2] ignite git commit: Merge branch master into ignite-3477-master

Posted by ag...@apache.org.
Merge branch master into ignite-3477-master


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3eb05de5
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3eb05de5
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3eb05de5

Branch: refs/heads/ignite-3477-master
Commit: 3eb05de5e2ee0d14567167bfe8547441cae69523
Parents: 8122099 aeacad6
Author: Alexey Goncharuk <al...@gmail.com>
Authored: Tue Apr 11 13:16:03 2017 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Tue Apr 11 13:16:03 2017 +0300

----------------------------------------------------------------------
 .../communication/GridIoMessageFactory.java     |   8 +-
 .../GridCachePartitionExchangeManager.java      |   6 +-
 .../processors/cache/GridCachePreloader.java    |   4 +-
 .../cache/GridCachePreloaderAdapter.java        |   4 +-
 .../dht/preloader/GridDhtPartitionDemander.java |   2 +-
 .../dht/preloader/GridDhtPartitionSupplier.java |   8 +-
 .../GridDhtPartitionSupplyMessage.java          | 103 +++--
 .../GridDhtPartitionSupplyMessageV2.java        | 422 -------------------
 .../dht/preloader/GridDhtPreloader.java         |   2 +-
 .../resources/META-INF/classnames.properties    |   1 -
 .../cache/ClusterStateAbstractTest.java         |  10 +-
 .../CacheLateAffinityAssignmentTest.java        |   6 +-
 .../IgniteCacheReadFromBackupTest.java          |   6 +-
 .../atomic/IgniteCacheAtomicProtocolTest.java   |   3 +-
 14 files changed, 84 insertions(+), 501 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/3eb05de5/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/3eb05de5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/3eb05de5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/3eb05de5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/3eb05de5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/3eb05de5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
index b80ad04,7c2599a..f7f0aff
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
@@@ -332,10 -537,8 +334,10 @@@ class GridDhtPartitionSupplier 
                                      if (!reply(node, d, s, scId))
                                          return;
  
-                                     s = new GridDhtPartitionSupplyMessageV2(d.updateSequence(),
+                                     s = new GridDhtPartitionSupplyMessage(d.updateSequence(),
 -                                        cctx.cacheId(), d.topologyVersion(), cctx.deploymentEnabled());
 +                                        cctx.cacheId(),
 +                                        d.topologyVersion(),
 +                                        cctx.deploymentEnabled());
                                  }
                              }
  

http://git-wip-us.apache.org/repos/asf/ignite/blob/3eb05de5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
index cc30321,a01be28..ee461ab
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
@@@ -66,9 -64,9 +64,14 @@@ public class GridDhtPartitionSupplyMess
      @GridDirectCollection(int.class)
      private Collection<Integer> missed;
  
++    /** Partitions for which we were able to get historical iterator. */
++    @GridToStringInclude
++    @GridDirectCollection(int.class)
++    private Collection<Integer> clean;
++
      /** Entries. */
      @GridDirectMap(keyType = int.class, valueType = CacheEntryInfoCollection.class)
-     private Map<Integer, CacheEntryInfoCollection> infos = new HashMap<>();
+     private Map<Integer, CacheEntryInfoCollection> infos;
  
      /** Message size. */
      @GridDirectTransient
@@@ -159,6 -142,6 +147,25 @@@
      }
  
      /**
++     * @param p Partition to clean.
++     */
++    void clean(int p) {
++        if (clean == null)
++            clean = new HashSet<>();
++
++        if (clean.add(p))
++            msgSize += 4;
++    }
++
++    /**
++     * @param p Partition to check.
++     * @return Check result.
++     */
++    boolean isClean(int p) {
++        return clean != null && clean.contains(p);
++    }
++
++    /**
       * @param p Missed partition.
       */
      void missed(int p) {
@@@ -288,7 -274,7 +298,7 @@@
  
          switch (writer.state()) {
              case 3:
-                 if (!writer.writeBoolean("ack", ack))
 -                if (!writer.writeMap("infos", infos, MessageCollectionItemType.INT, MessageCollectionItemType.MSG))
++                if (!writer.writeCollection("clean", clean, MessageCollectionItemType.INT))
                      return false;
  
                  writer.incrementState();
@@@ -312,13 -298,7 +322,13 @@@
                  writer.incrementState();
  
              case 7:
-                 if (!writer.writeLong("updateSeq", updateSeq))
++                if (!writer.writeMessage("topVer", topVer))
 +                    return false;
 +
 +                writer.incrementState();
 +
 +            case 8:
-                 if (!writer.writeInt("workerId", workerId))
+                 if (!writer.writeLong("updateSeq", updateSeq))
                      return false;
  
                  writer.incrementState();
@@@ -340,7 -320,7 +350,7 @@@
  
          switch (reader.state()) {
              case 3:
-                 ack = reader.readBoolean("ack");
 -                infos = reader.readMap("infos", MessageCollectionItemType.INT, MessageCollectionItemType.MSG, false);
++                clean = reader.readCollection("clean", MessageCollectionItemType.INT);
  
                  if (!reader.isLastRead())
                      return false;
@@@ -372,15 -352,7 +382,15 @@@
                  reader.incrementState();
  
              case 7:
-                 updateSeq = reader.readLong("updateSeq");
++                topVer = reader.readMessage("topVer");
 +
 +                if (!reader.isLastRead())
 +                    return false;
 +
 +                reader.incrementState();
 +
 +            case 8:
-                 workerId = reader.readInt("workerId");
+                 updateSeq = reader.readLong("updateSeq");
  
                  if (!reader.isLastRead())
                      return false;

http://git-wip-us.apache.org/repos/asf/ignite/blob/3eb05de5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/3eb05de5/modules/core/src/main/resources/META-INF/classnames.properties
----------------------------------------------------------------------
diff --cc modules/core/src/main/resources/META-INF/classnames.properties
index 473f176,8c5a72e..335a33f
--- a/modules/core/src/main/resources/META-INF/classnames.properties
+++ b/modules/core/src/main/resources/META-INF/classnames.properties
@@@ -763,9 -740,9 +763,8 @@@ org.apache.ignite.internal.processors.c
  org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId
  org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap
  org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap
 -org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplier$1
  org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplier$SupplyContextPhase
  org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage
- org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessageV2
  org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsAbstractMessage
  org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture$1
  org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture$2

http://git-wip-us.apache.org/repos/asf/ignite/blob/3eb05de5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClusterStateAbstractTest.java
----------------------------------------------------------------------
diff --cc modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClusterStateAbstractTest.java
index f095e79,0000000..ce7829a
mode 100644,000000..100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClusterStateAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClusterStateAbstractTest.java
@@@ -1,439 -1,0 +1,439 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + *
 + */
 +
 +package org.apache.ignite.internal.processors.cache;
 +
 +import java.util.Collection;
 +import java.util.concurrent.CountDownLatch;
 +import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.atomic.AtomicReference;
 +import java.util.concurrent.locks.Lock;
 +import org.apache.ignite.Ignite;
 +import org.apache.ignite.IgniteCache;
 +import org.apache.ignite.IgniteException;
 +import org.apache.ignite.cluster.ClusterNode;
 +import org.apache.ignite.configuration.CacheConfiguration;
 +import org.apache.ignite.configuration.IgniteConfiguration;
 +import org.apache.ignite.internal.IgniteEx;
 +import org.apache.ignite.internal.IgniteInternalFuture;
 +import org.apache.ignite.internal.managers.communication.GridIoMessage;
 +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
- import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessageV2;
++import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
 +import org.apache.ignite.internal.util.GridConcurrentHashSet;
 +import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 +import org.apache.ignite.internal.util.typedef.internal.U;
 +import org.apache.ignite.lang.IgniteInClosure;
 +import org.apache.ignite.plugin.extensions.communication.Message;
 +import org.apache.ignite.spi.IgniteSpiException;
 +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
 +import org.apache.ignite.testframework.GridTestUtils;
 +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 +import org.apache.ignite.transactions.Transaction;
 +
 +import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
 +import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
 +
 +/**
 + *
 + */
 +@SuppressWarnings("TooBroadScope")
 +public abstract class ClusterStateAbstractTest extends GridCommonAbstractTest {
 +    /** Entry count. */
 +    public static final int ENTRY_CNT = 5000;
 +
 +    /** */
 +    public static final int GRID_CNT = 4;
 +
 +    /** */
 +    private static final String CACHE_NAME = "cache1";
 +
 +    /** */
 +    private static final Collection<Class> forbidden = new GridConcurrentHashSet<>();
 +
 +    /** */
 +    private static AtomicReference<Exception> errEncountered = new AtomicReference<>();
 +
 +    /** */
 +    private boolean activeOnStart = true;
 +
 +    /** */
 +    private boolean client;
 +
 +    /** {@inheritDoc} */
 +    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
 +        IgniteConfiguration cfg = super.getConfiguration(gridName);
 +
 +        cfg.setActiveOnStart(activeOnStart);
 +
 +        cfg.setCacheConfiguration(cacheConfiguration(CACHE_NAME));
 +
 +        if (client)
 +            cfg.setClientMode(true);
 +
 +        cfg.setCommunicationSpi(new TestCommunicationSpi());
 +
 +        return cfg;
 +    }
 +
 +    /**
 +     * @param cacheName Cache name.
 +     * @return Cache configuration.
 +     */
 +    protected abstract CacheConfiguration cacheConfiguration(String cacheName);
 +
 +    /** {@inheritDoc} */
 +    @Override protected void afterTest() throws Exception {
 +        stopAllGrids();
 +
 +        forbidden.clear();
 +
 +        Exception err = errEncountered.getAndSet(null);
 +
 +        if (err != null)
 +            throw err;
 +    }
 +
 +    /**
 +     * @throws Exception if failed.
 +     */
 +    public void testDynamicCacheStart() throws Exception {
 +        activeOnStart = false;
 +
-         forbidden.add(GridDhtPartitionSupplyMessageV2.class);
++        forbidden.add(GridDhtPartitionSupplyMessage.class);
 +        forbidden.add(GridDhtPartitionDemandMessage.class);
 +
 +        startGrids(GRID_CNT);
 +
 +        checkInactive(GRID_CNT);
 +
 +        forbidden.clear();
 +
 +        grid(0).active(true);
 +
 +        IgniteCache<Object, Object> cache2 = grid(0).createCache(new CacheConfiguration<>("cache2"));
 +
 +        for (int k = 0; k < ENTRY_CNT; k++)
 +            cache2.put(k, k);
 +
 +        grid(0).active(false);
 +
 +        checkInactive(GRID_CNT);
 +
 +        stopAllGrids();
 +    }
 +
 +    /**
 +     * @throws Exception if failed.
 +     */
 +    public void testNoRebalancing() throws Exception {
 +        activeOnStart = false;
 +
-         forbidden.add(GridDhtPartitionSupplyMessageV2.class);
++        forbidden.add(GridDhtPartitionSupplyMessage.class);
 +        forbidden.add(GridDhtPartitionDemandMessage.class);
 +
 +        startGrids(GRID_CNT);
 +
 +        checkInactive(GRID_CNT);
 +
 +        forbidden.clear();
 +
 +        grid(0).active(true);
 +
 +        awaitPartitionMapExchange();
 +
 +        final IgniteCache<Object, Object> cache = grid(0).cache(CACHE_NAME);
 +
 +        for (int k = 0; k < ENTRY_CNT; k++)
 +            cache.put(k, k);
 +
 +        for (int g = 0; g < GRID_CNT; g++) {
 +            // Tests that state changes are propagated to existing and new nodes.
 +            assertTrue(grid(g).active());
 +
 +            IgniteCache<Object, Object> cache0 = grid(g).cache(CACHE_NAME);
 +
 +            for (int k = 0; k < ENTRY_CNT; k++)
 +                assertEquals(k,  cache0.get(k));
 +        }
 +
 +        // Check that new node startup and shutdown works fine after activation.
 +        startGrid(GRID_CNT);
 +        startGrid(GRID_CNT + 1);
 +
 +        for (int g = 0; g < GRID_CNT + 2; g++) {
 +            IgniteCache<Object, Object> cache0 = grid(g).cache(CACHE_NAME);
 +
 +            for (int k = 0; k < ENTRY_CNT; k++)
 +                assertEquals("Failed for [grid=" + g + ", key=" + k + ']', k, cache0.get(k));
 +        }
 +
 +        stopGrid(GRID_CNT + 1);
 +
 +        for (int g = 0; g < GRID_CNT + 1; g++)
 +            grid(g).cache(CACHE_NAME).rebalance().get();
 +
 +        stopGrid(GRID_CNT);
 +
 +        for (int g = 0; g < GRID_CNT; g++) {
 +            IgniteCache<Object, Object> cache0 = grid(g).cache(CACHE_NAME);
 +
 +            for (int k = 0; k < ENTRY_CNT; k++)
 +                assertEquals(k,  cache0.get(k));
 +        }
 +
 +        grid(0).active(false);
 +
 +        GridTestUtils.waitForCondition(new GridAbsPredicate() {
 +            @Override public boolean apply() {
 +                for (int g = 0; g < GRID_CNT; g++) {
 +                    if (grid(g).active())
 +                        return false;
 +                }
 +
 +                return true;
 +            }
 +        }, 5000);
 +
 +        checkInactive(GRID_CNT);
 +
-         forbidden.add(GridDhtPartitionSupplyMessageV2.class);
++        forbidden.add(GridDhtPartitionSupplyMessage.class);
 +        forbidden.add(GridDhtPartitionDemandMessage.class);
 +
 +        // Should stop without exchange.
 +        stopAllGrids();
 +    }
 +
 +    /**
 +     * @throws Exception if failed.
 +     */
 +    public void testActivationFromClient() throws Exception {
-         forbidden.add(GridDhtPartitionSupplyMessageV2.class);
++        forbidden.add(GridDhtPartitionSupplyMessage.class);
 +        forbidden.add(GridDhtPartitionDemandMessage.class);
 +
 +        activeOnStart = false;
 +
 +        startGrids(GRID_CNT);
 +
 +        client = true;
 +
 +        startGrid(GRID_CNT);
 +
 +        checkInactive(GRID_CNT + 1);
 +
 +        Ignite cl = grid(GRID_CNT);
 +
 +        forbidden.clear();
 +
 +        cl.active(true);
 +
 +        awaitPartitionMapExchange();
 +
 +        IgniteCache<Object, Object> cache = cl.cache(CACHE_NAME);
 +
 +        for (int k = 0; k < ENTRY_CNT; k++)
 +            cache.put(k, k);
 +
 +        for (int g = 0; g < GRID_CNT + 1; g++) {
 +            // Tests that state changes are propagated to existing and new nodes.
 +            assertTrue(grid(g).active());
 +
 +            IgniteCache<Object, Object> cache0 = grid(g).cache(CACHE_NAME);
 +
 +            for (int k = 0; k < ENTRY_CNT; k++)
 +                assertEquals(k,  cache0.get(k));
 +        }
 +
 +        cl.active(false);
 +
 +        GridTestUtils.waitForCondition(new GridAbsPredicate() {
 +            @Override public boolean apply() {
 +                for (int g = 0; g < GRID_CNT + 1; g++) {
 +                    if (grid(g).active())
 +                        return false;
 +                }
 +
 +                return true;
 +            }
 +        }, 5000);
 +
 +        checkInactive(GRID_CNT + 1);
 +    }
 +
 +    /**
 +     * Tests that state doesn't change until all acquired locks are released.
 +     *
 +     * @throws Exception If fails.
 +     */
 +    public void testDeactivationWithPendingLock() throws Exception {
 +        fail("https://issues.apache.org/jira/browse/IGNITE-4931");
 +
 +        startGrids(GRID_CNT);
 +
 +        final CountDownLatch finishedLatch = new CountDownLatch(1);
 +
 +        Lock lock = grid(0).cache(CACHE_NAME).lock(1);
 +
 +        IgniteInternalFuture<?> fut;
 +
 +        lock.lock();
 +
 +        try {
 +            fut = multithreadedAsync(new Runnable() {
 +                @Override public void run() {
 +                    grid(1).active(false);
 +
 +                    finishedLatch.countDown();
 +                }
 +            }, 1);
 +
 +            U.sleep(2000);
 +
 +            assert !fut.isDone();
 +
 +            boolean hasActive = false;
 +
 +            for (int g = 0; g < GRID_CNT; g++) {
 +                IgniteEx grid = grid(g);
 +
 +                if (grid.active()) {
 +                    hasActive = true;
 +
 +                    break;
 +                }
 +
 +            }
 +
 +            assertTrue(hasActive);
 +        }
 +        finally {
 +            lock.unlock();
 +        }
 +
 +        fut.get(getTestTimeout(), TimeUnit.MILLISECONDS);
 +
 +        checkInactive(GRID_CNT);
 +
 +        finishedLatch.await();
 +    }
 +
 +    /**
 +     * Tests that state doesn't change until all pending transactions are finished.
 +     *
 +     * @throws Exception If fails.
 +     */
 +    public void testDeactivationWithPendingTransaction() throws Exception {
 +        fail("https://issues.apache.org/jira/browse/IGNITE-4931");
 +
 +        startGrids(GRID_CNT);
 +
 +        final CountDownLatch finishedLatch = new CountDownLatch(1);
 +
 +        final Ignite ignite0 = grid(0);
 +
 +        final IgniteCache<Object, Object> cache0 = ignite0.cache(CACHE_NAME);
 +
 +        IgniteInternalFuture<?> fut;
 +
 +        try (Transaction tx = ignite0.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
 +            cache0.get(1);
 +
 +            fut = multithreadedAsync(new Runnable() {
 +                @Override public void run() {
 +                    ignite0.active(false);
 +
 +                    finishedLatch.countDown();
 +                }
 +            }, 1);
 +
 +            U.sleep(2000);
 +
 +            assert !fut.isDone();
 +
 +            boolean hasActive = false;
 +
 +            for (int g = 0; g < GRID_CNT; g++) {
 +                IgniteEx grid = grid(g);
 +
 +                if (grid.active()) {
 +                    hasActive = true;
 +
 +                    break;
 +                }
 +
 +            }
 +
 +            assertTrue(hasActive);
 +
 +            cache0.put(1, 2);
 +
 +            tx.commit();
 +        }
 +
 +        fut.get(getTestTimeout(), TimeUnit.MILLISECONDS);
 +
 +        checkInactive(GRID_CNT);
 +
 +        ignite0.active(true);
 +
 +        for (int g = 0; g < GRID_CNT; g++)
 +            assertEquals(2, grid(g).cache(CACHE_NAME).get(1));
 +
 +        finishedLatch.await();
 +    }
 +
 +    /**
 +     *
 +     */
 +    private void checkInactive(int cnt) {
 +        for (int g = 0; g < cnt; g++)
 +            assertFalse(grid(g).active());
 +    }
 +
 +    /**
 +     *
 +     */
 +    private static class TestCommunicationSpi extends TcpCommunicationSpi {
 +        /** {@inheritDoc} */
 +        @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC) throws IgniteSpiException {
 +            checkForbidden((GridIoMessage)msg);
 +
 +            super.sendMessage(node, msg, ackC);
 +        }
 +
 +        /** {@inheritDoc} */
 +        @Override public void sendMessage(ClusterNode node, Message msg) throws IgniteSpiException {
 +            checkForbidden((GridIoMessage)msg);
 +
 +            super.sendMessage(node, msg);
 +        }
 +
 +        /**
 +         * @param msg Message to check.
 +         */
 +        private void checkForbidden(GridIoMessage msg) {
 +            if (forbidden.contains(msg.message().getClass())) {
 +                IgniteSpiException err = new IgniteSpiException("Message is forbidden for this test: " + msg.message());
 +
 +                // Set error in case if this exception is not visible to the user code.
 +                errEncountered.compareAndSet(null, err);
 +
 +                throw err;
 +            }
 +        }
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/ignite/blob/3eb05de5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
----------------------------------------------------------------------