You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/05/11 14:42:30 UTC

[1/7] ignite git commit: ignite-5075

Repository: ignite
Updated Branches:
  refs/heads/ignite-5075 038c00f49 -> aee103166


ignite-5075


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b7424dc1
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b7424dc1
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b7424dc1

Branch: refs/heads/ignite-5075
Commit: b7424dc1f6d32e67478015cf83324c30bcc39781
Parents: 038c00f
Author: sboikov <sb...@gridgain.com>
Authored: Thu May 11 13:46:44 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu May 11 13:46:44 2017 +0300

----------------------------------------------------------------------
 .../internal/processors/cache/IgniteCacheOffheapManagerImpl.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/b7424dc1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index 368b86c..cfbec12 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -84,10 +84,10 @@ import static org.apache.ignite.internal.pagemem.PageIdUtils.pageId;
 @SuppressWarnings("PublicInnerClass")
 public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager {
     /** */
-    private GridCacheSharedContext ctx;
+    protected GridCacheSharedContext ctx;
 
     /** */
-    private CacheGroupInfrastructure grp;
+    protected CacheGroupInfrastructure grp;
 
     /** */
     private IgniteLogger log;


[5/7] ignite git commit: Merge remote-tracking branch 'remotes/origin/ignite-5075-cacheStart' into ignite-5075

Posted by sb...@apache.org.
Merge remote-tracking branch 'remotes/origin/ignite-5075-cacheStart' into ignite-5075

# Conflicts:
#	modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
#	modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
#	modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java
#	modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
#	modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
#	modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
#	modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b5c86708
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b5c86708
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b5c86708

Branch: refs/heads/ignite-5075
Commit: b5c86708c52d2efc85ee87ff67bde452a98ce2c6
Parents: b7424dc d24b08b
Author: sboikov <sb...@gridgain.com>
Authored: Thu May 11 17:02:41 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu May 11 17:02:41 2017 +0300

----------------------------------------------------------------------
 .../cache/CacheAffinitySharedManager.java       | 31 +++++++-----
 .../internal/processors/cache/CacheData.java    | 14 ------
 .../processors/cache/ClusterCachesInfo.java     | 47 +++++++-----------
 .../cache/DynamicCacheDescriptor.java           | 31 +++---------
 .../processors/cache/GridCacheContext.java      | 15 ------
 .../processors/cache/GridCacheIoManager.java    |  6 ++-
 .../GridCachePartitionExchangeManager.java      |  4 +-
 .../processors/cache/GridCacheProcessor.java    |  9 ----
 .../dht/GridDhtAffinityAssignmentRequest.java   | 32 ++++++-------
 .../dht/GridDhtAffinityAssignmentResponse.java  | 36 ++++++++++++--
 .../dht/GridDhtAssignmentFetchFuture.java       | 50 +++++++++++---------
 .../dht/GridDhtPartitionTopologyImpl.java       |  4 +-
 .../continuous/CacheContinuousQueryManager.java |  1 -
 .../loadtests/hashmap/GridCacheTestContext.java |  1 -
 14 files changed, 127 insertions(+), 154 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/b5c86708/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index bd80bf0,bd41ccc..0443ba4
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@@ -991,9 -1006,9 +998,9 @@@ public class CacheAffinitySharedManager
          for (int i = 0; i < fetchFuts.size(); i++) {
              GridDhtAssignmentFetchFuture fetchFut = fetchFuts.get(i);
  
-             Integer grpId = fetchFut.key().get1();
 -            Integer cacheId = fetchFut.cacheId();
++            Integer grpId = fetchFut.cacheId();
  
 -            fetchAffinity(fut, cctx.cacheContext(cacheId).affinity().affinityCache(), fetchFut);
 +            fetchAffinity(fut, cctx.cache().cacheGroup(grpId).affinity(), fetchFut);
          }
      }
  

