You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/05/11 14:42:33 UTC

[4/7] ignite git commit: ignite-5075

ignite-5075


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d24b08b6
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d24b08b6
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d24b08b6

Branch: refs/heads/ignite-5075
Commit: d24b08b65aeee089894b62fa8619d0c24cbae23b
Parents: 861b34b
Author: sboikov <sb...@gridgain.com>
Authored: Thu May 11 16:18:32 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu May 11 16:18:32 2017 +0300

----------------------------------------------------------------------
 .../cache/CacheAffinitySharedManager.java       | 33 +++++++------
 .../internal/processors/cache/CacheData.java    | 14 ------
 .../processors/cache/ClusterCachesInfo.java     | 51 ++++++++------------
 .../cache/DynamicCacheDescriptor.java           | 31 +++---------
 .../processors/cache/GridCacheContext.java      | 15 ------
 .../processors/cache/GridCacheIoManager.java    |  6 ++-
 .../GridCachePartitionExchangeManager.java      |  4 +-
 .../processors/cache/GridCacheProcessor.java    |  9 ----
 .../dht/GridDhtAffinityAssignmentRequest.java   | 32 ++++++------
 .../dht/GridDhtAffinityAssignmentResponse.java  | 36 ++++++++++++--
 .../dht/GridDhtAssignmentFetchFuture.java       | 50 ++++++++++---------
 .../dht/GridDhtPartitionTopologyImpl.java       |  4 +-
 .../dht/preloader/GridDhtPreloader.java         |  4 +-
 .../continuous/CacheContinuousQueryManager.java |  1 -
 .../loadtests/hashmap/GridCacheTestContext.java |  1 -
 15 files changed, 131 insertions(+), 160 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/d24b08b6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index 8c275e0..bd41ccc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -91,8 +91,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
     private final Object mux = new Object();
 
     /** Pending affinity assignment futures. */
