You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/08/21 10:22:32 UTC

[09/10] ignite git commit: ignite-6124 Merge exchanges for multiple discovery events

ignite-6124 Merge exchanges for multiple discovery events


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/bebf2997
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/bebf2997
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/bebf2997

Branch: refs/heads/master
Commit: bebf299799712b464ee0e3800752ecc07770d9f0
Parents: b8b8064
Author: sboikov <sb...@gridgain.com>
Authored: Mon Aug 21 13:21:44 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Aug 21 13:21:45 2017 +0300

----------------------------------------------------------------------
 .../apache/ignite/IgniteSystemProperties.java   |    3 +
 .../java/org/apache/ignite/TestDebugLog.java    |  219 ++
 .../internal/IgniteDiagnosticMessage.java       |    2 +-
 .../communication/GridIoMessageFactory.java     |    9 +-
 .../internal/managers/discovery/DiscoCache.java |   79 +-
 .../discovery/GridDiscoveryManager.java         |   28 +-
 .../affinity/AffinityTopologyVersion.java       |    7 +
 .../affinity/GridAffinityAssignmentCache.java   |   42 +
 .../affinity/GridAffinityProcessor.java         |    8 +-
 .../cache/CacheAffinitySharedManager.java       |  695 ++++--
 .../processors/cache/CacheGroupContext.java     |   18 +-
 .../cache/CachePartitionExchangeWorkerTask.java |    5 +-
 .../ClientCacheChangeDummyDiscoveryMessage.java |    5 +
 .../cache/ClientCacheUpdateTimeout.java         |    5 +
 .../processors/cache/ClusterCachesInfo.java     |   22 +-
 .../processors/cache/ExchangeContext.java       |  131 ++
 .../cache/ExchangeDiscoveryEvents.java          |  262 +++
 .../processors/cache/GridCacheAdapter.java      |    8 +-
 .../processors/cache/GridCacheContext.java      |    2 +-
 .../processors/cache/GridCacheIoManager.java    |   57 +-
 .../processors/cache/GridCacheMapEntry.java     |    6 +-
 .../GridCachePartitionExchangeManager.java      |  413 +++-
 .../processors/cache/GridCacheProcessor.java    |   14 +-
 .../dht/ClientCacheDhtTopologyFuture.java       |   12 +-
 .../dht/GridClientPartitionTopology.java        |  130 +-
 .../distributed/dht/GridDhtCacheAdapter.java    |   18 +-
 .../distributed/dht/GridDhtLocalPartition.java  |    4 +-
 .../dht/GridDhtPartitionTopology.java           |   37 +-
 .../dht/GridDhtPartitionTopologyImpl.java       |  492 +++--
 .../dht/GridDhtPartitionsReservation.java       |    2 +-
 .../distributed/dht/GridDhtTopologyFuture.java  |   36 +-
 .../dht/GridDhtTransactionalCacheAdapter.java   |   95 +-
 .../dht/GridPartitionedGetFuture.java           |    4 +-
 .../dht/GridPartitionedSingleGetFuture.java     |    4 +-
 .../GridDhtAtomicAbstractUpdateFuture.java      |    2 +-
 .../dht/atomic/GridDhtAtomicCache.java          |   31 +-
 .../GridNearAtomicSingleUpdateFuture.java       |    1 -
 .../colocated/GridDhtColocatedLockFuture.java   |    2 +-
 .../preloader/CacheGroupAffinityMessage.java    |  339 +++
 .../preloader/ForceRebalanceExchangeTask.java   |    5 +
 .../preloader/GridDhtPartitionExchangeId.java   |   11 +
 .../dht/preloader/GridDhtPartitionMap.java      |    2 +-
 .../dht/preloader/GridDhtPartitionSupplier.java |    2 +-
 .../GridDhtPartitionsAbstractMessage.java       |   37 +-
 .../GridDhtPartitionsExchangeFuture.java        | 1976 +++++++++++++-----
 .../preloader/GridDhtPartitionsFullMessage.java |  161 +-
 .../GridDhtPartitionsSingleMessage.java         |   78 +-
 .../GridDhtPartitionsSingleRequest.java         |   47 +-
 .../dht/preloader/GridDhtPreloader.java         |   44 +-
 .../IgniteDhtPartitionCountersMap.java          |    7 +
 .../dht/preloader/InitNewCoordinatorFuture.java |  307 +++
 .../RebalanceReassignExchangeTask.java          |    5 +
 .../distributed/near/GridNearCacheAdapter.java  |    2 +-
 .../distributed/near/GridNearGetFuture.java     |    4 +-
 .../distributed/near/GridNearLockFuture.java    |    2 +-
 ...arOptimisticSerializableTxPrepareFuture.java |    1 +
 .../near/GridNearOptimisticTxPrepareFuture.java |    1 +
 .../GridNearPessimisticTxPrepareFuture.java     |    1 +
 .../near/GridNearTxPrepareRequest.java          |   14 +
 .../GridCacheDatabaseSharedManager.java         |    6 +-
 .../cache/query/GridCacheQueryAdapter.java      |    4 +-
 .../cache/transactions/IgniteTxAdapter.java     |    2 +-
 .../cache/transactions/IgniteTxHandler.java     |  184 +-
 .../closure/GridClosureProcessor.java           |   36 +-
 .../cluster/GridClusterStateProcessor.java      |    2 +-
 .../datastreamer/DataStreamProcessor.java       |   57 +-
 .../datastreamer/DataStreamerImpl.java          |   65 +-
 .../datastreamer/PlatformDataStreamer.java      |    3 +-
 .../query/schema/SchemaExchangeWorkerTask.java  |    5 +
 .../SchemaNodeLeaveExchangeWorkerTask.java      |    5 +
 .../processors/task/GridTaskWorker.java         |    8 +-
 .../org/apache/ignite/thread/IgniteThread.java  |    9 +
 .../internal/TestDelayingCommunicationSpi.java  |   63 +
 ...CacheExchangeMessageDuplicatedStateTest.java |    9 +-
 .../IgniteClientCacheStartFailoverTest.java     |    4 +-
 .../IgniteClusterActivateDeactivateTest.java    |    4 +-
 .../cache/IgniteDynamicCacheStartSelfTest.java  |   26 +-
 ...niteTopologyValidatorGridSplitCacheTest.java |    6 +-
 ...AffinityCoordinatorDynamicStartStopTest.java |    2 +-
 ...eAbstractDataStructuresFailoverSelfTest.java |    7 +-
 .../distributed/CacheExchangeMergeTest.java     | 1528 ++++++++++++++
 .../CacheLateAffinityAssignmentTest.java        |  598 ++++--
 ...CacheLoadingConcurrentGridStartSelfTest.java |   11 +
 .../CacheLockReleaseNodeLeaveTest.java          |   13 +-
 .../distributed/CachePartitionStateTest.java    |   18 +-
 ...ncurrentGridStartSelfTestAllowOverwrite.java |   33 +
 ...niteCacheClientNodeChangingTopologyTest.java |    5 +-
 ...teCacheClientNodePartitionsExchangeTest.java |   52 +-
 ...ePrimaryNodeFailureRecoveryAbstractTest.java |    4 +-
 ...eAtomicInvalidPartitionHandlingSelfTest.java |   36 +-
 .../IgniteChangeGlobalStateTest.java            |   11 +-
 .../join/JoinInActiveNodeToActiveCluster.java   |    4 +-
 .../junits/common/GridCommonAbstractTest.java   |   22 +-
 .../testsuites/IgniteCacheTestSuite2.java       |    7 +-
 .../testsuites/IgniteCacheTestSuite6.java       |    3 +
 .../cache/WaitMapExchangeFinishCallable.java    |    4 +-
 96 files changed, 7339 insertions(+), 1473 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/bebf2997/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index 8af66c4..39c19fb 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -168,6 +168,9 @@ public final class IgniteSystemProperties {
     /** Maximum size for exchange history. Default value is {@code 1000}.*/
     public static final String IGNITE_EXCHANGE_HISTORY_SIZE = "IGNITE_EXCHANGE_HISTORY_SIZE";
 
+    /** */
+    public static final String IGNITE_EXCHANGE_MERGE_DELAY = "IGNITE_EXCHANGE_MERGE_DELAY";
+
     /**
      * Name of the system property defining name of command line program.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/bebf2997/modules/core/src/main/java/org/apache/ignite/TestDebugLog.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/TestDebugLog.java b/modules/core/src/main/java/org/apache/ignite/TestDebugLog.java
new file mode 100644
index 0000000..94c5eb2
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/TestDebugLog.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite;
+
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
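+ * Debug helper that buffers messages in memory and dumps them to stdout or to the {@code test_debug.log} file.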
+ */
+public class TestDebugLog {
+    /** */
+    private static final List<Object> msgs = Collections.synchronizedList(new ArrayList<>(100_000));
+
+    /** */
+    private static final SimpleDateFormat DEBUG_DATE_FMT = new SimpleDateFormat("HH:mm:ss,SSS");
+
+    static class Message {
+        String thread = Thread.currentThread().getName();
+
+        String msg;
+
+        long ts = U.currentTimeMillis();
+
+        public Message(String msg) {
+            this.msg = msg;
+        }
+
+        public String toString() {
+            return "Msg [msg=" + msg + ", thread=" + thread + ", time=" + DEBUG_DATE_FMT.format(new Date(ts)) + ']';
+        }
+    }
+
+    static class EntryMessage extends Message {
+        Object key;
+        Object val;
+
+        public EntryMessage(Object key, Object val, String msg) {
+            super(msg);
+
+            this.key = key;
+            this.val = val;
+        }
+
+        public String toString() {
+            return "EntryMsg [key=" + key + ", val=" + val + ", msg=" + msg + ", thread=" + thread + ", time=" + DEBUG_DATE_FMT.format(new Date(ts)) + ']';
+        }
+    }
+
+    static class PartMessage extends Message {
+        int p;
+        Object val;
+
+        public PartMessage(int p, Object val, String msg) {
+            super(msg);
+
+            this.p = p;
+            this.val = val;
+        }
+
+        public String toString() {
+            return "PartMessage [p=" + p + ", val=" + val + ", msg=" + msg + ", thread=" + thread + ", time=" + DEBUG_DATE_FMT.format(new Date(ts)) + ']';
+        }
+    }
+
+    static final boolean out = false;
+
+    public static void addMessage(String msg) {
+        msgs.add(new Message(msg));
+
+        if (out)
+            System.out.println(msg);
+    }
+
+    public static void addEntryMessage(Object key, Object val, String msg) {
+        if (key instanceof KeyCacheObject)
+            key = ((KeyCacheObject)key).value(null, false);
+
+        EntryMessage msg0 = new EntryMessage(key, val, msg);
+
+        msgs.add(msg0);
+
+        if (out) {
+            System.out.println(msg0.toString());
+
+            System.out.flush();
+        }
+    }
+
+    public static void addPartMessage(int p, Object val, String msg) {
+        PartMessage msg0 = new PartMessage(p, val, msg);
+
+        msgs.add(msg0);
+
+        if (out) {
+            System.out.println(msg0.toString());
+
+            System.out.flush();
+        }
+    }
+
+    public static void printMessages(boolean file, Integer part) {
+        List<Object> msgs0;
+
+        synchronized (msgs) {
+            msgs0 = new ArrayList<>(msgs);
+
+            msgs.clear();
+        }
+
+        if (file) {
+            try {
+                FileOutputStream out = new FileOutputStream("test_debug.log");
+
+                PrintWriter w = new PrintWriter(out);
+
+                for (Object msg : msgs0) {
+                    if (part != null && msg instanceof PartMessage) {
+                        if (((PartMessage) msg).p != part)
+                            continue;
+                    }
+
+                    w.println(msg.toString());
+                }
+
+                w.close();
+
+                out.close();
+            }
+            catch (IOException e) {
+                e.printStackTrace();
+            }
+        }
+        else {
+            for (Object msg : msgs0)
+                System.out.println(msg);
+        }
+    }
+
+    public static void printKeyMessages(boolean file, Object key) {
+        List<Object> msgs0;
+
+        synchronized (msgs) {
+            msgs0 = new ArrayList<>(msgs);
+
+            msgs.clear();
+        }
+
+        if (file) {
+            try {
+                FileOutputStream out = new FileOutputStream("test_debug.log");
+
+                PrintWriter w = new PrintWriter(out);
+
+                for (Object msg : msgs0) {
+                    if (msg instanceof EntryMessage && !((EntryMessage)msg).key.equals(key))
+                        continue;
+
+                    w.println(msg.toString());
+                }
+
+                w.close();
+
+                out.close();
+            }
+            catch (IOException e) {
+                e.printStackTrace();
+            }
+        }
+        else {
+            for (Object msg : msgs0) {
+                if (msg instanceof EntryMessage && !((EntryMessage)msg).key.equals(key))
+                    continue;
+
+                System.out.println(msg);
+            }
+        }
+    }
+
+    public static void clear() {
+        msgs.clear();
+    }
+
+    /** Removes all {@link EntryMessage}s; a synchronizedList must be locked manually while it is iterated. */
+    public static void clearEntries() {
+        synchronized (msgs) {
+            for (Iterator<Object> it = msgs.iterator(); it.hasNext();)
+                if (it.next() instanceof EntryMessage)
+                    it.remove();
+        }
+    }
+
+    public static void main(String[] args) {
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/bebf2997/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticMessage.java
index bd4ec3a..8739c0e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticMessage.java
@@ -364,7 +364,7 @@ public class IgniteDiagnosticMessage implements Message {
             List<GridDhtPartitionsExchangeFuture> futs = ctx.cache().context().exchange().exchangeFutures();
 
             for (GridDhtPartitionsExchangeFuture fut : futs) {
-                if (topVer.equals(fut.topologyVersion())) {
+                if (topVer.equals(fut.initialVersion())) {
                     sb.append("Exchange future: ").append(fut);
 
                     return;

http://git-wip-us.apache.org/repos/asf/ignite/blob/bebf2997/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 29c89a5..97e06bf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -81,6 +81,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNe
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.NearCacheUpdates;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.UpdateErrors;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CacheGroupAffinityMessage;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysResponse;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
@@ -869,7 +870,13 @@ public class GridIoMessageFactory implements MessageFactory {
 
                 break;
 
-            // [-3..119] [124..127] [-23..-27] [-36..-55]- this
+            case 128:
+                msg = new CacheGroupAffinityMessage();
+
+                break;
+
+
+            // [-3..119] [124..128] [-23..-27] [-36..-55]- this
             // [120..123] - DR
             // [-4..-22, -30..-35] - SQL
             // [2048..2053] - Snapshots

http://git-wip-us.apache.org/repos/asf/ignite/blob/bebf2997/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
index 4c1077b..5ac99f1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
@@ -25,6 +25,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState;
 import org.apache.ignite.internal.util.GridConcurrentHashSet;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
@@ -32,6 +33,7 @@ import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.P1;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteProductVersion;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -81,7 +83,14 @@ public class DiscoCache {
     /** Alive nodes. */
     private final Set<UUID> alives = new GridConcurrentHashSet<>();
 
+    /** Minimum product version among all nodes of this topology snapshot. */
+    private final IgniteProductVersion minNodeVer;
+
+    /** Topology version this discovery cache was created for. */
+    private final AffinityTopologyVersion topVer;
+
     /**
+     * @param topVer Topology version.
      * @param state Current cluster state.
      * @param loc Local node.
      * @param rmtNodes Remote nodes.
@@ -97,6 +106,7 @@ public class DiscoCache {
      * @param alives Alive nodes.
      */
     DiscoCache(
+        AffinityTopologyVersion topVer,
         DiscoveryDataClusterState state,
         ClusterNode loc,
         List<ClusterNode> rmtNodes,
@@ -110,6 +120,7 @@ public class DiscoCache {
         Map<Integer, List<ClusterNode>> cacheGrpAffNodes,
         Map<UUID, ClusterNode> nodeMap,
         Set<UUID> alives) {
+        this.topVer = topVer;
         this.state = state;
         this.loc = loc;
         this.rmtNodes = rmtNodes;
@@ -123,6 +134,33 @@ public class DiscoCache {
         this.cacheGrpAffNodes = cacheGrpAffNodes;
         this.nodeMap = nodeMap;
         this.alives.addAll(alives);
+
+        IgniteProductVersion minVer = null;
+
+        for (int i = 0; i < allNodes.size(); i++) {
+            ClusterNode node = allNodes.get(i);
+
+            if (minVer == null)
+                minVer = node.version();
+            else if (node.version().compareTo(minVer) < 0)
+                minVer = node.version();
+        }
+
+        minNodeVer = minVer;
+    }
+
+    /**
+     * @return Topology version.
+     */
+    public AffinityTopologyVersion version() {
+        return topVer;
+    }
+
+    /**
+     * @return Minimum node version.
+     */
+    public IgniteProductVersion minimumNodeVersion() {
+        return minNodeVer;
     }
 
     /**
@@ -254,7 +292,7 @@ public class DiscoCache {
      * @param id Node ID.
      * @return Node.
      */
-    public @Nullable ClusterNode node(UUID id) {
+    @Nullable public ClusterNode node(UUID id) {
         return nodeMap.get(id);
     }
 
@@ -280,6 +318,45 @@ public class DiscoCache {
     }
 
     /**
+     * @param order Order.
+     * @return Server node instance.
+     */
+    @Nullable public ClusterNode serverNodeByOrder(long order) {
+        int idx = serverNodeBinarySearch(order);
+
+        if (idx >= 0)
+            return srvNodes.get(idx);
+
+        return null;
+    }
+
+    /**
+     * @param order Node order.
+     * @return Node index.
+     */
+    private int serverNodeBinarySearch(long order) {
+        int low = 0;
+        int high = srvNodes.size() - 1;
+
+        while (low <= high) {
+            int mid = (low + high) >>> 1;
+
+            ClusterNode midVal = srvNodes.get(mid);
+
+            int cmp = Long.compare(midVal.order(), order);
+
+            if (cmp < 0)
+                low = mid + 1;
+            else if (cmp > 0)
+                high = mid - 1;
+            else
+                return mid;
+        }
+
+        return -(low + 1);
+    }
+
+    /**
      * @param nodes Cluster nodes.
      * @return Empty collection if nodes list is {@code null}
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/bebf2997/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 834ba4d..d426ca5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -621,7 +621,10 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
                 ChangeGlobalStateFinishMessage stateFinishMsg = null;
 
                 if (locJoinEvt) {
-                    discoCache = createDiscoCache(ctx.state().clusterState(), locNode, topSnapshot);
+                    discoCache = createDiscoCache(new AffinityTopologyVersion(topVer, minorTopVer),
+                        ctx.state().clusterState(),
+                        locNode,
+                        topSnapshot);
 
                     transitionWaitFut = ctx.state().onLocalJoin(discoCache);
                 }
@@ -644,7 +647,10 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
                     else if (customMsg instanceof ChangeGlobalStateFinishMessage) {
                         ctx.state().onStateFinishMessage((ChangeGlobalStateFinishMessage)customMsg);
 
-                        discoCache = createDiscoCache(ctx.state().clusterState(), locNode, topSnapshot);
+                        discoCache = createDiscoCache(topSnap.get().topVer,
+                            ctx.state().clusterState(),
+                            locNode,
+                            topSnapshot);
 
                         topSnap.set(new Snapshot(topSnap.get().topVer, discoCache));
 
@@ -691,8 +697,12 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
                 // There is no race possible between history maintenance and concurrent discovery
                 // event notifications, since SPI notifies manager about all events from this listener.
                 if (verChanged) {
-                    if (discoCache == null)
-                        discoCache = createDiscoCache(ctx.state().clusterState(), locNode, topSnapshot);
+                    if (discoCache == null) {
+                        discoCache = createDiscoCache(nextTopVer,
+                            ctx.state().clusterState(),
+                            locNode,
+                            topSnapshot);
+                    }
 
                     discoCacheHist.put(nextTopVer, discoCache);
 
@@ -763,7 +773,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
                     topHist.clear();
 
                     topSnap.set(new Snapshot(AffinityTopologyVersion.ZERO,
-                        createDiscoCache(ctx.state().clusterState(), locNode, Collections.<ClusterNode>emptySet())));
+                        createDiscoCache(AffinityTopologyVersion.ZERO, ctx.state().clusterState(), locNode, Collections.<ClusterNode>emptySet())));
                 }
                 else if (type == EVT_CLIENT_NODE_RECONNECTED) {
                     assert locNode.isClient() : locNode;
@@ -2149,12 +2159,15 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
     /**
      * Called from discovery thread.
      *
+     * @param topVer Topology version.
      * @param state Current state.
      * @param loc Local node.
      * @param topSnapshot Topology snapshot.
      * @return Newly created discovery cache.
      */
-    @NotNull private DiscoCache createDiscoCache(DiscoveryDataClusterState state,
+    @NotNull private DiscoCache createDiscoCache(
+        AffinityTopologyVersion topVer,
+        DiscoveryDataClusterState state,
         ClusterNode loc,
         Collection<ClusterNode> topSnapshot) {
         HashSet<UUID> alives = U.newHashSet(topSnapshot.size());
@@ -2231,6 +2244,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
         }
 
         return new DiscoCache(
+            topVer,
             state,
             loc,
             Collections.unmodifiableList(rmtNodes),
@@ -2373,7 +2387,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
                         discoWrk.addEvent(EVT_NODE_SEGMENTED,
                             AffinityTopologyVersion.NONE,
                             node,
-                            createDiscoCache(null, node, empty),
+                            createDiscoCache(AffinityTopologyVersion.NONE, null, node, empty),
                             empty,
                             null);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/bebf2997/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java
index 8669530..44b2753 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java
@@ -73,6 +73,13 @@ public class AffinityTopologyVersion implements Comparable<AffinityTopologyVersi
     }
 
     /**
+     * @return {@code True} if this is real topology version (neither {@link #NONE} nor {@link #ZERO}).
+     */
+    public boolean initialized() {
+        return topVer > 0;
+    }
+
+    /**
      * @return Topology version with incremented minor version.
      */
     public AffinityTopologyVersion nextMinorVersion() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/bebf2997/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
index a8c6c59..f921251 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.affinity;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
@@ -352,6 +353,17 @@ public class GridAffinityAssignmentCache {
 
         return aff.assignment();
     }
+    /**
+     * @param topVer Topology version.
+     * @return Affinity assignment.
+     */
+    public List<List<ClusterNode>> readyAssignments(AffinityTopologyVersion topVer) {
+        AffinityAssignment aff = readyAffinity(topVer);
+
+        assert aff != null : "No ready affinity [grp=" + cacheOrGrpName + ", ver=" + topVer + ']';
+
+        return aff.assignment();
+    }
 
     /**
      * Gets future that will be completed after topology with version {@code topVer} is calculated.
@@ -456,6 +468,30 @@ public class GridAffinityAssignmentCache {
     }
 
     /**
+     * @param topVer Topology version.
+     * @return Assignment.
+     */
+    public AffinityAssignment readyAffinity(AffinityTopologyVersion topVer) {
+        AffinityAssignment cache = head.get();
+
+        if (!cache.topologyVersion().equals(topVer)) {
+            cache = affCache.get(topVer);
+
+            if (cache == null) {
+                throw new IllegalStateException("Affinity for topology version is " +
+                    "not initialized [locNode=" + ctx.discovery().localNode().id() +
+                    ", grp=" + cacheOrGrpName +
+                    ", topVer=" + topVer +
+                    ", head=" + head.get().topologyVersion() +
+                    ", history=" + affCache.keySet() +
+                    ']');
+            }
+        }
+
+        return cache;
+    }
+
+    /**
      * Get cached affinity for specified topology version.
      *
      * @param topVer Topology version.
@@ -600,6 +636,12 @@ public class GridAffinityAssignmentCache {
         }
     }
 
+    /**
+     * @return All initialized versions.
+     */
+    public Collection<AffinityTopologyVersion> cachedVersions() {
+        return affCache.keySet();
+    }
 
     /**
      * Affinity ready future. Will remove itself from ready futures map.

http://git-wip-us.apache.org/repos/asf/ignite/blob/bebf2997/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
index 4ee0502..9c9fb8f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
@@ -184,7 +184,7 @@ public class GridAffinityProcessor extends GridProcessorAdapter {
         assert cacheName != null;
 
         if (aff == null) {
-            aff = affinityCache(cacheName, ctx.discovery().topologyVersionEx());
+            aff = affinityCache(cacheName, ctx.cache().context().exchange().readyAffinityVersion());
 
             if (aff == null)
                 throw new IgniteCheckedException("Failed to get cache affinity (cache was not started " +
@@ -303,7 +303,7 @@ public class GridAffinityProcessor extends GridProcessorAdapter {
         if (key == null)
             return null;
 
-        AffinityInfo affInfo = affinityCache(cacheName, ctx.discovery().topologyVersionEx());
+        AffinityInfo affInfo = affinityCache(cacheName, ctx.cache().context().exchange().readyAffinityVersion());
 
         if (affInfo == null)
             return null;
@@ -329,7 +329,7 @@ public class GridAffinityProcessor extends GridProcessorAdapter {
      */
     private <K> Map<ClusterNode, Collection<K>> keysToNodes(@Nullable final String cacheName,
         Collection<? extends K> keys) throws IgniteCheckedException {
-        return keysToNodes(cacheName, keys, ctx.discovery().topologyVersionEx());
+        return keysToNodes(cacheName, keys, ctx.cache().context().exchange().readyAffinityVersion());
     }
 
     /**
@@ -974,7 +974,7 @@ public class GridAffinityProcessor extends GridProcessorAdapter {
          * @throws IgniteCheckedException If failed.
          */
         private AffinityInfo cache() throws IgniteCheckedException {
-            AffinityInfo aff = affinityCache(cacheName, ctx.discovery().topologyVersionEx());
+            AffinityInfo aff = affinityCache(cacheName, ctx.cache().context().exchange().readyAffinityVersion());
 
             if (aff == null)
                 throw new IgniteException("Failed to find cache (cache was not started " +