http://git-wip-us.apache.org/repos/asf/ignite/blob/b5c86708/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheData.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheData.java
index fcad88a,0c97ab0..0927e25
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheData.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheData.java
@@@ -79,9 -72,7 +75,8 @@@ public class CacheData implements Seria
       */
      CacheData(CacheConfiguration cacheCfg,
          int cacheId,
 +        int grpId,
          CacheType cacheType,
-         AffinityTopologyVersion startTopVer,
          IgniteUuid deploymentId,
          QuerySchema schema,
          UUID rcvdFrom,
@@@ -90,16 -81,12 +85,14 @@@
          byte flags) {
          assert cacheCfg != null;
          assert rcvdFrom != null : cacheCfg.getName();
-         assert startTopVer != null : cacheCfg.getName();
          assert deploymentId != null : cacheCfg.getName();
          assert template || cacheId != 0 : cacheCfg.getName();
 +        assert template || grpId != 0 : cacheCfg.getName();
  
          this.cacheCfg = cacheCfg;
          this.cacheId = cacheId;
 +        this.grpId = grpId;
          this.cacheType = cacheType;
-         this.startTopVer = startTopVer;
          this.deploymentId = deploymentId;
          this.schema = schema;
          this.rcvdFrom = rcvdFrom;

http://git-wip-us.apache.org/repos/asf/ignite/blob/b5c86708/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
index 9789fdf,efcf6a8..7800a45
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
@@@ -637,9 -604,7 +633,8 @@@ class ClusterCachesInfo 
          for (DynamicCacheDescriptor desc : registeredCaches.values()) {
              CacheData cacheData = new CacheData(desc.cacheConfiguration(),
                  desc.cacheId(),
 +                desc.groupDescriptor().groupId(),
                  desc.cacheType(),
-                 desc.startTopologyVersion(),
                  desc.deploymentId(),
                  desc.schema(),
                  desc.receivedFrom(),
@@@ -668,9 -620,7 +663,8 @@@
          for (DynamicCacheDescriptor desc : registeredTemplates.values()) {
              CacheData cacheData = new CacheData(desc.cacheConfiguration(),
                  0,
 +                0,
                  desc.cacheType(),
-                 desc.startTopologyVersion(),
                  desc.deploymentId(),
                  desc.schema(),
                  desc.receivedFrom(),

http://git-wip-us.apache.org/repos/asf/ignite/blob/b5c86708/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/b5c86708/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index 94f86aa,2466a59..d5128f5
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@@ -280,9 -288,7 +277,8 @@@ public class GridCacheContext<K, V> imp
          GridKernalContext ctx,
          GridCacheSharedContext sharedCtx,
          CacheConfiguration cacheCfg,
 +        CacheGroupInfrastructure grp,
          CacheType cacheType,
-         AffinityTopologyVersion cacheStartTopVer,
          AffinityTopologyVersion locStartTopVer,
          boolean affNode,
          boolean updatesAllowed,
@@@ -308,10 -317,8 +304,9 @@@
          assert ctx != null;
          assert sharedCtx != null;
          assert cacheCfg != null;
-         assert cacheStartTopVer != null : cacheCfg.getName();
          assert locStartTopVer != null : cacheCfg.getName();
  
 +        assert grp != null;
          assert evtMgr != null;
          assert storeMgr != null;
          assert evictMgr != null;
@@@ -327,10 -334,8 +322,9 @@@
          this.ctx = ctx;
          this.sharedCtx = sharedCtx;
          this.cacheCfg = cacheCfg;
 +        this.grp = grp;
          this.cacheType = cacheType;
          this.locStartTopVer = locStartTopVer;
-         this.cacheStartTopVer = cacheStartTopVer;
          this.affNode = affNode;
          this.updatesAllowed = updatesAllowed;
          this.depEnabled = ctx.deploy().enabled() && !cacheObjects().isBinaryEnabled(cacheCfg);

http://git-wip-us.apache.org/repos/asf/ignite/blob/b5c86708/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/b5c86708/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index e426426,04c647f..98ad758
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@@ -897,35 -889,37 +897,35 @@@ public class GridCachePartitionExchange
  
          final Map<Object, T2<Integer, GridDhtPartitionFullMap>> dupData = new HashMap<>();
  
 -        cctx.forAllCaches(new IgniteInClosure<GridCacheContext>() {
 -            @Override public void apply(GridCacheContext cacheCtx) {
 -                if (!cacheCtx.isLocal()) {
 -                    boolean ready;
 +        for (CacheGroupInfrastructure grp : cctx.cache().cacheGroups()) {
 +            if (!grp.isLocal()) {
 +                boolean ready;
  
 -                    if (exchId != null) {
 -                        AffinityTopologyVersion startTopVer = cacheCtx.startTopologyVersion();
 +                if (exchId != null) {
-                     AffinityTopologyVersion startTopVer = grp.groupStartVersion();
++                    AffinityTopologyVersion startTopVer = grp.localStartVersion();
  
 -                        ready = startTopVer.compareTo(exchId.topologyVersion()) <= 0;
 -                    }
 -                    else
 -                        ready = cacheCtx.started();
 +                    ready = startTopVer.compareTo(exchId.topologyVersion()) <= 0;
 +                }
 +                else
 +                    ready = grp.started();
  
 -                    if (ready) {
 -                        GridAffinityAssignmentCache affCache = cacheCtx.affinity().affinityCache();
 +                if (ready) {
 +                    GridAffinityAssignmentCache affCache = grp.affinity();
  
 -                        GridDhtPartitionFullMap locMap = cacheCtx.topology().partitionMap(true);
 +                    GridDhtPartitionFullMap locMap = grp.topology().partitionMap(true);
  
 -                        addFullPartitionsMap(m,
 -                            dupData,
 -                            compress,
 -                            cacheCtx.cacheId(),
 -                            locMap,
 -                            affCache.similarAffinityKey());
 +                    addFullPartitionsMap(m,
 +                        dupData,
 +                        compress,
 +                        grp.groupId(),
 +                        locMap,
 +                        affCache.similarAffinityKey());
  
 -                        if (exchId != null)
 -                            m.addPartitionUpdateCounters(cacheCtx.cacheId(), cacheCtx.topology().updateCounters(true));
 -                    }
 +                    if (exchId != null)
 +                        m.addPartitionUpdateCounters(grp.groupId(), grp.topology().updateCounters(true));
                  }
              }
 -        });
 +        }
  
          // It is important that client topologies be added after contexts.
          for (GridClientPartitionTopology top : cctx.exchange().clientTopologies()) {
@@@ -1299,12 -1296,12 +1299,12 @@@
                  boolean updated = false;
  
                  for (Map.Entry<Integer, GridDhtPartitionMap> entry : msg.partitions().entrySet()) {
 -                    Integer cacheId = entry.getKey();
 +                    Integer grpId = entry.getKey();
  
 -                    GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId);
 +                    CacheGroupInfrastructure grp = cctx.cache().cacheGroup(grpId);
  
 -                    if (cacheCtx != null &&
 -                        cacheCtx.startTopologyVersion().compareTo(entry.getValue().topologyVersion()) > 0)
 +                    if (grp != null &&
-                         grp.groupStartVersion().compareTo(entry.getValue().topologyVersion()) > 0)
++                        grp.localStartVersion().compareTo(entry.getValue().topologyVersion()) > 0)
                          continue;
  
                      GridDhtPartitionTopology top = null;

http://git-wip-us.apache.org/repos/asf/ignite/blob/b5c86708/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 86ead93,f9b015d..a72f21c
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@@ -1363,10 -1343,8 +1363,9 @@@ public class GridCacheProcessor extend
       * @throws IgniteCheckedException If failed to create cache.
       */
      private GridCacheContext createCache(CacheConfiguration<?, ?> cfg,
 +        CacheGroupInfrastructure grp,
          @Nullable CachePluginManager pluginMgr,
          CacheType cacheType,
-         AffinityTopologyVersion cacheStartTopVer,
          AffinityTopologyVersion locStartTopVer,
          CacheObjectContext cacheObjCtx,
          boolean affNode,
@@@ -1435,9 -1418,7 +1434,8 @@@
              ctx,
              sharedCtx,
              cfg,
 +            grp,
              cacheType,
-             cacheStartTopVer,
              locStartTopVer,
              affNode,
              updatesAllowed,
@@@ -1566,9 -1550,7 +1564,8 @@@
                  ctx,
                  sharedCtx,
                  cfg,
 +                grp,
                  cacheType,
-                 cacheStartTopVer,
                  locStartTopVer,
                  affNode,
                  true,
@@@ -1852,40 -1828,9 +1844,39 @@@
              ccfg.setNearConfiguration(reqNearCfg);
          }
  
 +        String grpName = startCfg.getGroupName();
 +
 +        CacheGroupInfrastructure grp = null;
 +
 +        if (grpName != null) {
 +            for (CacheGroupInfrastructure grp0 : cacheGrps.values()) {
 +                if (grp0.sharedGroup() && grpName.equals(grp0.name())) {
 +                    grp = grp0;
 +
 +                    break;
 +                }
 +            }
 +
 +            if (grp == null) {
 +                grp = startCacheGroup(grpDesc,
 +                    cacheType,
 +                    affNode,
 +                    cacheObjCtx,
 +                    exchTopVer);
 +            }
 +        }
 +        else {
 +            grp = startCacheGroup(grpDesc,
 +                cacheType,
 +                affNode,
 +                cacheObjCtx,
 +                exchTopVer);
 +        }
 +
          GridCacheContext cacheCtx = createCache(ccfg,
 +            grp,
              null,
              cacheType,
-             cacheStartTopVer,
              exchTopVer,
              cacheObjCtx,
              affNode,

http://git-wip-us.apache.org/repos/asf/ignite/blob/b5c86708/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java
index 14eb92f,f80adc5..d9d642a
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java
@@@ -45,19 -45,19 +45,19 @@@ public class GridDhtAffinityAssignmentR
      }
  
      /**
+      * @param futId Future ID.
 -     * @param cacheId Cache ID.
 +     * @param grpId Cache group ID.
       * @param topVer Topology version.
-      * @param waitTopVer Topology version to wait for before message processing.
       */
-     public GridDhtAffinityAssignmentRequest(int grpId,
-         AffinityTopologyVersion topVer,
-         AffinityTopologyVersion waitTopVer) {
+     public GridDhtAffinityAssignmentRequest(
+         long futId,
 -        int cacheId,
++        int grpId,
+         AffinityTopologyVersion topVer) {
          assert topVer != null;
-         assert waitTopVer != null;
  
+         this.futId = futId;
 -        this.cacheId = cacheId;
 +        this.grpId = grpId;
          this.topVer = topVer;
-         this.waitTopVer = waitTopVer;
      }
  
      /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/b5c86708/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
index 6c01c8d,5d82171..4df3fc1
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
@@@ -67,14 -72,18 +70,18 @@@ public class GridDhtAffinityAssignmentR
      }
  
      /**
+      * @param futId Future ID.
 -     * @param cacheId Cache ID.
 +     * @param grpId Cache group ID.
       * @param topVer Topology version.
       * @param affAssignment Affinity assignment.
       */
-     public GridDhtAffinityAssignmentResponse(int grpId,
+     public GridDhtAffinityAssignmentResponse(
+         long futId,
 -        int cacheId,
++        int grpId,
          @NotNull AffinityTopologyVersion topVer,
          List<List<ClusterNode>> affAssignment) {
+         this.futId = futId;
 -        this.cacheId = cacheId;
 +        this.grpId = grpId;
          this.topVer = topVer;
  
          affAssignmentIds = ids(affAssignment);

http://git-wip-us.apache.org/repos/asf/ignite/blob/b5c86708/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
index c8966ed,741ca5e..c008ef3
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
@@@ -66,11 -68,13 +69,13 @@@ public class GridDhtAssignmentFetchFutu
      private ClusterNode pendingNode;
  
      /** */
-     @GridToStringInclude
-     private final T2<Integer, AffinityTopologyVersion> key;
+     private final long id;
  
      /** */
-     private final CacheGroupDescriptor grpDesc;
+     private final AffinityTopologyVersion topVer;
+ 
+     /** */
 -    private final int cacheId;
++    private final int grpId;
  
      /**
       * @param ctx Context.
@@@ -84,11 -88,13 +89,13 @@@
          AffinityTopologyVersion topVer,
          DiscoCache discoCache
      ) {
+         this.topVer = topVer;
 -        this.cacheId = cacheDesc.cacheId();
++        this.grpId = grpDesc.groupId();
          this.ctx = ctx;
-         this.grpDesc = grpDesc;
-         this.key = new T2<>(grpDesc.groupId(), topVer);
+ 
+         id = idGen.getAndIncrement();
  
 -        Collection<ClusterNode> availableNodes = discoCache.cacheAffinityNodes(cacheDesc.cacheId());
 +        Collection<ClusterNode> availableNodes = discoCache.cacheGroupAffinityNodes(grpDesc.groupId());
  
          LinkedList<ClusterNode> tmp = new LinkedList<>();
  
@@@ -190,7 -195,7 +196,7 @@@
                              ", node=" + node + ']');
  
                      ctx.io().send(node,
-                         new GridDhtAffinityAssignmentRequest(key.get1(), key.get2(), grpDesc.startTopologyVersion()),
 -                        new GridDhtAffinityAssignmentRequest(id, cacheId, topVer),
++                        new GridDhtAffinityAssignmentRequest(id, grpId, topVer),
                          AFFINITY_POOL);
  
                      // Close window for listener notification.

http://git-wip-us.apache.org/repos/asf/ignite/blob/b5c86708/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index b76f548,58ad600..8ea9e46
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@@ -396,12 -388,12 +396,12 @@@ public class GridDhtPartitionTopologyIm
                  ", futVer=" + exchFut.topologyVersion() +
                  ", fut=" + exchFut + ']';
  
 -        List<List<ClusterNode>> aff = cctx.affinity().assignments(exchFut.topologyVersion());
 +        List<List<ClusterNode>> aff = grp.affinity().assignments(exchFut.topologyVersion());
  
 -        int num = cctx.affinity().partitions();
 +        int num = grp.affinity().partitions();
  
 -        if (cctx.rebalanceEnabled()) {
 -            boolean added = exchId.topologyVersion().equals(cctx.startTopologyVersion());
 +        if (grp.rebalanceEnabled()) {
-             boolean added = exchId.topologyVersion().equals(grp.groupStartVersion());
++            boolean added = exchId.topologyVersion().equals(grp.localStartVersion());
  
              boolean first = (loc.equals(oldest) && loc.id().equals(exchId.nodeId()) && exchId.isJoined()) || added;
  
@@@ -1173,7 -1157,7 +1173,7 @@@
                      // then we keep the newer value.
                      if (newPart != null &&
                          (newPart.updateSequence() < part.updateSequence() ||
-                         (grp.groupStartVersion().compareTo(newPart.topologyVersion()) > 0))
 -                        (cctx.startTopologyVersion().compareTo(newPart.topologyVersion()) > 0))
++                        (grp.localStartVersion().compareTo(newPart.topologyVersion()) > 0))
                          ) {
                          if (log.isDebugEnabled())
                              log.debug("Overriding partition map in full update map [exchId=" + exchId + ", curPart=" +

http://git-wip-us.apache.org/repos/asf/ignite/blob/b5c86708/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/b5c86708/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
----------------------------------------------------------------------
diff --cc modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
index 5a7dbc1,4f0d9a1..9126fd2
--- a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
+++ b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
@@@ -78,12 -78,13 +78,11 @@@ public class GridCacheTestContext<K, V
                  null
              ),
              defaultCacheConfiguration(),
 +            null,
              CacheType.USER,
              AffinityTopologyVersion.ZERO,
-             AffinityTopologyVersion.ZERO,
              true,
              true,
 -            null,
 -            null,
 -            null,
              new GridCacheEventManager(),
              new CacheOsStoreManager(null, new CacheConfiguration()),
              new GridCacheEvictionManager(),


[6/7] ignite git commit: Merge remote-tracking branch 'remotes/origin/ignite-5075-cacheStart' into ignite-5075

Posted by sb...@apache.org.
Merge remote-tracking branch 'remotes/origin/ignite-5075-cacheStart' into ignite-5075

# Conflicts:
#	modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
#	modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
#	modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java
#	modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
#	modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
#	modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
#	modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4afeba75
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4afeba75
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4afeba75

Branch: refs/heads/ignite-5075
Commit: 4afeba7573b7ad58b0f25a1b7c6511232c30295e
Parents: b5c8670
Author: sboikov <sb...@gridgain.com>
Authored: Thu May 11 17:09:49 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu May 11 17:19:24 2017 +0300

----------------------------------------------------------------------
 .../cache/CacheAffinitySharedManager.java       | 34 ++++----------------
 .../processors/cache/CacheGroupData.java        | 23 ++++++-------
 .../processors/cache/CacheGroupDescriptor.java  | 23 ++++++-------
 .../cache/CacheGroupInfrastructure.java         | 13 ++------
 .../processors/cache/ClusterCachesInfo.java     | 18 +++++------
 .../processors/cache/ExchangeActions.java       | 11 +++++++
 .../processors/cache/GridCacheIoManager.java    |  3 +-
 .../processors/cache/GridCacheProcessor.java    |  1 -
 .../dht/GridDhtAssignmentFetchFuture.java       |  6 ++--
 .../dht/GridDhtPartitionTopologyImpl.java       |  2 +-
 .../GridDhtPartitionsExchangeFuture.java        | 22 +++++++------
 .../dht/preloader/GridDhtPreloader.java         |  3 --
 .../processors/cache/IgniteCacheGroupsTest.java |  2 ++
 13 files changed, 69 insertions(+), 92 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/4afeba75/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index 0443ba4..dc31ae9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -118,16 +118,6 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
     }
 
     /**
-     * @param cacheId Cache ID.
-     * @return Cache start topology version.
-     */
-    public AffinityTopologyVersion localStartVersion(int cacheId) {
-        DynamicCacheDescriptor desc = registeredCaches.get(cacheId);
-
-        return desc != null ? desc.localStartVersion() : null;
-    }
-
-    /**
      * Callback invoked from discovery thread when discovery message is received.
      *
      * @param type Event type.
@@ -403,21 +393,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
         }
 
         for (CacheGroupInfrastructure grp : cctx.cache().cacheGroups()) {
-            if (grp.affinity().lastVersion().equals(AffinityTopologyVersion.NONE)) {
-                if (grp.groupStartVersion().equals(fut.topologyVersion())) {
-                    GridAffinityAssignmentCache aff = grp.affinity();
-
-                    List<List<ClusterNode>> assignment = aff.calculate(fut.topologyVersion(),
-                        fut.discoveryEvent(), fut.discoCache());
-
-                    aff.initialize(fut.topologyVersion(), assignment);
-                }
-                else {
-                    assert grp.localStartVersion().equals(fut.topologyVersion());
-
-                    initAffinity(registeredGrps.get(grp.groupId()), grp.affinity(), fut, lateAffAssign);
-                }
-            }
+            if (grp.affinity().lastVersion().equals(AffinityTopologyVersion.NONE))
+                initAffinity(registeredGrps.get(grp.groupId()), grp.affinity(), fut, lateAffAssign);
         }
 
         if (crd) {
@@ -881,7 +858,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
 
         assert grpDesc != null : aff.groupName();
 
-        return grpDesc.startTopologyVersion().equals(fut.topologyVersion()) ||
+        return fut.cacheGroupStarting(aff.groupId()) ||
+            cctx.localNodeId().equals(grpDesc.receivedFrom()) ||
             !fut.exchangeId().nodeId().equals(cctx.localNodeId()) ||
             (affNodes.size() == 1 && affNodes.contains(cctx.localNode()));
     }
@@ -974,7 +952,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
             if (grp.isLocal())
                 continue;
 
-            if (grp.groupStartVersion().equals(fut.topologyVersion())) {
+            if (canCalculateAffinity(grp.affinity(), fut)) {
                 List<List<ClusterNode>> assignment = grp.affinity().calculate(fut.topologyVersion(),
                     fut.discoveryEvent(),
                     fut.discoCache());
@@ -998,7 +976,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
         for (int i = 0; i < fetchFuts.size(); i++) {
             GridDhtAssignmentFetchFuture fetchFut = fetchFuts.get(i);
 
-            Integer grpId = fetchFut.cacheId();
+            Integer grpId = fetchFut.groupId();
 
             fetchAffinity(fut, cctx.cache().cacheGroup(grpId).affinity(), fetchFut);
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/4afeba75/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupData.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupData.java
index 0123262..ea2c256 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupData.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupData.java
@@ -19,9 +19,8 @@ package org.apache.ignite.internal.processors.cache;
 
 import java.io.Serializable;
 import java.util.Map;
-import java.util.Set;
+import java.util.UUID;
 import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.lang.IgniteUuid;
@@ -40,13 +39,13 @@ public class CacheGroupData implements Serializable {
     private final int grpId;
 
     /** */
-    private final IgniteUuid deploymentId;
+    private final UUID rcvdFrom;
 
     /** */
-    private final CacheConfiguration cacheCfg;
+    private final IgniteUuid deploymentId;
 
     /** */
-    private final AffinityTopologyVersion startTopVer;
+    private final CacheConfiguration cacheCfg;
 
     /** */
     @GridToStringInclude
@@ -55,28 +54,30 @@ public class CacheGroupData implements Serializable {
     /**
      * @param cacheCfg Cache configuration.
      * @param grpId
-     * @param startTopVer
      */
     public CacheGroupData(CacheConfiguration cacheCfg,
         String grpName,
         int grpId,
+        UUID rcvdFrom,
         IgniteUuid deploymentId,
-        AffinityTopologyVersion startTopVer,
         Map<String, Integer> caches) {
         assert cacheCfg != null;
         assert grpName != null;
         assert grpId != 0;
         assert deploymentId != null;
-        assert startTopVer != null;
 
         this.cacheCfg = cacheCfg;
         this.grpName = grpName;
         this.grpId = grpId;
+        this.rcvdFrom = rcvdFrom;
         this.deploymentId = deploymentId;
-        this.startTopVer = startTopVer;
         this.caches = caches;
     }
 
+    public UUID receivedFrom() {
+        return rcvdFrom;
+    }
+
     public String groupName() {
         return grpName;
     }
@@ -93,10 +94,6 @@ public class CacheGroupData implements Serializable {
         return cacheCfg;
     }
 
-    public AffinityTopologyVersion startTopologyVersion() {
-        return startTopVer;
-    }
-
     Map<String, Integer> caches() {
         return caches;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/4afeba75/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupDescriptor.java
index da55871..c418002 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupDescriptor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupDescriptor.java
@@ -19,13 +19,11 @@ package org.apache.ignite.internal.processors.cache;
 
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Set;
+import java.util.UUID;
 import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.lang.IgniteUuid;
-import org.jetbrains.annotations.Nullable;
 
 /**
  *
@@ -44,31 +42,34 @@ public class CacheGroupDescriptor {
     private final CacheConfiguration cacheCfg;
 
     /** */
-    private final AffinityTopologyVersion startTopVer;
-
-    /** */
     @GridToStringInclude
     private Map<String, Integer> caches;
 
+    /** */
+    private final UUID rcvdFrom;
+
     CacheGroupDescriptor(String grpName,
         int grpId,
+        UUID rcvdFrom,
         IgniteUuid deploymentId,
         CacheConfiguration cacheCfg,
-        AffinityTopologyVersion startTopVer,
         Map<String, Integer> caches) {
         assert cacheCfg != null;
         assert grpName != null;
         assert grpId != 0;
-        assert startTopVer != null;
 
         this.grpName = grpName;
         this.grpId = grpId;
+        this.rcvdFrom = rcvdFrom;
         this.deploymentId = deploymentId;
         this.cacheCfg = cacheCfg;
-        this.startTopVer = startTopVer;
         this.caches = caches;
     }
 
+    public UUID receivedFrom() {
+        return rcvdFrom;
+    }
+
     public IgniteUuid deploymentId() {
         return deploymentId;
     }
@@ -113,10 +114,6 @@ public class CacheGroupDescriptor {
         return cacheCfg;
     }
 
-    public AffinityTopologyVersion startTopologyVersion() {
-        return startTopVer;
-    }
-
     Map<String, Integer> caches() {
         return caches;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/4afeba75/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java
index e1e3e93..2a4e918 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java
@@ -70,9 +70,6 @@ public class CacheGroupInfrastructure {
     private GridDhtPartitionTopologyImpl top;
 
     /** */
-    private final AffinityTopologyVersion grpStartVer;
-
-    /** */
     private final AffinityTopologyVersion locStartVer;
 
     /** */
@@ -122,7 +119,6 @@ public class CacheGroupInfrastructure {
         CacheObjectContext cacheObjCtx,
         FreeList freeList,
         ReuseList reuseList,
-        AffinityTopologyVersion grpStartVer,
         AffinityTopologyVersion locStartVer) {
         assert grpId != 0 : "Invalid group ID [cache=" + ccfg.getName() + ", grpName=" + ccfg.getGroupName() + ']';
         assert ccfg != null;
@@ -136,7 +132,6 @@ public class CacheGroupInfrastructure {
         this.cacheObjCtx = cacheObjCtx;
         this.freeList = freeList;
         this.reuseList = reuseList;
-        this.grpStartVer = grpStartVer;
         this.locStartVer = locStartVer;
 
         ioPlc = cacheType.ioPolicy();
@@ -257,10 +252,6 @@ public class CacheGroupInfrastructure {
         return false;
     }
 
-    public AffinityTopologyVersion groupStartVersion() {
-        return grpStartVer;
-    }
-
     public AffinityTopologyVersion localStartVersion() {
         return locStartVer;
     }
@@ -383,7 +374,9 @@ public class CacheGroupInfrastructure {
 
         AffinityAssignment assignment = aff.cachedAffinity(topVer);
 
-        GridDhtAffinityAssignmentResponse res = new GridDhtAffinityAssignmentResponse(grpId,
+        GridDhtAffinityAssignmentResponse res = new GridDhtAffinityAssignmentResponse(
+            req.futureId(),
+            grpId,
             topVer,
             assignment.assignment());
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/4afeba75/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
index 7800a45..8b23e5b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
@@ -290,8 +290,8 @@ class ClusterCachesInfo {
                         CacheGroupDescriptor grpDesc = registerCacheGroup(exchangeActions,
                             ccfg,
                             cacheId,
-                            req.deploymentId(),
-                            topVer.nextMinorVersion());
+                            req.initiatingNodeId(),
+                            req.deploymentId());
 
                         DynamicCacheDescriptor startDesc = new DynamicCacheDescriptor(ctx,
                             ccfg,
@@ -651,8 +651,8 @@ class ClusterCachesInfo {
             CacheGroupData grpData = new CacheGroupData(grpDesc.config(),
                 grpDesc.groupName(),
                 grpDesc.groupId(),
+                grpDesc.receivedFrom(),
                 grpDesc.deploymentId(),
-                grpDesc.startTopologyVersion(),
                 grpDesc.caches());
 
             cacheGrps.put(grpDesc.groupName(), grpData);
@@ -699,9 +699,9 @@ class ClusterCachesInfo {
         for (CacheGroupData grpData : cachesData.cacheGroups().values()) {
             CacheGroupDescriptor grpDesc = new CacheGroupDescriptor(grpData.groupName(),
                 grpData.groupId(),
+                grpData.receivedFrom(),
                 grpData.deploymentId(),
                 grpData.config(),
-                grpData.startTopologyVersion(),
                 grpData.caches());
 
             CacheGroupDescriptor old = registeredCacheGrps.put(grpDesc.groupName(), grpDesc);
@@ -857,8 +857,8 @@ class ClusterCachesInfo {
                 CacheGroupDescriptor grpDesc = registerCacheGroup(null,
                     cfg,
                     cacheId,
-                    joinData.cacheDeploymentId(),
-                    topVer);
+                    nodeId,
+                    joinData.cacheDeploymentId());
 
                 DynamicCacheDescriptor desc = new DynamicCacheDescriptor(ctx,
                     cfg,
@@ -897,8 +897,8 @@ class ClusterCachesInfo {
         ExchangeActions exchActions,
         CacheConfiguration startedCacheCfg,
         Integer cacheId,
-        IgniteUuid deploymentId,
-        AffinityTopologyVersion topVer) {
+        UUID rcvdFrom,
+        IgniteUuid deploymentId) {
         if (startedCacheCfg.getGroupName() != null) {
             CacheGroupDescriptor desc = registeredCacheGrps.get(startedCacheCfg.getGroupName());
 
@@ -919,9 +919,9 @@ class ClusterCachesInfo {
         CacheGroupDescriptor grpDesc = new CacheGroupDescriptor(
             grpName,
             grpId,
+            rcvdFrom,
             deploymentId,
             startedCacheCfg,
-            topVer,
             caches);
 
         CacheGroupDescriptor old = registeredCacheGrps.put(grpName, grpDesc);

http://git-wip-us.apache.org/repos/asf/ignite/blob/4afeba75/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
index 8c9833f..977f544 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
@@ -296,6 +296,17 @@ public class ExchangeActions {
         return cacheGrpsToStart != null ? cacheGrpsToStart : Collections.<CacheGroupDescriptor>emptyList();
     }
 
+    /** Checks whether the cache group with the given ID is among the groups to start. */
+    public boolean cacheGroupStarting(int grpId) {
+        if (cacheGrpsToStart != null) {
+            for (CacheGroupDescriptor grpToStart : cacheGrpsToStart) {
+                if (grpToStart.groupId() == grpId)
+                    return true;
+            }
+        }
+        return false;
+    }
+
     void addCacheGroupToStop(CacheGroupDescriptor grpDesc) {
         if (cacheGrpsToStop == null)
             cacheGrpsToStop = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/ignite/blob/4afeba75/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 02d1086..fb2ac3d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -146,7 +146,8 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
                 if (cacheMsg instanceof GridDhtAffinityAssignmentRequest) {
                     assert cacheMsg.topologyVersion() != null : cacheMsg;
 
-                    AffinityTopologyVersion waitVer = cctx.affinity().localStartVersion(cacheMsg.cacheId());
+                    // TODO IGNITE-5075.
+                    AffinityTopologyVersion waitVer = null;//cctx.affinity().localStartVersion(((GridDhtAffinityAssignmentRequest) cacheMsg).groupId());
 
                     if (waitVer == null)
                         waitVer = new AffinityTopologyVersion(cctx.localNode().order());

http://git-wip-us.apache.org/repos/asf/ignite/blob/4afeba75/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index a72f21c..63b29b8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -1922,7 +1922,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
             cacheObjCtx,
             freeList,
             reuseList,
-            desc.startTopologyVersion(),
             exchTopVer);
 
         grp.start();

http://git-wip-us.apache.org/repos/asf/ignite/blob/4afeba75/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
index c008ef3..a06ff78 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
@@ -113,10 +113,10 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<GridDhtAffin
     }
 
     /**
-     * @return Cache ID.
+     * @return Cache group ID.
      */
-    public int cacheId() {
-        return cacheId;
+    public int groupId() {
+        return grpId;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/4afeba75/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 8ea9e46..c75f05c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -414,7 +414,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
                         boolean owned = locPart.own();
 
-                        assert owned : "Failed to own partition for oldest node [grp" + grp.name() +
+                        assert owned : "Failed to own partition for oldest node [grp=" + grp.name() +
                             ", part=" + locPart + ']';
 
                         if (log.isDebugEnabled())

http://git-wip-us.apache.org/repos/asf/ignite/blob/4afeba75/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index c7457c3..d15f069 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -394,13 +394,16 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
         return cacheCtx != null && F.eq(cacheCtx.startTopologyVersion(), topVer);
     }
 
+    public boolean cacheGroupStarting(int grpId) {
+        return exchActions != null && exchActions.cacheGroupStarting(grpId);
+    }
+
     /**
      * @param cacheId Cache ID.
      * @return {@code True} if non-client cache was added during this exchange.
      */
     public boolean cacheStarted(int cacheId) {
         return exchActions != null && exchActions.cacheStarted(cacheId);
-
     }
 
     /**
@@ -943,20 +946,19 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
     private void warnNoAffinityNodes() {
         List<String> cachesWithoutNodes = null;
 
-        for (DynamicCacheDescriptor cacheDesc : cctx.cache().cacheDescriptors()) {
-            if (cacheDesc.startTopologyVersion().compareTo(topologyVersion()) <= 0 &&
-                discoCache.cacheGroupAffinityNodes(cacheDesc.groupDescriptor().groupId()).isEmpty()) {
+        for (GridCacheContext ctx : cctx.cacheContexts()) {
+            if (discoCache.cacheGroupAffinityNodes(ctx.groupId()).isEmpty()) {
                 if (cachesWithoutNodes == null)
                     cachesWithoutNodes = new ArrayList<>();
 
-                cachesWithoutNodes.add(cacheDesc.cacheName());
+                cachesWithoutNodes.add(ctx.name());
 
                 // Fire event even if there is no client cache started.
-                if (cctx.gridEvents().isRecordable(EventType.EVT_CACHE_NODES_LEFT)) {
+                if (ctx.gridEvents().isRecordable(EventType.EVT_CACHE_NODES_LEFT)) {
                     Event evt = new CacheEvent(
-                        cacheDesc.cacheName(),
-                        cctx.localNode(),
-                        cctx.localNode(),
+                        ctx.name(),
+                        ctx.localNode(),
+                        ctx.localNode(),
                         "All server nodes have left the cluster.",
                         EventType.EVT_CACHE_NODES_LEFT,
                         0,
@@ -973,7 +975,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
                         null
                     );
 
-                    cctx.gridEvents().record(evt);
+                    ctx.gridEvents().record(evt);
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/4afeba75/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index ec6e4af..3d62c2f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -44,8 +44,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
 import org.apache.ignite.internal.processors.cache.GridCachePreloaderAdapter;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentRequest;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentResponse;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
@@ -66,7 +64,6 @@ import org.apache.ignite.lang.IgniteBiInClosure;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.Nullable;
-import org.jsr166.ConcurrentLinkedDeque8;
 
 import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST;
 import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_UNLOADED;

http://git-wip-us.apache.org/repos/asf/ignite/blob/4afeba75/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java
index ba01844..ff20803 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java
@@ -44,6 +44,8 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest {
 
         ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
 
+        //cfg.setLateAffinityAssignment(false);
+
         return cfg;
     }
 


[2/7] ignite git commit: ignite-5075

Posted by sb...@apache.org.
ignite-5075


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b59733a3
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b59733a3
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b59733a3

Branch: refs/heads/ignite-5075
Commit: b59733a391b412d94869418a4a30289042e7fc8d
Parents: b324c13
Author: sboikov <sb...@gridgain.com>
Authored: Thu May 11 13:48:26 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu May 11 13:48:26 2017 +0300

----------------------------------------------------------------------
 .../apache/ignite/internal/processors/cache/ClusterCachesInfo.java | 2 --
 1 file changed, 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/b59733a3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
index f74343b..1afedf1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
@@ -768,7 +768,6 @@ class ClusterCachesInfo {
 
                 desc.staticallyConfigured(true);
                 desc.receivedFrom(nodeId);
-                desc.startTopologyVersion(topVer);
 
                 DynamicCacheDescriptor old = registeredTemplates.put(cfg.getName(), desc);
 
@@ -789,7 +788,6 @@ class ClusterCachesInfo {
 
                 desc.staticallyConfigured(true);
                 desc.receivedFrom(nodeId);
-                desc.startTopologyVersion(topVer);
 
                 DynamicCacheDescriptor old = registeredCaches.put(cfg.getName(), desc);
 


[3/7] ignite git commit: ignite-5075

Posted by sb...@apache.org.
ignite-5075


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/861b34b2
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/861b34b2
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/861b34b2

Branch: refs/heads/ignite-5075
Commit: 861b34b29a9a32f63204e1fb73f43cdcb8b049fc
Parents: b59733a
Author: sboikov <sb...@gridgain.com>
Authored: Thu May 11 14:40:00 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu May 11 14:40:00 2017 +0300

----------------------------------------------------------------------
 .../processors/cache/ClusterCachesInfo.java     | 24 ++++++++++++--------
 1 file changed, 14 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/861b34b2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
index 1afedf1..44d41f8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
@@ -82,6 +82,9 @@ class ClusterCachesInfo {
     private List<T2<DynamicCacheDescriptor, NearCacheConfiguration>> locJoinStartCaches;
 
     /** */
+    private Map<UUID, CacheJoinNodeDiscoveryData> joiningNodesDiscoData = new HashMap<>();
+
+    /** */
     private Map<UUID, CacheClientReconnectDiscoveryData> clientReconnectReqs;
 
     /**
@@ -575,15 +578,10 @@ class ClusterCachesInfo {
                 }
             }
             else {
-                for (DynamicCacheDescriptor desc : registeredCaches.values()) {
-                    if (desc.startTopologyVersion() == null && node.id().equals(desc.receivedFrom()))
-                        desc.startTopologyVersion(topVer);
-                }
+                CacheJoinNodeDiscoveryData discoData = joiningNodesDiscoData.remove(node.id());
 
-                for (DynamicCacheDescriptor desc : registeredTemplates().values()) {
-                    if (desc.startTopologyVersion() == null && node.id().equals(desc.receivedFrom()))
-                        desc.startTopologyVersion(topVer);
-                }
+                if (discoData != null)
+                    processJoiningNode(discoData, node.id(), topVer);
             }
         }
     }
@@ -725,8 +723,12 @@ class ClusterCachesInfo {
                 else
                     processClientReconnectData((CacheClientReconnectDiscoveryData) joiningNodeData, data.joiningNodeId());
             }
-            else if (joiningNodeData instanceof CacheJoinNodeDiscoveryData)
-                processJoiningNode((CacheJoinNodeDiscoveryData)joiningNodeData, data.joiningNodeId(), null);
+            else if (joiningNodeData instanceof CacheJoinNodeDiscoveryData) {
+                CacheJoinNodeDiscoveryData old =
+                    joiningNodesDiscoData.put(data.joiningNodeId(), (CacheJoinNodeDiscoveryData)joiningNodeData);
+
+                assert old == null : old;
+            }
         }
     }
 
@@ -768,6 +770,7 @@ class ClusterCachesInfo {
 
                 desc.staticallyConfigured(true);
                 desc.receivedFrom(nodeId);
+                desc.startTopologyVersion(topVer);
 
                 DynamicCacheDescriptor old = registeredTemplates.put(cfg.getName(), desc);
 
@@ -788,6 +791,7 @@ class ClusterCachesInfo {
 
                 desc.staticallyConfigured(true);
                 desc.receivedFrom(nodeId);
+                desc.startTopologyVersion(topVer);
 
                 DynamicCacheDescriptor old = registeredCaches.put(cfg.getName(), desc);
 


[7/7] ignite git commit: Merge remote-tracking branch 'remotes/origin/ignite-5075-cacheStart' into ignite-5075

Posted by sb...@apache.org.
Merge remote-tracking branch 'remotes/origin/ignite-5075-cacheStart' into ignite-5075

# Conflicts:
#	modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
#	modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
#	modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java
#	modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
#	modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
#	modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
#	modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/aee10316
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/aee10316
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/aee10316

Branch: refs/heads/ignite-5075
Commit: aee103166463a38494d73507e70ee1c55bb9f134
Parents: 4afeba7
Author: sboikov <sb...@gridgain.com>
Authored: Thu May 11 17:42:22 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu May 11 17:42:22 2017 +0300

----------------------------------------------------------------------
 .../processors/cache/CacheAffinitySharedManager.java        | 2 +-
 .../internal/processors/cache/CacheGroupInfrastructure.java | 9 +++++++++
 .../internal/processors/cache/GridCacheProcessor.java       | 1 +
 .../cache/distributed/dht/GridDhtPartitionTopologyImpl.java | 5 +++--
 4 files changed, 14 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/aee10316/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index dc31ae9..f85d110 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -952,7 +952,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
             if (grp.isLocal())
                 continue;
 
-            if (canCalculateAffinity(grp.affinity(), fut)) {
+            if (fut.cacheGroupStarting(grp.groupId()) || cctx.localNodeId().equals(grp.receivedFrom())) {
                 List<List<ClusterNode>> assignment = grp.affinity().calculate(fut.topologyVersion(),
                     fut.discoveryEvent(),
                     fut.discoCache());

http://git-wip-us.apache.org/repos/asf/ignite/blob/aee10316/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java
index 2a4e918..04b45a0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java
@@ -105,6 +105,9 @@ public class CacheGroupInfrastructure {
     /** */
     private boolean storeCacheId;
 
+    /** */
+    private UUID rcvdFrom;
+
     /**
      * @param grpId Group ID.
      * @param ctx Context.
@@ -112,6 +115,7 @@ public class CacheGroupInfrastructure {
      */
     CacheGroupInfrastructure(GridCacheSharedContext ctx,
         int grpId,
+        UUID rcvdFrom,
         CacheType cacheType,
         CacheConfiguration ccfg,
         boolean affNode,
@@ -124,6 +128,7 @@ public class CacheGroupInfrastructure {
         assert ccfg != null;
 
         this.grpId = grpId;
+        this.rcvdFrom = rcvdFrom;
         this.cacheType = cacheType;
         this.ctx = ctx;
         this.ccfg = ccfg;
@@ -143,6 +148,10 @@ public class CacheGroupInfrastructure {
         log = ctx.kernalContext().log(getClass());
     }
 
+    public UUID receivedFrom() {
+        return rcvdFrom;
+    }
+
     public boolean storeCacheId() {
         return storeCacheId;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/aee10316/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 63b29b8..be3b4fd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -1915,6 +1915,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
         CacheGroupInfrastructure grp = new CacheGroupInfrastructure(sharedCtx,
             desc.groupId(),
+            desc.receivedFrom(),
             cacheType,
             cfg,
             affNode,

http://git-wip-us.apache.org/repos/asf/ignite/blob/aee10316/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index c75f05c..9168d7f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -401,9 +401,10 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
         int num = grp.affinity().partitions();
 
         if (grp.rebalanceEnabled()) {
-            boolean added = exchId.topologyVersion().equals(grp.localStartVersion());
+            boolean added = exchFut.cacheGroupStarting(grp.groupId()) ||
+                (exchId.isJoined() && exchId.nodeId().equals(grp.receivedFrom()));
 
-            boolean first = (loc.equals(oldest) && loc.id().equals(exchId.nodeId()) && exchId.isJoined()) || added;
+            boolean first = added || (loc.equals(oldest) && loc.id().equals(exchId.nodeId()) && exchId.isJoined());
 
             if (first) {
                 assert exchId.isJoined() || added;


[4/7] ignite git commit: ignite-5075

Posted by sb...@apache.org.
ignite-5075


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d24b08b6
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d24b08b6
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d24b08b6

Branch: refs/heads/ignite-5075
Commit: d24b08b65aeee089894b62fa8619d0c24cbae23b
Parents: 861b34b
Author: sboikov <sb...@gridgain.com>
Authored: Thu May 11 16:18:32 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu May 11 16:18:32 2017 +0300

----------------------------------------------------------------------
 .../cache/CacheAffinitySharedManager.java       | 33 +++++++------
 .../internal/processors/cache/CacheData.java    | 14 ------
 .../processors/cache/ClusterCachesInfo.java     | 51 ++++++++------------
 .../cache/DynamicCacheDescriptor.java           | 31 +++---------
 .../processors/cache/GridCacheContext.java      | 15 ------
 .../processors/cache/GridCacheIoManager.java    |  6 ++-
 .../GridCachePartitionExchangeManager.java      |  4 +-
 .../processors/cache/GridCacheProcessor.java    |  9 ----
 .../dht/GridDhtAffinityAssignmentRequest.java   | 32 ++++++------
 .../dht/GridDhtAffinityAssignmentResponse.java  | 36 ++++++++++++--
 .../dht/GridDhtAssignmentFetchFuture.java       | 50 ++++++++++---------
 .../dht/GridDhtPartitionTopologyImpl.java       |  4 +-
 .../dht/preloader/GridDhtPreloader.java         |  4 +-
 .../continuous/CacheContinuousQueryManager.java |  1 -
 .../loadtests/hashmap/GridCacheTestContext.java |  1 -
 15 files changed, 131 insertions(+), 160 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/d24b08b6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index 8c275e0..bd41ccc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -91,8 +91,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
     private final Object mux = new Object();
 
     /** Pending affinity assignment futures. */
-    private final ConcurrentMap<T2<Integer, AffinityTopologyVersion>, GridDhtAssignmentFetchFuture>
-        pendingAssignmentFetchFuts = new ConcurrentHashMap8<>();
+    private final ConcurrentMap<Long, GridDhtAssignmentFetchFuture> pendingAssignmentFetchFuts =
+        new ConcurrentHashMap8<>();
 
     /** Discovery listener. */
     private final GridLocalEventListener discoLsnr = new GridLocalEventListener() {
@@ -118,6 +118,16 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
     }
 
     /**
+     * @param cacheId Cache ID.
+     * @return Cache start topology version.
+     */
+    public AffinityTopologyVersion localStartVersion(int cacheId) {
+        DynamicCacheDescriptor desc = registeredCaches.get(cacheId);
+
+        return desc != null ? desc.localStartVersion() : null;
+    }
+
+    /**
      * Callback invoked from discovery thread when discovery message is received.
      *
      * @param type Event type.
@@ -414,8 +424,6 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                     if (clientCacheStarted)
                         initAffinity(cacheDesc, cacheCtx.affinity().affinityCache(), fut, lateAffAssign);
                     else if (!req.clientStartOnly()) {
-                        assert fut.topologyVersion().equals(cacheCtx.cacheStartTopologyVersion());
-
                         GridAffinityAssignmentCache aff = cacheCtx.affinity().affinityCache();
 
                         assert aff.lastVersion().equals(AffinityTopologyVersion.NONE) : aff.lastVersion();
@@ -696,7 +704,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
      * @param fut Future to add.
      */
     public void addDhtAssignmentFetchFuture(GridDhtAssignmentFetchFuture fut) {
-        GridDhtAssignmentFetchFuture old = pendingAssignmentFetchFuts.putIfAbsent(fut.key(), fut);
+        GridDhtAssignmentFetchFuture old = pendingAssignmentFetchFuts.putIfAbsent(fut.id(), fut);
 
         assert old == null : "More than one thread is trying to fetch partition assignments [fut=" + fut +
             ", allFuts=" + pendingAssignmentFetchFuts + ']';
@@ -706,9 +714,9 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
      * @param fut Future to remove.
      */
     public void removeDhtAssignmentFetchFuture(GridDhtAssignmentFetchFuture fut) {
-        boolean rmv = pendingAssignmentFetchFuts.remove(fut.key(), fut);
+        boolean rmv = pendingAssignmentFetchFuts.remove(fut.id(), fut);
 
-        assert rmv : "Failed to remove assignment fetch future: " + fut.key();
+        assert rmv : "Failed to remove assignment fetch future: " + fut.id();
     }
 
     /**
@@ -720,13 +728,10 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
         if (log.isDebugEnabled())
             log.debug("Processing affinity assignment response [node=" + nodeId + ", res=" + res + ']');
 
-        for (GridDhtAssignmentFetchFuture fut : pendingAssignmentFetchFuts.values()) {
-            if (fut.key().get1().equals(cacheId)) {
-                fut.onResponse(nodeId, res);
+        GridDhtAssignmentFetchFuture fut = pendingAssignmentFetchFuts.get(res.futureId());
 
-                break;
-            }
-        }
+        if (fut != null)
+            fut.onResponse(nodeId, res);
     }
 
     /**
@@ -1001,7 +1006,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
         for (int i = 0; i < fetchFuts.size(); i++) {
             GridDhtAssignmentFetchFuture fetchFut = fetchFuts.get(i);
 
-            Integer cacheId = fetchFut.key().get1();
+            Integer cacheId = fetchFut.cacheId();
 
             fetchAffinity(fut, cctx.cacheContext(cacheId).affinity().affinityCache(), fetchFut);
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/d24b08b6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheData.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheData.java
index 82afdc7..0c97ab0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheData.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheData.java
@@ -42,9 +42,6 @@ public class CacheData implements Serializable {
     private final CacheType cacheType;
 
     /** */
-    private final AffinityTopologyVersion startTopVer;
-
-    /** */
     private final IgniteUuid deploymentId;
 
     /** */
@@ -66,7 +63,6 @@ public class CacheData implements Serializable {
      * @param cacheCfg Cache configuration.
      * @param cacheId Cache ID.
      * @param cacheType Cache ID.
-     * @param startTopVer Topology version when cache was started.
      * @param deploymentId Cache deployment ID.
      * @param schema Query schema.
      * @param rcvdFrom Node ID cache was started from.
@@ -77,7 +73,6 @@ public class CacheData implements Serializable {
     CacheData(CacheConfiguration cacheCfg,
         int cacheId,
         CacheType cacheType,
-        AffinityTopologyVersion startTopVer,
         IgniteUuid deploymentId,
         QuerySchema schema,
         UUID rcvdFrom,
@@ -86,14 +81,12 @@ public class CacheData implements Serializable {
         byte flags) {
         assert cacheCfg != null;
         assert rcvdFrom != null : cacheCfg.getName();
-        assert startTopVer != null : cacheCfg.getName();
         assert deploymentId != null : cacheCfg.getName();
         assert template || cacheId != 0 : cacheCfg.getName();
 
         this.cacheCfg = cacheCfg;
         this.cacheId = cacheId;
         this.cacheType = cacheType;
-        this.startTopVer = startTopVer;
         this.deploymentId = deploymentId;
         this.schema = schema;
         this.rcvdFrom = rcvdFrom;
@@ -110,13 +103,6 @@ public class CacheData implements Serializable {
     }
 
     /**
-     * @return Start topology version.
-     */
-    public AffinityTopologyVersion startTopologyVersion() {
-        return startTopVer;
-    }
-
-    /**
      * @return {@code True} if this is template configuration.
      */
     public boolean template() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/d24b08b6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
index 44d41f8..efcf6a8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
@@ -82,9 +82,6 @@ class ClusterCachesInfo {
     private List<T2<DynamicCacheDescriptor, NearCacheConfiguration>> locJoinStartCaches;
 
     /** */
-    private Map<UUID, CacheJoinNodeDiscoveryData> joiningNodesDiscoData = new HashMap<>();
-
-    /** */
     private Map<UUID, CacheClientReconnectDiscoveryData> clientReconnectReqs;
 
     /**
@@ -340,19 +337,15 @@ class ClusterCachesInfo {
                     if (needExchange) {
                         req.clientStartOnly(true);
 
-                        desc.clientCacheStartVersion(topVer.nextMinorVersion());
+                        desc.localStartVersion(topVer.nextMinorVersion());
 
                         exchangeActions.addClientCacheToStart(req, desc);
                     }
                 }
 
                 if (!needExchange) {
-                    if (desc != null) {
-                        if (desc.clientCacheStartVersion() != null)
-                            waitTopVer = desc.clientCacheStartVersion();
-                        else
-                            waitTopVer = desc.startTopologyVersion();
-                    }
+                    if (desc != null)
+                        waitTopVer = desc.localStartVersion();
                 }
             }
             else if (req.globalStateChange())
@@ -404,7 +397,7 @@ class ClusterCachesInfo {
             for (DynamicCacheDescriptor desc : addedDescs) {
                 assert desc.template() || incMinorTopVer;
 
-                desc.startTopologyVersion(startTopVer);
+                desc.localStartVersion(startTopVer);
             }
         }
 
@@ -545,9 +538,11 @@ class ClusterCachesInfo {
                 locJoinStartCaches = new ArrayList<>();
 
                 if (!disconnectedState() && joinDiscoData != null) {
-                    processJoiningNode(joinDiscoData, node.id(), topVer);
+                    processJoiningNode(joinDiscoData, node.id());
 
                     for (DynamicCacheDescriptor desc : registeredCaches.values()) {
+                        desc.localStartVersion(topVer);
+
                         CacheConfiguration cfg = desc.cacheConfiguration();
 
                         CacheJoinNodeDiscoveryData.CacheInfo locCfg = joinDiscoData.caches().get(cfg.getName());
@@ -564,8 +559,7 @@ class ClusterCachesInfo {
                                 desc.deploymentId(),
                                 desc.schema());
 
-                            desc0.startTopologyVersion(desc.startTopologyVersion());
-                            desc0.clientCacheStartVersion(desc.clientCacheStartVersion());
+                            desc0.localStartVersion(desc.localStartVersion());
                             desc0.receivedFrom(desc.receivedFrom());
                             desc0.staticallyConfigured(desc.staticallyConfigured());
 
@@ -577,11 +571,15 @@ class ClusterCachesInfo {
                     }
                 }
             }
-            else {
-                CacheJoinNodeDiscoveryData discoData = joiningNodesDiscoData.remove(node.id());
 
-                if (discoData != null)
-                    processJoiningNode(discoData, node.id(), topVer);
+            for (DynamicCacheDescriptor desc : registeredCaches.values()) {
+                if (node.id().equals(desc.receivedFrom()))
+                    desc.localStartVersion(topVer);
+            }
+
+            for (DynamicCacheDescriptor desc : registeredTemplates.values()) {
+                if (node.id().equals(desc.receivedFrom()))
+                    desc.localStartVersion(topVer);
             }
         }
     }
@@ -607,7 +605,6 @@ class ClusterCachesInfo {
             CacheData cacheData = new CacheData(desc.cacheConfiguration(),
                 desc.cacheId(),
                 desc.cacheType(),
-                desc.startTopologyVersion(),
                 desc.deploymentId(),
                 desc.schema(),
                 desc.receivedFrom(),
@@ -624,7 +621,6 @@ class ClusterCachesInfo {
             CacheData cacheData = new CacheData(desc.cacheConfiguration(),
                 0,
                 desc.cacheType(),
-                desc.startTopologyVersion(),
                 desc.deploymentId(),
                 desc.schema(),
                 desc.receivedFrom(),
@@ -659,7 +655,6 @@ class ClusterCachesInfo {
                 cacheData.deploymentId(),
                 cacheData.schema());
 
-            desc.startTopologyVersion(cacheData.startTopologyVersion());
             desc.receivedFrom(cacheData.receivedFrom());
             desc.staticallyConfigured(cacheData.staticallyConfigured());
 
@@ -679,7 +674,6 @@ class ClusterCachesInfo {
                 cacheData.deploymentId(),
                 cacheData.schema());
 
-            desc.startTopologyVersion(cacheData.startTopologyVersion());
             desc.receivedFrom(cacheData.receivedFrom());
             desc.staticallyConfigured(cacheData.staticallyConfigured());
 
@@ -723,12 +717,8 @@ class ClusterCachesInfo {
                 else
                     processClientReconnectData((CacheClientReconnectDiscoveryData) joiningNodeData, data.joiningNodeId());
             }
-            else if (joiningNodeData instanceof CacheJoinNodeDiscoveryData) {
-                CacheJoinNodeDiscoveryData old =
-                    joiningNodesDiscoData.put(data.joiningNodeId(), (CacheJoinNodeDiscoveryData)joiningNodeData);
-
-                assert old == null : old;
-            }
+            else if (joiningNodeData instanceof CacheJoinNodeDiscoveryData)
+                processJoiningNode((CacheJoinNodeDiscoveryData)joiningNodeData, data.joiningNodeId());
         }
     }
 
@@ -754,9 +744,8 @@ class ClusterCachesInfo {
     /**
      * @param joinData Joined node discovery data.
      * @param nodeId Joined node ID.
-     * @param topVer Topology version.
      */
-    private void processJoiningNode(CacheJoinNodeDiscoveryData joinData, UUID nodeId, AffinityTopologyVersion topVer) {
+    private void processJoiningNode(CacheJoinNodeDiscoveryData joinData, UUID nodeId) {
         for (CacheJoinNodeDiscoveryData.CacheInfo cacheInfo : joinData.templates().values()) {
             CacheConfiguration cfg = cacheInfo.config();
 
@@ -770,7 +759,6 @@ class ClusterCachesInfo {
 
                 desc.staticallyConfigured(true);
                 desc.receivedFrom(nodeId);
-                desc.startTopologyVersion(topVer);
 
                 DynamicCacheDescriptor old = registeredTemplates.put(cfg.getName(), desc);
 
@@ -791,7 +779,6 @@ class ClusterCachesInfo {
 
                 desc.staticallyConfigured(true);
                 desc.receivedFrom(nodeId);
-                desc.startTopologyVersion(topVer);
 
                 DynamicCacheDescriptor old = registeredCaches.put(cfg.getName(), desc);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/d24b08b6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
index fe859f8..130ebde 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
@@ -63,9 +63,6 @@ public class DynamicCacheDescriptor {
     private boolean updatesAllowed = true;
 
     /** */
-    private AffinityTopologyVersion startTopVer;
-
-    /** */
     private Integer cacheId;
 
     /** */
@@ -78,7 +75,7 @@ public class DynamicCacheDescriptor {
     private volatile CacheObjectContext objCtx;
 
     /** */
-    private transient AffinityTopologyVersion clientCacheStartVer;
+    private volatile transient AffinityTopologyVersion locStartVer;
 
     /** Mutex to control schema. */
     private final Object schemaMux = new Object();
@@ -131,22 +128,6 @@ public class DynamicCacheDescriptor {
     }
 
     /**
-     * @return Start topology version.
-     */
-    @Nullable public AffinityTopologyVersion startTopologyVersion() {
-        return startTopVer;
-    }
-
-    /**
-     * @param startTopVer Start topology version.
-     */
-    public void startTopologyVersion(AffinityTopologyVersion startTopVer) {
-        assert startTopVer != null;
-
-        this.startTopVer = startTopVer;
-    }
-
-    /**
      * @return {@code True} if this is template configuration.
      */
     public boolean template() {
@@ -253,15 +234,15 @@ public class DynamicCacheDescriptor {
     /**
      * @return Version when client cache on local node was started.
      */
-    @Nullable AffinityTopologyVersion clientCacheStartVersion() {
-        return clientCacheStartVer;
+    @Nullable AffinityTopologyVersion localStartVersion() {
+        return locStartVer;
     }
 
     /**
-     * @param clientCacheStartVer Version when client cache on local node was started.
+     * @param locStartVer Version when cache on local node was started.
      */
-    public void clientCacheStartVersion(AffinityTopologyVersion clientCacheStartVer) {
-        this.clientCacheStartVer = clientCacheStartVer;
+    public void localStartVersion(AffinityTopologyVersion locStartVer) {
+        this.locStartVer = locStartVer;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/d24b08b6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index 8f0d842..2466a59 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -237,9 +237,6 @@ public class GridCacheContext<K, V> implements Externalizable {
     /** Topology version when cache was started on local node. */
     private AffinityTopologyVersion locStartTopVer;
 
-    /** Global cache start topology version. */
-    private AffinityTopologyVersion cacheStartTopVer;
-
     /** Dynamic cache deployment ID. */
     private IgniteUuid dynamicDeploymentId;
 
@@ -292,7 +289,6 @@ public class GridCacheContext<K, V> implements Externalizable {
         GridCacheSharedContext sharedCtx,
         CacheConfiguration cacheCfg,
         CacheType cacheType,
-        AffinityTopologyVersion cacheStartTopVer,
         AffinityTopologyVersion locStartTopVer,
         boolean affNode,
         boolean updatesAllowed,
@@ -321,7 +317,6 @@ public class GridCacheContext<K, V> implements Externalizable {
         assert ctx != null;
         assert sharedCtx != null;
         assert cacheCfg != null;
-        assert cacheStartTopVer != null : cacheCfg.getName();
         assert locStartTopVer != null : cacheCfg.getName();
 
         assert evtMgr != null;
@@ -341,7 +336,6 @@ public class GridCacheContext<K, V> implements Externalizable {
         this.cacheCfg = cacheCfg;
         this.cacheType = cacheType;
         this.locStartTopVer = locStartTopVer;
-        this.cacheStartTopVer = cacheStartTopVer;
         this.affNode = affNode;
         this.updatesAllowed = updatesAllowed;
         this.depEnabled = ctx.deploy().enabled() && !cacheObjects().isBinaryEnabled(cacheCfg);
@@ -470,15 +464,6 @@ public class GridCacheContext<K, V> implements Externalizable {
     }
 
     /**
-     * @return Cache start topology version.
-     */
-    public AffinityTopologyVersion cacheStartTopologyVersion() {
-        assert cacheStartTopVer != null : name();
-
-        return cacheStartTopVer;
-    }
-
-    /**
      * @return Cache default {@link ExpiryPolicy}.
      */
     @Nullable public ExpiryPolicy expiry() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/d24b08b6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 5e7e401..348d9d8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -146,8 +146,10 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
                 if (cacheMsg instanceof GridDhtAffinityAssignmentRequest) {
                     assert cacheMsg.topologyVersion() != null : cacheMsg;
 
-                    AffinityTopologyVersion waitVer =
-                        ((GridDhtAffinityAssignmentRequest)cacheMsg).waitTopologyVersion();
+                    AffinityTopologyVersion waitVer = cctx.affinity().localStartVersion(cacheMsg.cacheId());
+
+                    if (waitVer == null)
+                        waitVer = new AffinityTopologyVersion(cctx.localNode().order());
 
                     // Need to wait for exchange to avoid race between cache start and affinity request.
                     fut = cctx.exchange().affinityReadyFuture(waitVer);

http://git-wip-us.apache.org/repos/asf/ignite/blob/d24b08b6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 8f52ae6..04c647f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -895,7 +895,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                     boolean ready;
 
                     if (exchId != null) {
-                        AffinityTopologyVersion startTopVer = cacheCtx.cacheStartTopologyVersion();
+                        AffinityTopologyVersion startTopVer = cacheCtx.startTopologyVersion();
 
                         ready = startTopVer.compareTo(exchId.topologyVersion()) <= 0;
                     }
@@ -1301,7 +1301,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                     GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId);
 
                     if (cacheCtx != null &&
-                        cacheCtx.cacheStartTopologyVersion().compareTo(entry.getValue().topologyVersion()) > 0)
+                        cacheCtx.startTopologyVersion().compareTo(entry.getValue().topologyVersion()) > 0)
                         continue;
 
                     GridDhtPartitionTopology top = null;

http://git-wip-us.apache.org/repos/asf/ignite/blob/d24b08b6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 82db451..f9b015d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -1345,7 +1345,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     private GridCacheContext createCache(CacheConfiguration<?, ?> cfg,
         @Nullable CachePluginManager pluginMgr,
         CacheType cacheType,
-        AffinityTopologyVersion cacheStartTopVer,
         AffinityTopologyVersion locStartTopVer,
         CacheObjectContext cacheObjCtx,
         boolean affNode,
@@ -1420,7 +1419,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
             sharedCtx,
             cfg,
             cacheType,
-            cacheStartTopVer,
             locStartTopVer,
             affNode,
             updatesAllowed,
@@ -1553,7 +1551,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                 sharedCtx,
                 cfg,
                 cacheType,
-                cacheStartTopVer,
                 locStartTopVer,
                 affNode,
                 true,
@@ -1733,7 +1730,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
             nearCfg,
             cacheDesc.cacheType(),
             cacheDesc.deploymentId(),
-            cacheDesc.startTopologyVersion(),
             exchTopVer,
             cacheDesc.schema()
         );
@@ -1755,7 +1751,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                     t.get2(),
                     desc.cacheType(),
                     desc.deploymentId(),
-                    desc.startTopologyVersion(),
                     exchTopVer,
                     desc.schema()
                 );
@@ -1785,7 +1780,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                         null,
                         desc.cacheType(),
                         desc.deploymentId(),
-                        desc.startTopologyVersion(),
                         exchTopVer,
                         desc.schema()
                     );
@@ -1801,7 +1795,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
      * @param reqNearCfg Near configuration if specified for client cache start request.
      * @param cacheType Cache type.
      * @param deploymentId Deployment ID.
-     * @param cacheStartTopVer Cache start topology version.
      * @param exchTopVer Current exchange version.
      * @param schema Query schema.
      * @throws IgniteCheckedException If failed.
@@ -1811,7 +1804,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         @Nullable NearCacheConfiguration reqNearCfg,
         CacheType cacheType,
         IgniteUuid deploymentId,
-        AffinityTopologyVersion cacheStartTopVer,
         AffinityTopologyVersion exchTopVer,
         @Nullable QuerySchema schema
     ) throws IgniteCheckedException {
@@ -1839,7 +1831,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         GridCacheContext cacheCtx = createCache(ccfg,
             null,
             cacheType,
-            cacheStartTopVer,
             exchTopVer,
             cacheObjCtx,
             affNode,

http://git-wip-us.apache.org/repos/asf/ignite/blob/d24b08b6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java
index 0b3080e..f80adc5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java
@@ -31,12 +31,12 @@ public class GridDhtAffinityAssignmentRequest extends GridCacheMessage {
     /** */
     private static final long serialVersionUID = 0L;
 
+    /** */
+    private long futId;
+
     /** Topology version being queried. */
     private AffinityTopologyVersion topVer;
 
-    /** */
-    private AffinityTopologyVersion waitTopVer;
-
     /**
      * Empty constructor.
      */
@@ -45,26 +45,26 @@ public class GridDhtAffinityAssignmentRequest extends GridCacheMessage {
     }
 
     /**
+     * @param futId Future ID.
      * @param cacheId Cache ID.
      * @param topVer Topology version.
-     * @param waitTopVer Topology version to wait for before message processing.
      */
-    public GridDhtAffinityAssignmentRequest(int cacheId,
-        AffinityTopologyVersion topVer,
-        AffinityTopologyVersion waitTopVer) {
+    public GridDhtAffinityAssignmentRequest(
+        long futId,
+        int cacheId,
+        AffinityTopologyVersion topVer) {
         assert topVer != null;
-        assert waitTopVer != null;
 
+        this.futId = futId;
         this.cacheId = cacheId;
         this.topVer = topVer;
-        this.waitTopVer = waitTopVer;
     }
 
     /**
-     * @return Topology version to wait for before message processing.
+     * @return Future ID.
      */
-    public AffinityTopologyVersion waitTopologyVersion() {
-        return waitTopVer;
+    public long futureId() {
+        return futId;
     }
 
     /** {@inheritDoc} */
@@ -110,13 +110,13 @@ public class GridDhtAffinityAssignmentRequest extends GridCacheMessage {
 
         switch (writer.state()) {
             case 3:
-                if (!writer.writeMessage("topVer", topVer))
+                if (!writer.writeLong("futId", futId))
                     return false;
 
                 writer.incrementState();
 
             case 4:
-                if (!writer.writeMessage("waitTopVer", waitTopVer))
+                if (!writer.writeMessage("topVer", topVer))
                     return false;
 
                 writer.incrementState();
@@ -138,7 +138,7 @@ public class GridDhtAffinityAssignmentRequest extends GridCacheMessage {
 
         switch (reader.state()) {
             case 3:
-                topVer = reader.readMessage("topVer");
+                futId = reader.readLong("futId");
 
                 if (!reader.isLastRead())
                     return false;
@@ -146,7 +146,7 @@ public class GridDhtAffinityAssignmentRequest extends GridCacheMessage {
                 reader.incrementState();
 
             case 4:
-                waitTopVer = reader.readMessage("waitTopVer");
+                topVer = reader.readMessage("topVer");
 
                 if (!reader.isLastRead())
                     return false;

http://git-wip-us.apache.org/repos/asf/ignite/blob/d24b08b6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
index e8094e1..5d82171 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
@@ -44,6 +44,9 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheMessage {
     /** */
     private static final long serialVersionUID = 0L;
 
+    /** */
+    private long futId;
+
     /** Topology version. */
     private AffinityTopologyVersion topVer;
 
@@ -69,19 +72,30 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheMessage {
     }
 
     /**
+     * @param futId Future ID.
      * @param cacheId Cache ID.
      * @param topVer Topology version.
      * @param affAssignment Affinity assignment.
      */
-    public GridDhtAffinityAssignmentResponse(int cacheId,
+    public GridDhtAffinityAssignmentResponse(
+        long futId,
+        int cacheId,
         @NotNull AffinityTopologyVersion topVer,
         List<List<ClusterNode>> affAssignment) {
+        this.futId = futId;
         this.cacheId = cacheId;
         this.topVer = topVer;
 
         affAssignmentIds = ids(affAssignment);
     }
 
+    /**
+     * @return Future ID.
+     */
+    public long futureId() {
+        return futId;
+    }
+
     /** {@inheritDoc} */
     @Override public boolean partitionExchangeMessage() {
         return true;
@@ -181,7 +195,7 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheMessage {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 6;
+        return 7;
     }
 
     /**
@@ -239,12 +253,18 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheMessage {
                 writer.incrementState();
 
             case 4:
-                if (!writer.writeByteArray("idealAffAssignmentBytes", idealAffAssignmentBytes))
+                if (!writer.writeLong("futId", futId))
                     return false;
 
                 writer.incrementState();
 
             case 5:
+                if (!writer.writeByteArray("idealAffAssignmentBytes", idealAffAssignmentBytes))
+                    return false;
+
+                writer.incrementState();
+
+            case 6:
                 if (!writer.writeMessage("topVer", topVer))
                     return false;
 
@@ -275,7 +295,7 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheMessage {
                 reader.incrementState();
 
             case 4:
-                idealAffAssignmentBytes = reader.readByteArray("idealAffAssignmentBytes");
+                futId = reader.readLong("futId");
 
                 if (!reader.isLastRead())
                     return false;
@@ -283,6 +303,14 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheMessage {
                 reader.incrementState();
 
             case 5:
+                idealAffAssignmentBytes = reader.readByteArray("idealAffAssignmentBytes");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 6:
                 topVer = reader.readMessage("topVer");
 
                 if (!reader.isLastRead())

http://git-wip-us.apache.org/repos/asf/ignite/blob/d24b08b6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
index 1d6563e..741ca5e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
@@ -23,6 +23,7 @@ import java.util.Collections;
 import java.util.LinkedList;
 import java.util.Queue;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
@@ -36,7 +37,6 @@ import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -55,6 +55,9 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<GridDhtAffin
     private static IgniteLogger log;
 
     /** */
+    private static final AtomicLong idGen = new AtomicLong();
+
+    /** */
     private final GridCacheSharedContext ctx;
 
     /** List of available nodes this future can fetch data from. */
@@ -65,11 +68,13 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<GridDhtAffin
     private ClusterNode pendingNode;
 
     /** */
-    @GridToStringInclude
-    private final T2<Integer, AffinityTopologyVersion> key;
+    private final long id;
 
     /** */
-    private final DynamicCacheDescriptor cacheDesc;
+    private final AffinityTopologyVersion topVer;
+
+    /** */
+    private final int cacheId;
 
     /**
      * @param ctx Context.
@@ -83,9 +88,11 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<GridDhtAffin
         AffinityTopologyVersion topVer,
         DiscoCache discoCache
     ) {
+        this.topVer = topVer;
+        this.cacheId = cacheDesc.cacheId();
         this.ctx = ctx;
-        this.cacheDesc = cacheDesc;
-        this.key = new T2<>(cacheDesc.cacheId(), topVer);
+
+        id = idGen.getAndIncrement();
 
         Collection<ClusterNode> availableNodes = discoCache.cacheAffinityNodes(cacheDesc.cacheId());
 
@@ -105,19 +112,26 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<GridDhtAffin
     }
 
     /**
-     * Initializes fetch future.
+     * @return Cache ID.
      */
-    public void init() {
-        ctx.affinity().addDhtAssignmentFetchFuture(this);
+    public int cacheId() {
+        return cacheId;
+    }
 
-        requestFromNextNode();
+    /**
+     * @return Future ID.
+     */
+    public long id() {
+        return id;
     }
 
     /**
-     * @return Future key.
+     * Initializes fetch future.
      */
-    public T2<Integer, AffinityTopologyVersion> key() {
-        return key;
+    public void init() {
+        ctx.affinity().addDhtAssignmentFetchFuture(this);
+
+        requestFromNextNode();
     }
 
     /**
@@ -125,14 +139,6 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<GridDhtAffin
      * @param res Response.
      */
     public void onResponse(UUID nodeId, GridDhtAffinityAssignmentResponse res) {
-        if (!res.topologyVersion().equals(key.get2())) {
-            if (log.isDebugEnabled())
-                log.debug("Received affinity assignment for wrong topology version (will ignore) " +
-                    "[node=" + nodeId + ", res=" + res + ", topVer=" + key.get2() + ']');
-
-            return;
-        }
-
         GridDhtAffinityAssignmentResponse res0 = null;
 
         synchronized (this) {
@@ -189,7 +195,7 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<GridDhtAffin
                             ", node=" + node + ']');
 
                     ctx.io().send(node,
-                        new GridDhtAffinityAssignmentRequest(key.get1(), key.get2(), cacheDesc.startTopologyVersion()),
+                        new GridDhtAffinityAssignmentRequest(id, cacheId, topVer),
                         AFFINITY_POOL);
 
                     // Close window for listener notification.

http://git-wip-us.apache.org/repos/asf/ignite/blob/d24b08b6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 5b3dfc6..58ad600 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -393,7 +393,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
         int num = cctx.affinity().partitions();
 
         if (cctx.rebalanceEnabled()) {
-            boolean added = exchId.topologyVersion().equals(cctx.cacheStartTopologyVersion());
+            boolean added = exchId.topologyVersion().equals(cctx.startTopologyVersion());
 
             boolean first = (loc.equals(oldest) && loc.id().equals(exchId.nodeId()) && exchId.isJoined()) || added;
 
@@ -1157,7 +1157,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                     // then we keep the newer value.
                     if (newPart != null &&
                         (newPart.updateSequence() < part.updateSequence() ||
-                        (cctx.cacheStartTopologyVersion().compareTo(newPart.topologyVersion()) > 0))
+                        (cctx.startTopologyVersion().compareTo(newPart.topologyVersion()) > 0))
                         ) {
                         if (log.isDebugEnabled())
                             log.debug("Overriding partition map in full update map [exchId=" + exchId + ", curPart=" +

http://git-wip-us.apache.org/repos/asf/ignite/blob/d24b08b6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index 9f1b96e..57616ed 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -608,7 +608,9 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
 
                 AffinityAssignment assignment = cctx.affinity().assignment(topVer);
 
-                GridDhtAffinityAssignmentResponse res = new GridDhtAffinityAssignmentResponse(cctx.cacheId(),
+                GridDhtAffinityAssignmentResponse res = new GridDhtAffinityAssignmentResponse(
+                    req.futureId(),
+                    cctx.cacheId(),
                     topVer,
                     assignment.assignment());
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/d24b08b6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index 9fe29ef..acf351f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -345,7 +345,6 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
 
             assert jcache != null : "Failed to get cache proxy [name=" + cctx.name() +
                 ", locStart=" + cctx.startTopologyVersion() +
-                ", cacheStart=" + cctx.cacheStartTopologyVersion() +
                 ", locNode=" + cctx.localNode() +
                 ", stopping=" + cctx.kernalContext().isStopping();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/d24b08b6/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
index 6149586..4f0d9a1 100644
--- a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
+++ b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
@@ -80,7 +80,6 @@ public class GridCacheTestContext<K, V> extends GridCacheContext<K, V> {
             defaultCacheConfiguration(),
             CacheType.USER,
             AffinityTopologyVersion.ZERO,
-            AffinityTopologyVersion.ZERO,
             true,
             true,
             null,