-    private final ConcurrentMap<T2<Integer, AffinityTopologyVersion>, GridDhtAssignmentFetchFuture>
-        pendingAssignmentFetchFuts = new ConcurrentHashMap8<>();
+    private final ConcurrentMap<Long, GridDhtAssignmentFetchFuture> pendingAssignmentFetchFuts =
+        new ConcurrentHashMap8<>();
 
     /** Discovery listener. */
     private final GridLocalEventListener discoLsnr = new GridLocalEventListener() {
@@ -118,6 +118,16 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
     }
 
     /**
+     * @param cacheId Cache ID.
+     * @return Cache start topology version.
+     */
+    public AffinityTopologyVersion localStartVersion(int cacheId) {
+        DynamicCacheDescriptor desc = registeredCaches.get(cacheId);
+
+        return desc != null ? desc.localStartVersion() : null;
+    }
+
+    /**
      * Callback invoked from discovery thread when discovery message is received.
      *
      * @param type Event type.
@@ -414,8 +424,6 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                     if (clientCacheStarted)
                         initAffinity(cacheDesc, cacheCtx.affinity().affinityCache(), fut, lateAffAssign);
                     else if (!req.clientStartOnly()) {
-                        assert fut.topologyVersion().equals(cacheCtx.cacheStartTopologyVersion());
-
                         GridAffinityAssignmentCache aff = cacheCtx.affinity().affinityCache();
 
                         assert aff.lastVersion().equals(AffinityTopologyVersion.NONE) : aff.lastVersion();
@@ -696,7 +704,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
      * @param fut Future to add.
      */
     public void addDhtAssignmentFetchFuture(GridDhtAssignmentFetchFuture fut) {
-        GridDhtAssignmentFetchFuture old = pendingAssignmentFetchFuts.putIfAbsent(fut.key(), fut);
+        GridDhtAssignmentFetchFuture old = pendingAssignmentFetchFuts.putIfAbsent(fut.id(), fut);
 
         assert old == null : "More than one thread is trying to fetch partition assignments [fut=" + fut +
             ", allFuts=" + pendingAssignmentFetchFuts + ']';
@@ -706,9 +714,9 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
      * @param fut Future to remove.
      */
     public void removeDhtAssignmentFetchFuture(GridDhtAssignmentFetchFuture fut) {
-        boolean rmv = pendingAssignmentFetchFuts.remove(fut.key(), fut);
+        boolean rmv = pendingAssignmentFetchFuts.remove(fut.id(), fut);
 
-        assert rmv : "Failed to remove assignment fetch future: " + fut.key();
+        assert rmv : "Failed to remove assignment fetch future: " + fut.id();
     }
 
     /**
@@ -720,13 +728,10 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
         if (log.isDebugEnabled())
             log.debug("Processing affinity assignment response [node=" + nodeId + ", res=" + res + ']');
 
-        for (GridDhtAssignmentFetchFuture fut : pendingAssignmentFetchFuts.values()) {
-            if (fut.key().get1().equals(cacheId)) {
-                fut.onResponse(nodeId, res);
+        GridDhtAssignmentFetchFuture fut = pendingAssignmentFetchFuts.get(res.futureId());
 
-                break;
-            }
-        }
+        if (fut != null)
+            fut.onResponse(nodeId, res);
     }
 
     /**
@@ -1001,7 +1006,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
         for (int i = 0; i < fetchFuts.size(); i++) {
             GridDhtAssignmentFetchFuture fetchFut = fetchFuts.get(i);
 
-            Integer cacheId = fetchFut.key().get1();
+            Integer cacheId = fetchFut.cacheId();
 
             fetchAffinity(fut, cctx.cacheContext(cacheId).affinity().affinityCache(), fetchFut);
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/d24b08b6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheData.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheData.java
index 82afdc7..0c97ab0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheData.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheData.java
@@ -42,9 +42,6 @@ public class CacheData implements Serializable {
     private final CacheType cacheType;
 
     /** */
-    private final AffinityTopologyVersion startTopVer;
-
-    /** */
     private final IgniteUuid deploymentId;
 
     /** */
@@ -66,7 +63,6 @@ public class CacheData implements Serializable {
      * @param cacheCfg Cache configuration.
      * @param cacheId Cache ID.
      * @param cacheType Cache ID.
-     * @param startTopVer Topology version when cache was started.
      * @param deploymentId Cache deployment ID.
      * @param schema Query schema.
      * @param rcvdFrom Node ID cache was started from.
@@ -77,7 +73,6 @@ public class CacheData implements Serializable {
     CacheData(CacheConfiguration cacheCfg,
         int cacheId,
         CacheType cacheType,
-        AffinityTopologyVersion startTopVer,
         IgniteUuid deploymentId,
         QuerySchema schema,
         UUID rcvdFrom,
@@ -86,14 +81,12 @@ public class CacheData implements Serializable {
         byte flags) {
         assert cacheCfg != null;
         assert rcvdFrom != null : cacheCfg.getName();
-        assert startTopVer != null : cacheCfg.getName();
         assert deploymentId != null : cacheCfg.getName();
         assert template || cacheId != 0 : cacheCfg.getName();
 
         this.cacheCfg = cacheCfg;
         this.cacheId = cacheId;
         this.cacheType = cacheType;
-        this.startTopVer = startTopVer;
         this.deploymentId = deploymentId;
         this.schema = schema;
         this.rcvdFrom = rcvdFrom;
@@ -110,13 +103,6 @@ public class CacheData implements Serializable {
     }
 
     /**
-     * @return Start topology version.
-     */
-    public AffinityTopologyVersion startTopologyVersion() {
-        return startTopVer;
-    }
-
-    /**
      * @return {@code True} if this is template configuration.
      */
     public boolean template() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/d24b08b6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
index 44d41f8..efcf6a8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
@@ -82,9 +82,6 @@ class ClusterCachesInfo {
     private List<T2<DynamicCacheDescriptor, NearCacheConfiguration>> locJoinStartCaches;
 
     /** */
-    private Map<UUID, CacheJoinNodeDiscoveryData> joiningNodesDiscoData = new HashMap<>();
-
-    /** */
     private Map<UUID, CacheClientReconnectDiscoveryData> clientReconnectReqs;
 
     /**
@@ -340,19 +337,15 @@ class ClusterCachesInfo {
                     if (needExchange) {
                         req.clientStartOnly(true);
 
-                        desc.clientCacheStartVersion(topVer.nextMinorVersion());
+                        desc.localStartVersion(topVer.nextMinorVersion());
 
                         exchangeActions.addClientCacheToStart(req, desc);
                     }
                 }
 
                 if (!needExchange) {
-                    if (desc != null) {
-                        if (desc.clientCacheStartVersion() != null)
-                            waitTopVer = desc.clientCacheStartVersion();
-                        else
-                            waitTopVer = desc.startTopologyVersion();
-                    }
+                    if (desc != null)
+                        waitTopVer = desc.localStartVersion();
                 }
             }
             else if (req.globalStateChange())
@@ -404,7 +397,7 @@ class ClusterCachesInfo {
             for (DynamicCacheDescriptor desc : addedDescs) {
                 assert desc.template() || incMinorTopVer;
 
-                desc.startTopologyVersion(startTopVer);
+                desc.localStartVersion(startTopVer);
             }
         }
 
@@ -545,9 +538,11 @@ class ClusterCachesInfo {
                 locJoinStartCaches = new ArrayList<>();
 
                 if (!disconnectedState() && joinDiscoData != null) {
-                    processJoiningNode(joinDiscoData, node.id(), topVer);
+                    processJoiningNode(joinDiscoData, node.id());
 
                     for (DynamicCacheDescriptor desc : registeredCaches.values()) {
+                        desc.localStartVersion(topVer);
+
                         CacheConfiguration cfg = desc.cacheConfiguration();
 
                         CacheJoinNodeDiscoveryData.CacheInfo locCfg = joinDiscoData.caches().get(cfg.getName());
@@ -564,8 +559,7 @@ class ClusterCachesInfo {
                                 desc.deploymentId(),
                                 desc.schema());
 
-                            desc0.startTopologyVersion(desc.startTopologyVersion());
-                            desc0.clientCacheStartVersion(desc.clientCacheStartVersion());
+                            desc0.localStartVersion(desc.localStartVersion());
                             desc0.receivedFrom(desc.receivedFrom());
                             desc0.staticallyConfigured(desc.staticallyConfigured());
 
@@ -577,11 +571,15 @@ class ClusterCachesInfo {
                     }
                 }
             }
-            else {
-                CacheJoinNodeDiscoveryData discoData = joiningNodesDiscoData.remove(node.id());
 
-                if (discoData != null)
-                    processJoiningNode(discoData, node.id(), topVer);
+            for (DynamicCacheDescriptor desc : registeredCaches.values()) {
+                if (node.id().equals(desc.receivedFrom()))
+                    desc.localStartVersion(topVer);
+            }
+
+            for (DynamicCacheDescriptor desc : registeredTemplates.values()) {
+                if (node.id().equals(desc.receivedFrom()))
+                    desc.localStartVersion(topVer);
             }
         }
     }
@@ -607,7 +605,6 @@ class ClusterCachesInfo {
             CacheData cacheData = new CacheData(desc.cacheConfiguration(),
                 desc.cacheId(),
                 desc.cacheType(),
-                desc.startTopologyVersion(),
                 desc.deploymentId(),
                 desc.schema(),
                 desc.receivedFrom(),
@@ -624,7 +621,6 @@ class ClusterCachesInfo {
             CacheData cacheData = new CacheData(desc.cacheConfiguration(),
                 0,
                 desc.cacheType(),
-                desc.startTopologyVersion(),
                 desc.deploymentId(),
                 desc.schema(),
                 desc.receivedFrom(),
@@ -659,7 +655,6 @@ class ClusterCachesInfo {
                 cacheData.deploymentId(),
                 cacheData.schema());
 
-            desc.startTopologyVersion(cacheData.startTopologyVersion());
             desc.receivedFrom(cacheData.receivedFrom());
             desc.staticallyConfigured(cacheData.staticallyConfigured());
 
@@ -679,7 +674,6 @@ class ClusterCachesInfo {
                 cacheData.deploymentId(),
                 cacheData.schema());
 
-            desc.startTopologyVersion(cacheData.startTopologyVersion());
             desc.receivedFrom(cacheData.receivedFrom());
             desc.staticallyConfigured(cacheData.staticallyConfigured());
 
@@ -723,12 +717,8 @@ class ClusterCachesInfo {
                 else
                     processClientReconnectData((CacheClientReconnectDiscoveryData) joiningNodeData, data.joiningNodeId());
             }
-            else if (joiningNodeData instanceof CacheJoinNodeDiscoveryData) {
-                CacheJoinNodeDiscoveryData old =
-                    joiningNodesDiscoData.put(data.joiningNodeId(), (CacheJoinNodeDiscoveryData)joiningNodeData);
-
-                assert old == null : old;
-            }
+            else if (joiningNodeData instanceof CacheJoinNodeDiscoveryData)
+                processJoiningNode((CacheJoinNodeDiscoveryData)joiningNodeData, data.joiningNodeId());
         }
     }
 
@@ -754,9 +744,8 @@ class ClusterCachesInfo {
     /**
      * @param joinData Joined node discovery data.
      * @param nodeId Joined node ID.
-     * @param topVer Topology version.
      */
-    private void processJoiningNode(CacheJoinNodeDiscoveryData joinData, UUID nodeId, AffinityTopologyVersion topVer) {
+    private void processJoiningNode(CacheJoinNodeDiscoveryData joinData, UUID nodeId) {
         for (CacheJoinNodeDiscoveryData.CacheInfo cacheInfo : joinData.templates().values()) {
             CacheConfiguration cfg = cacheInfo.config();
 
@@ -770,7 +759,6 @@ class ClusterCachesInfo {
 
                 desc.staticallyConfigured(true);
                 desc.receivedFrom(nodeId);
-                desc.startTopologyVersion(topVer);
 
                 DynamicCacheDescriptor old = registeredTemplates.put(cfg.getName(), desc);
 
@@ -791,7 +779,6 @@ class ClusterCachesInfo {
 
                 desc.staticallyConfigured(true);
                 desc.receivedFrom(nodeId);
-                desc.startTopologyVersion(topVer);
 
                 DynamicCacheDescriptor old = registeredCaches.put(cfg.getName(), desc);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/d24b08b6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
index fe859f8..130ebde 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
@@ -63,9 +63,6 @@ public class DynamicCacheDescriptor {
     private boolean updatesAllowed = true;
 
     /** */
-    private AffinityTopologyVersion startTopVer;
-
-    /** */
     private Integer cacheId;
 
     /** */
@@ -78,7 +75,7 @@ public class DynamicCacheDescriptor {
     private volatile CacheObjectContext objCtx;
 
     /** */
-    private transient AffinityTopologyVersion clientCacheStartVer;
+    private volatile transient AffinityTopologyVersion locStartVer;
 
     /** Mutex to control schema. */
     private final Object schemaMux = new Object();
@@ -131,22 +128,6 @@ public class DynamicCacheDescriptor {
     }
 
     /**
-     * @return Start topology version.
-     */
-    @Nullable public AffinityTopologyVersion startTopologyVersion() {
-        return startTopVer;
-    }
-
-    /**
-     * @param startTopVer Start topology version.
-     */
-    public void startTopologyVersion(AffinityTopologyVersion startTopVer) {
-        assert startTopVer != null;
-
-        this.startTopVer = startTopVer;
-    }
-
-    /**
      * @return {@code True} if this is template configuration.
      */
     public boolean template() {
@@ -253,15 +234,15 @@ public class DynamicCacheDescriptor {
     /**
      * @return Version when client cache on local node was started.
      */
-    @Nullable AffinityTopologyVersion clientCacheStartVersion() {
-        return clientCacheStartVer;
+    @Nullable AffinityTopologyVersion localStartVersion() {
+        return locStartVer;
     }
 
     /**
-     * @param clientCacheStartVer Version when client cache on local node was started.
+     * @param locStartVer Version when cache on local node was started.
      */
-    public void clientCacheStartVersion(AffinityTopologyVersion clientCacheStartVer) {
-        this.clientCacheStartVer = clientCacheStartVer;
+    public void localStartVersion(AffinityTopologyVersion locStartVer) {
+        this.locStartVer = locStartVer;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/d24b08b6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index 8f0d842..2466a59 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -237,9 +237,6 @@ public class GridCacheContext<K, V> implements Externalizable {
     /** Topology version when cache was started on local node. */
     private AffinityTopologyVersion locStartTopVer;
 
-    /** Global cache start topology version. */
-    private AffinityTopologyVersion cacheStartTopVer;
-
     /** Dynamic cache deployment ID. */
     private IgniteUuid dynamicDeploymentId;
 
@@ -292,7 +289,6 @@ public class GridCacheContext<K, V> implements Externalizable {
         GridCacheSharedContext sharedCtx,
         CacheConfiguration cacheCfg,
         CacheType cacheType,
-        AffinityTopologyVersion cacheStartTopVer,
         AffinityTopologyVersion locStartTopVer,
         boolean affNode,
         boolean updatesAllowed,
@@ -321,7 +317,6 @@ public class GridCacheContext<K, V> implements Externalizable {
         assert ctx != null;
         assert sharedCtx != null;
         assert cacheCfg != null;
-        assert cacheStartTopVer != null : cacheCfg.getName();
         assert locStartTopVer != null : cacheCfg.getName();
 
         assert evtMgr != null;
@@ -341,7 +336,6 @@ public class GridCacheContext<K, V> implements Externalizable {
         this.cacheCfg = cacheCfg;
         this.cacheType = cacheType;
         this.locStartTopVer = locStartTopVer;
-        this.cacheStartTopVer = cacheStartTopVer;
         this.affNode = affNode;
         this.updatesAllowed = updatesAllowed;
         this.depEnabled = ctx.deploy().enabled() && !cacheObjects().isBinaryEnabled(cacheCfg);
@@ -470,15 +464,6 @@ public class GridCacheContext<K, V> implements Externalizable {
     }
 
     /**
-     * @return Cache start topology version.
-     */
-    public AffinityTopologyVersion cacheStartTopologyVersion() {
-        assert cacheStartTopVer != null : name();
-
-        return cacheStartTopVer;
-    }
-
-    /**
      * @return Cache default {@link ExpiryPolicy}.
      */
     @Nullable public ExpiryPolicy expiry() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/d24b08b6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 5e7e401..348d9d8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -146,8 +146,10 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
                 if (cacheMsg instanceof GridDhtAffinityAssignmentRequest) {
                     assert cacheMsg.topologyVersion() != null : cacheMsg;
 
-                    AffinityTopologyVersion waitVer =
-                        ((GridDhtAffinityAssignmentRequest)cacheMsg).waitTopologyVersion();
+                    AffinityTopologyVersion waitVer = cctx.affinity().localStartVersion(cacheMsg.cacheId());
+
+                    if (waitVer == null)
+                        waitVer = new AffinityTopologyVersion(cctx.localNode().order());
 
                     // Need to wait for exchange to avoid race between cache start and affinity request.
                     fut = cctx.exchange().affinityReadyFuture(waitVer);

http://git-wip-us.apache.org/repos/asf/ignite/blob/d24b08b6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 8f52ae6..04c647f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -895,7 +895,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                     boolean ready;
 
                     if (exchId != null) {
-                        AffinityTopologyVersion startTopVer = cacheCtx.cacheStartTopologyVersion();
+                        AffinityTopologyVersion startTopVer = cacheCtx.startTopologyVersion();
 
                         ready = startTopVer.compareTo(exchId.topologyVersion()) <= 0;
                     }
@@ -1301,7 +1301,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                     GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId);
 
                     if (cacheCtx != null &&
-                        cacheCtx.cacheStartTopologyVersion().compareTo(entry.getValue().topologyVersion()) > 0)
+                        cacheCtx.startTopologyVersion().compareTo(entry.getValue().topologyVersion()) > 0)
                         continue;
 
                     GridDhtPartitionTopology top = null;

http://git-wip-us.apache.org/repos/asf/ignite/blob/d24b08b6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 82db451..f9b015d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -1345,7 +1345,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     private GridCacheContext createCache(CacheConfiguration<?, ?> cfg,
         @Nullable CachePluginManager pluginMgr,
         CacheType cacheType,
-        AffinityTopologyVersion cacheStartTopVer,
         AffinityTopologyVersion locStartTopVer,
         CacheObjectContext cacheObjCtx,
         boolean affNode,
@@ -1420,7 +1419,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
             sharedCtx,
             cfg,
             cacheType,
-            cacheStartTopVer,
             locStartTopVer,
             affNode,
             updatesAllowed,
@@ -1553,7 +1551,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                 sharedCtx,
                 cfg,
                 cacheType,
-                cacheStartTopVer,
                 locStartTopVer,
                 affNode,
                 true,
@@ -1733,7 +1730,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
             nearCfg,
             cacheDesc.cacheType(),
             cacheDesc.deploymentId(),
-            cacheDesc.startTopologyVersion(),
             exchTopVer,
             cacheDesc.schema()
         );
@@ -1755,7 +1751,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                     t.get2(),
                     desc.cacheType(),
                     desc.deploymentId(),
-                    desc.startTopologyVersion(),
                     exchTopVer,
                     desc.schema()
                 );
@@ -1785,7 +1780,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                         null,
                         desc.cacheType(),
                         desc.deploymentId(),
-                        desc.startTopologyVersion(),
                         exchTopVer,
                         desc.schema()
                     );
@@ -1801,7 +1795,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
      * @param reqNearCfg Near configuration if specified for client cache start request.
      * @param cacheType Cache type.
      * @param deploymentId Deployment ID.
-     * @param cacheStartTopVer Cache start topology version.
      * @param exchTopVer Current exchange version.
      * @param schema Query schema.
      * @throws IgniteCheckedException If failed.
@@ -1811,7 +1804,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         @Nullable NearCacheConfiguration reqNearCfg,
         CacheType cacheType,
         IgniteUuid deploymentId,
-        AffinityTopologyVersion cacheStartTopVer,
         AffinityTopologyVersion exchTopVer,
         @Nullable QuerySchema schema
     ) throws IgniteCheckedException {
@@ -1839,7 +1831,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         GridCacheContext cacheCtx = createCache(ccfg,
             null,
             cacheType,
-            cacheStartTopVer,
             exchTopVer,
             cacheObjCtx,
             affNode,

http://git-wip-us.apache.org/repos/asf/ignite/blob/d24b08b6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java
index 0b3080e..f80adc5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java
@@ -31,12 +31,12 @@ public class GridDhtAffinityAssignmentRequest extends GridCacheMessage {
     /** */
     private static final long serialVersionUID = 0L;
 
+    /** */
+    private long futId;
+
     /** Topology version being queried. */
     private AffinityTopologyVersion topVer;
 
-    /** */
-    private AffinityTopologyVersion waitTopVer;
-
     /**
      * Empty constructor.
      */
@@ -45,26 +45,26 @@ public class GridDhtAffinityAssignmentRequest extends GridCacheMessage {
     }
 
     /**
+     * @param futId Future ID.
      * @param cacheId Cache ID.
      * @param topVer Topology version.
-     * @param waitTopVer Topology version to wait for before message processing.
      */
-    public GridDhtAffinityAssignmentRequest(int cacheId,
-        AffinityTopologyVersion topVer,
-        AffinityTopologyVersion waitTopVer) {
+    public GridDhtAffinityAssignmentRequest(
+        long futId,
+        int cacheId,
+        AffinityTopologyVersion topVer) {
         assert topVer != null;
-        assert waitTopVer != null;
 
+        this.futId = futId;
         this.cacheId = cacheId;
         this.topVer = topVer;
-        this.waitTopVer = waitTopVer;
     }
 
     /**
-     * @return Topology version to wait for before message processing.
+     * @return Future ID.
      */
-    public AffinityTopologyVersion waitTopologyVersion() {
-        return waitTopVer;
+    public long futureId() {
+        return futId;
     }
 
     /** {@inheritDoc} */
@@ -110,13 +110,13 @@ public class GridDhtAffinityAssignmentRequest extends GridCacheMessage {
 
         switch (writer.state()) {
             case 3:
-                if (!writer.writeMessage("topVer", topVer))
+                if (!writer.writeLong("futId", futId))
                     return false;
 
                 writer.incrementState();
 
             case 4:
-                if (!writer.writeMessage("waitTopVer", waitTopVer))
+                if (!writer.writeMessage("topVer", topVer))
                     return false;
 
                 writer.incrementState();
@@ -138,7 +138,7 @@ public class GridDhtAffinityAssignmentRequest extends GridCacheMessage {
 
         switch (reader.state()) {
             case 3:
-                topVer = reader.readMessage("topVer");
+                futId = reader.readLong("futId");
 
                 if (!reader.isLastRead())
                     return false;
@@ -146,7 +146,7 @@ public class GridDhtAffinityAssignmentRequest extends GridCacheMessage {
                 reader.incrementState();
 
             case 4:
-                waitTopVer = reader.readMessage("waitTopVer");
+                topVer = reader.readMessage("topVer");
 
                 if (!reader.isLastRead())
                     return false;

http://git-wip-us.apache.org/repos/asf/ignite/blob/d24b08b6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
index e8094e1..5d82171 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
@@ -44,6 +44,9 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheMessage {
     /** */
     private static final long serialVersionUID = 0L;
 
+    /** */
+    private long futId;
+
     /** Topology version. */
     private AffinityTopologyVersion topVer;
 
@@ -69,19 +72,30 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheMessage {
     }
 
     /**
+     * @param futId Future ID.
      * @param cacheId Cache ID.
      * @param topVer Topology version.
      * @param affAssignment Affinity assignment.
      */
-    public GridDhtAffinityAssignmentResponse(int cacheId,
+    public GridDhtAffinityAssignmentResponse(
+        long futId,
+        int cacheId,
         @NotNull AffinityTopologyVersion topVer,
         List<List<ClusterNode>> affAssignment) {
+        this.futId = futId;
         this.cacheId = cacheId;
         this.topVer = topVer;
 
         affAssignmentIds = ids(affAssignment);
     }
 
+    /**
+     * @return Future ID.
+     */
+    public long futureId() {
+        return futId;
+    }
+
     /** {@inheritDoc} */
     @Override public boolean partitionExchangeMessage() {
         return true;
@@ -181,7 +195,7 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheMessage {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 6;
+        return 7;
     }
 
     /**
@@ -239,12 +253,18 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheMessage {
                 writer.incrementState();
 
             case 4:
-                if (!writer.writeByteArray("idealAffAssignmentBytes", idealAffAssignmentBytes))
+                if (!writer.writeLong("futId", futId))
                     return false;
 
                 writer.incrementState();
 
             case 5:
+                if (!writer.writeByteArray("idealAffAssignmentBytes", idealAffAssignmentBytes))
+                    return false;
+
+                writer.incrementState();
+
+            case 6:
                 if (!writer.writeMessage("topVer", topVer))
                     return false;
 
@@ -275,7 +295,7 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheMessage {
                 reader.incrementState();
 
             case 4:
-                idealAffAssignmentBytes = reader.readByteArray("idealAffAssignmentBytes");
+                futId = reader.readLong("futId");
 
                 if (!reader.isLastRead())
                     return false;
@@ -283,6 +303,14 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheMessage {
                 reader.incrementState();
 
             case 5:
+                idealAffAssignmentBytes = reader.readByteArray("idealAffAssignmentBytes");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 6:
                 topVer = reader.readMessage("topVer");
 
                 if (!reader.isLastRead())

http://git-wip-us.apache.org/repos/asf/ignite/blob/d24b08b6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
index 1d6563e..741ca5e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
@@ -23,6 +23,7 @@ import java.util.Collections;
 import java.util.LinkedList;
 import java.util.Queue;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
@@ -36,7 +37,6 @@ import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -55,6 +55,9 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<GridDhtAffin
     private static IgniteLogger log;
 
     /** */
+    private static final AtomicLong idGen = new AtomicLong();
+
+    /** */
     private final GridCacheSharedContext ctx;
 
     /** List of available nodes this future can fetch data from. */
@@ -65,11 +68,13 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<GridDhtAffin
     private ClusterNode pendingNode;
 
     /** */
-    @GridToStringInclude
-    private final T2<Integer, AffinityTopologyVersion> key;
+    private final long id;
 
     /** */
-    private final DynamicCacheDescriptor cacheDesc;
+    private final AffinityTopologyVersion topVer;
+
+    /** */
+    private final int cacheId;
 
     /**
      * @param ctx Context.
@@ -83,9 +88,11 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<GridDhtAffin
         AffinityTopologyVersion topVer,
         DiscoCache discoCache
     ) {
+        this.topVer = topVer;
+        this.cacheId = cacheDesc.cacheId();
         this.ctx = ctx;
-        this.cacheDesc = cacheDesc;
-        this.key = new T2<>(cacheDesc.cacheId(), topVer);
+
+        id = idGen.getAndIncrement();
 
         Collection<ClusterNode> availableNodes = discoCache.cacheAffinityNodes(cacheDesc.cacheId());
 
@@ -105,19 +112,26 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<GridDhtAffin
     }
 
     /**
-     * Initializes fetch future.
+     * @return Cache ID.
      */
-    public void init() {
-        ctx.affinity().addDhtAssignmentFetchFuture(this);
+    public int cacheId() {
+        return cacheId;
+    }
 
-        requestFromNextNode();
+    /**
+     * @return Future ID.
+     */
+    public long id() {
+        return id;
     }
 
     /**
-     * @return Future key.
+     * Initializes fetch future.
      */
-    public T2<Integer, AffinityTopologyVersion> key() {
-        return key;
+    public void init() {
+        ctx.affinity().addDhtAssignmentFetchFuture(this);
+
+        requestFromNextNode();
     }
 
     /**
@@ -125,14 +139,6 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<GridDhtAffin
      * @param res Response.
      */
     public void onResponse(UUID nodeId, GridDhtAffinityAssignmentResponse res) {
-        if (!res.topologyVersion().equals(key.get2())) {
-            if (log.isDebugEnabled())
-                log.debug("Received affinity assignment for wrong topology version (will ignore) " +
-                    "[node=" + nodeId + ", res=" + res + ", topVer=" + key.get2() + ']');
-
-            return;
-        }
-
         GridDhtAffinityAssignmentResponse res0 = null;
 
         synchronized (this) {
@@ -189,7 +195,7 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<GridDhtAffin
                             ", node=" + node + ']');
 
                     ctx.io().send(node,
-                        new GridDhtAffinityAssignmentRequest(key.get1(), key.get2(), cacheDesc.startTopologyVersion()),
+                        new GridDhtAffinityAssignmentRequest(id, cacheId, topVer),
                         AFFINITY_POOL);
 
                     // Close window for listener notification.

http://git-wip-us.apache.org/repos/asf/ignite/blob/d24b08b6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 5b3dfc6..58ad600 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -393,7 +393,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
         int num = cctx.affinity().partitions();
 
         if (cctx.rebalanceEnabled()) {
-            boolean added = exchId.topologyVersion().equals(cctx.cacheStartTopologyVersion());
+            boolean added = exchId.topologyVersion().equals(cctx.startTopologyVersion());
 
             boolean first = (loc.equals(oldest) && loc.id().equals(exchId.nodeId()) && exchId.isJoined()) || added;
 
@@ -1157,7 +1157,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                     // then we keep the newer value.
                     if (newPart != null &&
                         (newPart.updateSequence() < part.updateSequence() ||
-                        (cctx.cacheStartTopologyVersion().compareTo(newPart.topologyVersion()) > 0))
+                        (cctx.startTopologyVersion().compareTo(newPart.topologyVersion()) > 0))
                         ) {
                         if (log.isDebugEnabled())
                             log.debug("Overriding partition map in full update map [exchId=" + exchId + ", curPart=" +

http://git-wip-us.apache.org/repos/asf/ignite/blob/d24b08b6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index 9f1b96e..57616ed 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -608,7 +608,9 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
 
                 AffinityAssignment assignment = cctx.affinity().assignment(topVer);
 
-                GridDhtAffinityAssignmentResponse res = new GridDhtAffinityAssignmentResponse(cctx.cacheId(),
+                GridDhtAffinityAssignmentResponse res = new GridDhtAffinityAssignmentResponse(
+                    req.futureId(),
+                    cctx.cacheId(),
                     topVer,
                     assignment.assignment());
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/d24b08b6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index 9fe29ef..acf351f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -345,7 +345,6 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
 
             assert jcache != null : "Failed to get cache proxy [name=" + cctx.name() +
                 ", locStart=" + cctx.startTopologyVersion() +
-                ", cacheStart=" + cctx.cacheStartTopologyVersion() +
                 ", locNode=" + cctx.localNode() +
                 ", stopping=" + cctx.kernalContext().isStopping();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/d24b08b6/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
index 6149586..4f0d9a1 100644
--- a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
+++ b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
@@ -80,7 +80,6 @@ public class GridCacheTestContext<K, V> extends GridCacheContext<K, V> {
             defaultCacheConfiguration(),
             CacheType.USER,
             AffinityTopologyVersion.ZERO,
-            AffinityTopologyVersion.ZERO,
             true,
             true,
             null,