You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/05/11 14:42:30 UTC
[1/7] ignite git commit: ignite-5075
Repository: ignite
Updated Branches:
refs/heads/ignite-5075 038c00f49 -> aee103166
ignite-5075
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b7424dc1
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b7424dc1
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b7424dc1
Branch: refs/heads/ignite-5075
Commit: b7424dc1f6d32e67478015cf83324c30bcc39781
Parents: 038c00f
Author: sboikov <sb...@gridgain.com>
Authored: Thu May 11 13:46:44 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu May 11 13:46:44 2017 +0300
----------------------------------------------------------------------
.../internal/processors/cache/IgniteCacheOffheapManagerImpl.java | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/b7424dc1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index 368b86c..cfbec12 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -84,10 +84,10 @@ import static org.apache.ignite.internal.pagemem.PageIdUtils.pageId;
@SuppressWarnings("PublicInnerClass")
public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager {
/** */
- private GridCacheSharedContext ctx;
+ protected GridCacheSharedContext ctx;
/** */
- private CacheGroupInfrastructure grp;
+ protected CacheGroupInfrastructure grp;
/** */
private IgniteLogger log;
[5/7] ignite git commit: Merge remote-tracking branch
'remotes/origin/ignite-5075-cacheStart' into ignite-5075
Posted by sb...@apache.org.
Merge remote-tracking branch 'remotes/origin/ignite-5075-cacheStart' into ignite-5075
# Conflicts:
# modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
# modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
# modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java
# modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
# modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
# modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
# modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b5c86708
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b5c86708
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b5c86708
Branch: refs/heads/ignite-5075
Commit: b5c86708c52d2efc85ee87ff67bde452a98ce2c6
Parents: b7424dc d24b08b
Author: sboikov <sb...@gridgain.com>
Authored: Thu May 11 17:02:41 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu May 11 17:02:41 2017 +0300
----------------------------------------------------------------------
.../cache/CacheAffinitySharedManager.java | 31 +++++++-----
.../internal/processors/cache/CacheData.java | 14 ------
.../processors/cache/ClusterCachesInfo.java | 47 +++++++-----------
.../cache/DynamicCacheDescriptor.java | 31 +++---------
.../processors/cache/GridCacheContext.java | 15 ------
.../processors/cache/GridCacheIoManager.java | 6 ++-
.../GridCachePartitionExchangeManager.java | 4 +-
.../processors/cache/GridCacheProcessor.java | 9 ----
.../dht/GridDhtAffinityAssignmentRequest.java | 32 ++++++-------
.../dht/GridDhtAffinityAssignmentResponse.java | 36 ++++++++++++--
.../dht/GridDhtAssignmentFetchFuture.java | 50 +++++++++++---------
.../dht/GridDhtPartitionTopologyImpl.java | 4 +-
.../continuous/CacheContinuousQueryManager.java | 1 -
.../loadtests/hashmap/GridCacheTestContext.java | 1 -
14 files changed, 127 insertions(+), 154 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/b5c86708/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index bd80bf0,bd41ccc..0443ba4
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@@ -991,9 -1006,9 +998,9 @@@ public class CacheAffinitySharedManager
for (int i = 0; i < fetchFuts.size(); i++) {
GridDhtAssignmentFetchFuture fetchFut = fetchFuts.get(i);
- Integer grpId = fetchFut.key().get1();
- Integer cacheId = fetchFut.cacheId();
++ Integer grpId = fetchFut.cacheId();
- fetchAffinity(fut, cctx.cacheContext(cacheId).affinity().affinityCache(), fetchFut);
+ fetchAffinity(fut, cctx.cache().cacheGroup(grpId).affinity(), fetchFut);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b5c86708/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheData.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheData.java
index fcad88a,0c97ab0..0927e25
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheData.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheData.java
@@@ -79,9 -72,7 +75,8 @@@ public class CacheData implements Seria
*/
CacheData(CacheConfiguration cacheCfg,
int cacheId,
+ int grpId,
CacheType cacheType,
- AffinityTopologyVersion startTopVer,
IgniteUuid deploymentId,
QuerySchema schema,
UUID rcvdFrom,
@@@ -90,16 -81,12 +85,14 @@@
byte flags) {
assert cacheCfg != null;
assert rcvdFrom != null : cacheCfg.getName();
- assert startTopVer != null : cacheCfg.getName();
assert deploymentId != null : cacheCfg.getName();
assert template || cacheId != 0 : cacheCfg.getName();
+ assert template || grpId != 0 : cacheCfg.getName();
this.cacheCfg = cacheCfg;
this.cacheId = cacheId;
+ this.grpId = grpId;
this.cacheType = cacheType;
- this.startTopVer = startTopVer;
this.deploymentId = deploymentId;
this.schema = schema;
this.rcvdFrom = rcvdFrom;
http://git-wip-us.apache.org/repos/asf/ignite/blob/b5c86708/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
index 9789fdf,efcf6a8..7800a45
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
@@@ -637,9 -604,7 +633,8 @@@ class ClusterCachesInfo
for (DynamicCacheDescriptor desc : registeredCaches.values()) {
CacheData cacheData = new CacheData(desc.cacheConfiguration(),
desc.cacheId(),
+ desc.groupDescriptor().groupId(),
desc.cacheType(),
- desc.startTopologyVersion(),
desc.deploymentId(),
desc.schema(),
desc.receivedFrom(),
@@@ -668,9 -620,7 +663,8 @@@
for (DynamicCacheDescriptor desc : registeredTemplates.values()) {
CacheData cacheData = new CacheData(desc.cacheConfiguration(),
0,
+ 0,
desc.cacheType(),
- desc.startTopologyVersion(),
desc.deploymentId(),
desc.schema(),
desc.receivedFrom(),
http://git-wip-us.apache.org/repos/asf/ignite/blob/b5c86708/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/b5c86708/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index 94f86aa,2466a59..d5128f5
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@@ -280,9 -288,7 +277,8 @@@ public class GridCacheContext<K, V> imp
GridKernalContext ctx,
GridCacheSharedContext sharedCtx,
CacheConfiguration cacheCfg,
+ CacheGroupInfrastructure grp,
CacheType cacheType,
- AffinityTopologyVersion cacheStartTopVer,
AffinityTopologyVersion locStartTopVer,
boolean affNode,
boolean updatesAllowed,
@@@ -308,10 -317,8 +304,9 @@@
assert ctx != null;
assert sharedCtx != null;
assert cacheCfg != null;
- assert cacheStartTopVer != null : cacheCfg.getName();
assert locStartTopVer != null : cacheCfg.getName();
+ assert grp != null;
assert evtMgr != null;
assert storeMgr != null;
assert evictMgr != null;
@@@ -327,10 -334,8 +322,9 @@@
this.ctx = ctx;
this.sharedCtx = sharedCtx;
this.cacheCfg = cacheCfg;
+ this.grp = grp;
this.cacheType = cacheType;
this.locStartTopVer = locStartTopVer;
- this.cacheStartTopVer = cacheStartTopVer;
this.affNode = affNode;
this.updatesAllowed = updatesAllowed;
this.depEnabled = ctx.deploy().enabled() && !cacheObjects().isBinaryEnabled(cacheCfg);
http://git-wip-us.apache.org/repos/asf/ignite/blob/b5c86708/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/b5c86708/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index e426426,04c647f..98ad758
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@@ -897,35 -889,37 +897,35 @@@ public class GridCachePartitionExchange
final Map<Object, T2<Integer, GridDhtPartitionFullMap>> dupData = new HashMap<>();
- cctx.forAllCaches(new IgniteInClosure<GridCacheContext>() {
- @Override public void apply(GridCacheContext cacheCtx) {
- if (!cacheCtx.isLocal()) {
- boolean ready;
+ for (CacheGroupInfrastructure grp : cctx.cache().cacheGroups()) {
+ if (!grp.isLocal()) {
+ boolean ready;
- if (exchId != null) {
- AffinityTopologyVersion startTopVer = cacheCtx.startTopologyVersion();
+ if (exchId != null) {
- AffinityTopologyVersion startTopVer = grp.groupStartVersion();
++ AffinityTopologyVersion startTopVer = grp.localStartVersion();
- ready = startTopVer.compareTo(exchId.topologyVersion()) <= 0;
- }
- else
- ready = cacheCtx.started();
+ ready = startTopVer.compareTo(exchId.topologyVersion()) <= 0;
+ }
+ else
+ ready = grp.started();
- if (ready) {
- GridAffinityAssignmentCache affCache = cacheCtx.affinity().affinityCache();
+ if (ready) {
+ GridAffinityAssignmentCache affCache = grp.affinity();
- GridDhtPartitionFullMap locMap = cacheCtx.topology().partitionMap(true);
+ GridDhtPartitionFullMap locMap = grp.topology().partitionMap(true);
- addFullPartitionsMap(m,
- dupData,
- compress,
- cacheCtx.cacheId(),
- locMap,
- affCache.similarAffinityKey());
+ addFullPartitionsMap(m,
+ dupData,
+ compress,
+ grp.groupId(),
+ locMap,
+ affCache.similarAffinityKey());
- if (exchId != null)
- m.addPartitionUpdateCounters(cacheCtx.cacheId(), cacheCtx.topology().updateCounters(true));
- }
+ if (exchId != null)
+ m.addPartitionUpdateCounters(grp.groupId(), grp.topology().updateCounters(true));
}
}
- });
+ }
// It is important that client topologies be added after contexts.
for (GridClientPartitionTopology top : cctx.exchange().clientTopologies()) {
@@@ -1299,12 -1296,12 +1299,12 @@@
boolean updated = false;
for (Map.Entry<Integer, GridDhtPartitionMap> entry : msg.partitions().entrySet()) {
- Integer cacheId = entry.getKey();
+ Integer grpId = entry.getKey();
- GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId);
+ CacheGroupInfrastructure grp = cctx.cache().cacheGroup(grpId);
- if (cacheCtx != null &&
- cacheCtx.startTopologyVersion().compareTo(entry.getValue().topologyVersion()) > 0)
+ if (grp != null &&
- grp.groupStartVersion().compareTo(entry.getValue().topologyVersion()) > 0)
++ grp.localStartVersion().compareTo(entry.getValue().topologyVersion()) > 0)
continue;
GridDhtPartitionTopology top = null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/b5c86708/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 86ead93,f9b015d..a72f21c
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@@ -1363,10 -1343,8 +1363,9 @@@ public class GridCacheProcessor extend
* @throws IgniteCheckedException If failed to create cache.
*/
private GridCacheContext createCache(CacheConfiguration<?, ?> cfg,
+ CacheGroupInfrastructure grp,
@Nullable CachePluginManager pluginMgr,
CacheType cacheType,
- AffinityTopologyVersion cacheStartTopVer,
AffinityTopologyVersion locStartTopVer,
CacheObjectContext cacheObjCtx,
boolean affNode,
@@@ -1435,9 -1418,7 +1434,8 @@@
ctx,
sharedCtx,
cfg,
+ grp,
cacheType,
- cacheStartTopVer,
locStartTopVer,
affNode,
updatesAllowed,
@@@ -1566,9 -1550,7 +1564,8 @@@
ctx,
sharedCtx,
cfg,
+ grp,
cacheType,
- cacheStartTopVer,
locStartTopVer,
affNode,
true,
@@@ -1852,40 -1828,9 +1844,39 @@@
ccfg.setNearConfiguration(reqNearCfg);
}
+ String grpName = startCfg.getGroupName();
+
+ CacheGroupInfrastructure grp = null;
+
+ if (grpName != null) {
+ for (CacheGroupInfrastructure grp0 : cacheGrps.values()) {
+ if (grp0.sharedGroup() && grpName.equals(grp0.name())) {
+ grp = grp0;
+
+ break;
+ }
+ }
+
+ if (grp == null) {
+ grp = startCacheGroup(grpDesc,
+ cacheType,
+ affNode,
+ cacheObjCtx,
+ exchTopVer);
+ }
+ }
+ else {
+ grp = startCacheGroup(grpDesc,
+ cacheType,
+ affNode,
+ cacheObjCtx,
+ exchTopVer);
+ }
+
GridCacheContext cacheCtx = createCache(ccfg,
+ grp,
null,
cacheType,
- cacheStartTopVer,
exchTopVer,
cacheObjCtx,
affNode,
http://git-wip-us.apache.org/repos/asf/ignite/blob/b5c86708/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java
index 14eb92f,f80adc5..d9d642a
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java
@@@ -45,19 -45,19 +45,19 @@@ public class GridDhtAffinityAssignmentR
}
/**
+ * @param futId Future ID.
- * @param cacheId Cache ID.
+ * @param grpId Cache group ID.
* @param topVer Topology version.
- * @param waitTopVer Topology version to wait for before message processing.
*/
- public GridDhtAffinityAssignmentRequest(int grpId,
- AffinityTopologyVersion topVer,
- AffinityTopologyVersion waitTopVer) {
+ public GridDhtAffinityAssignmentRequest(
+ long futId,
- int cacheId,
++ int grpId,
+ AffinityTopologyVersion topVer) {
assert topVer != null;
- assert waitTopVer != null;
+ this.futId = futId;
- this.cacheId = cacheId;
+ this.grpId = grpId;
this.topVer = topVer;
- this.waitTopVer = waitTopVer;
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/b5c86708/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
index 6c01c8d,5d82171..4df3fc1
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
@@@ -67,14 -72,18 +70,18 @@@ public class GridDhtAffinityAssignmentR
}
/**
+ * @param futId Future ID.
- * @param cacheId Cache ID.
+ * @param grpId Cache group ID.
* @param topVer Topology version.
* @param affAssignment Affinity assignment.
*/
- public GridDhtAffinityAssignmentResponse(int grpId,
+ public GridDhtAffinityAssignmentResponse(
+ long futId,
- int cacheId,
++ int grpId,
@NotNull AffinityTopologyVersion topVer,
List<List<ClusterNode>> affAssignment) {
+ this.futId = futId;
- this.cacheId = cacheId;
+ this.grpId = grpId;
this.topVer = topVer;
affAssignmentIds = ids(affAssignment);
http://git-wip-us.apache.org/repos/asf/ignite/blob/b5c86708/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
index c8966ed,741ca5e..c008ef3
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
@@@ -66,11 -68,13 +69,13 @@@ public class GridDhtAssignmentFetchFutu
private ClusterNode pendingNode;
/** */
- @GridToStringInclude
- private final T2<Integer, AffinityTopologyVersion> key;
+ private final long id;
/** */
- private final CacheGroupDescriptor grpDesc;
+ private final AffinityTopologyVersion topVer;
+
+ /** */
- private final int cacheId;
++ private final int grpId;
/**
* @param ctx Context.
@@@ -84,11 -88,13 +89,13 @@@
AffinityTopologyVersion topVer,
DiscoCache discoCache
) {
+ this.topVer = topVer;
- this.cacheId = cacheDesc.cacheId();
++ this.grpId = grpDesc.groupId();
this.ctx = ctx;
- this.grpDesc = grpDesc;
- this.key = new T2<>(grpDesc.groupId(), topVer);
+
+ id = idGen.getAndIncrement();
- Collection<ClusterNode> availableNodes = discoCache.cacheAffinityNodes(cacheDesc.cacheId());
+ Collection<ClusterNode> availableNodes = discoCache.cacheGroupAffinityNodes(grpDesc.groupId());
LinkedList<ClusterNode> tmp = new LinkedList<>();
@@@ -190,7 -195,7 +196,7 @@@
", node=" + node + ']');
ctx.io().send(node,
- new GridDhtAffinityAssignmentRequest(key.get1(), key.get2(), grpDesc.startTopologyVersion()),
- new GridDhtAffinityAssignmentRequest(id, cacheId, topVer),
++ new GridDhtAffinityAssignmentRequest(id, grpId, topVer),
AFFINITY_POOL);
// Close window for listener notification.
http://git-wip-us.apache.org/repos/asf/ignite/blob/b5c86708/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index b76f548,58ad600..8ea9e46
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@@ -396,12 -388,12 +396,12 @@@ public class GridDhtPartitionTopologyIm
", futVer=" + exchFut.topologyVersion() +
", fut=" + exchFut + ']';
- List<List<ClusterNode>> aff = cctx.affinity().assignments(exchFut.topologyVersion());
+ List<List<ClusterNode>> aff = grp.affinity().assignments(exchFut.topologyVersion());
- int num = cctx.affinity().partitions();
+ int num = grp.affinity().partitions();
- if (cctx.rebalanceEnabled()) {
- boolean added = exchId.topologyVersion().equals(cctx.startTopologyVersion());
+ if (grp.rebalanceEnabled()) {
- boolean added = exchId.topologyVersion().equals(grp.groupStartVersion());
++ boolean added = exchId.topologyVersion().equals(grp.localStartVersion());
boolean first = (loc.equals(oldest) && loc.id().equals(exchId.nodeId()) && exchId.isJoined()) || added;
@@@ -1173,7 -1157,7 +1173,7 @@@
// then we keep the newer value.
if (newPart != null &&
(newPart.updateSequence() < part.updateSequence() ||
- (grp.groupStartVersion().compareTo(newPart.topologyVersion()) > 0))
- (cctx.startTopologyVersion().compareTo(newPart.topologyVersion()) > 0))
++ (grp.localStartVersion().compareTo(newPart.topologyVersion()) > 0))
) {
if (log.isDebugEnabled())
log.debug("Overriding partition map in full update map [exchId=" + exchId + ", curPart=" +
http://git-wip-us.apache.org/repos/asf/ignite/blob/b5c86708/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/b5c86708/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
----------------------------------------------------------------------
diff --cc modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
index 5a7dbc1,4f0d9a1..9126fd2
--- a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
+++ b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
@@@ -78,12 -78,13 +78,11 @@@ public class GridCacheTestContext<K, V
null
),
defaultCacheConfiguration(),
+ null,
CacheType.USER,
AffinityTopologyVersion.ZERO,
- AffinityTopologyVersion.ZERO,
true,
true,
- null,
- null,
- null,
new GridCacheEventManager(),
new CacheOsStoreManager(null, new CacheConfiguration()),
new GridCacheEvictionManager(),
[6/7] ignite git commit: Merge remote-tracking branch
'remotes/origin/ignite-5075-cacheStart' into ignite-5075
Posted by sb...@apache.org.
Merge remote-tracking branch 'remotes/origin/ignite-5075-cacheStart' into ignite-5075
# Conflicts:
# modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
# modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
# modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java
# modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
# modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
# modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
# modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4afeba75
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4afeba75
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4afeba75
Branch: refs/heads/ignite-5075
Commit: 4afeba7573b7ad58b0f25a1b7c6511232c30295e
Parents: b5c8670
Author: sboikov <sb...@gridgain.com>
Authored: Thu May 11 17:09:49 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu May 11 17:19:24 2017 +0300
----------------------------------------------------------------------
.../cache/CacheAffinitySharedManager.java | 34 ++++----------------
.../processors/cache/CacheGroupData.java | 23 ++++++-------
.../processors/cache/CacheGroupDescriptor.java | 23 ++++++-------
.../cache/CacheGroupInfrastructure.java | 13 ++------
.../processors/cache/ClusterCachesInfo.java | 18 +++++------
.../processors/cache/ExchangeActions.java | 11 +++++++
.../processors/cache/GridCacheIoManager.java | 3 +-
.../processors/cache/GridCacheProcessor.java | 1 -
.../dht/GridDhtAssignmentFetchFuture.java | 6 ++--
.../dht/GridDhtPartitionTopologyImpl.java | 2 +-
.../GridDhtPartitionsExchangeFuture.java | 22 +++++++------
.../dht/preloader/GridDhtPreloader.java | 3 --
.../processors/cache/IgniteCacheGroupsTest.java | 2 ++
13 files changed, 69 insertions(+), 92 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/4afeba75/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index 0443ba4..dc31ae9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -118,16 +118,6 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
}
/**
- * @param cacheId Cache ID.
- * @return Cache start topology version.
- */
- public AffinityTopologyVersion localStartVersion(int cacheId) {
- DynamicCacheDescriptor desc = registeredCaches.get(cacheId);
-
- return desc != null ? desc.localStartVersion() : null;
- }
-
- /**
* Callback invoked from discovery thread when discovery message is received.
*
* @param type Event type.
@@ -403,21 +393,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
}
for (CacheGroupInfrastructure grp : cctx.cache().cacheGroups()) {
- if (grp.affinity().lastVersion().equals(AffinityTopologyVersion.NONE)) {
- if (grp.groupStartVersion().equals(fut.topologyVersion())) {
- GridAffinityAssignmentCache aff = grp.affinity();
-
- List<List<ClusterNode>> assignment = aff.calculate(fut.topologyVersion(),
- fut.discoveryEvent(), fut.discoCache());
-
- aff.initialize(fut.topologyVersion(), assignment);
- }
- else {
- assert grp.localStartVersion().equals(fut.topologyVersion());
-
- initAffinity(registeredGrps.get(grp.groupId()), grp.affinity(), fut, lateAffAssign);
- }
- }
+ if (grp.affinity().lastVersion().equals(AffinityTopologyVersion.NONE))
+ initAffinity(registeredGrps.get(grp.groupId()), grp.affinity(), fut, lateAffAssign);
}
if (crd) {
@@ -881,7 +858,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
assert grpDesc != null : aff.groupName();
- return grpDesc.startTopologyVersion().equals(fut.topologyVersion()) ||
+ return fut.cacheGroupStarting(aff.groupId()) ||
+ cctx.localNodeId().equals(grpDesc.receivedFrom()) ||
!fut.exchangeId().nodeId().equals(cctx.localNodeId()) ||
(affNodes.size() == 1 && affNodes.contains(cctx.localNode()));
}
@@ -974,7 +952,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
if (grp.isLocal())
continue;
- if (grp.groupStartVersion().equals(fut.topologyVersion())) {
+ if (canCalculateAffinity(grp.affinity(), fut)) {
List<List<ClusterNode>> assignment = grp.affinity().calculate(fut.topologyVersion(),
fut.discoveryEvent(),
fut.discoCache());
@@ -998,7 +976,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
for (int i = 0; i < fetchFuts.size(); i++) {
GridDhtAssignmentFetchFuture fetchFut = fetchFuts.get(i);
- Integer grpId = fetchFut.cacheId();
+ Integer grpId = fetchFut.groupId();
fetchAffinity(fut, cctx.cache().cacheGroup(grpId).affinity(), fetchFut);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4afeba75/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupData.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupData.java
index 0123262..ea2c256 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupData.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupData.java
@@ -19,9 +19,8 @@ package org.apache.ignite.internal.processors.cache;
import java.io.Serializable;
import java.util.Map;
-import java.util.Set;
+import java.util.UUID;
import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.lang.IgniteUuid;
@@ -40,13 +39,13 @@ public class CacheGroupData implements Serializable {
private final int grpId;
/** */
- private final IgniteUuid deploymentId;
+ private final UUID rcvdFrom;
/** */
- private final CacheConfiguration cacheCfg;
+ private final IgniteUuid deploymentId;
/** */
- private final AffinityTopologyVersion startTopVer;
+ private final CacheConfiguration cacheCfg;
/** */
@GridToStringInclude
@@ -55,28 +54,30 @@ public class CacheGroupData implements Serializable {
/**
* @param cacheCfg Cache configuration.
* @param grpId
- * @param startTopVer
*/
public CacheGroupData(CacheConfiguration cacheCfg,
String grpName,
int grpId,
+ UUID rcvdFrom,
IgniteUuid deploymentId,
- AffinityTopologyVersion startTopVer,
Map<String, Integer> caches) {
assert cacheCfg != null;
assert grpName != null;
assert grpId != 0;
assert deploymentId != null;
- assert startTopVer != null;
this.cacheCfg = cacheCfg;
this.grpName = grpName;
this.grpId = grpId;
+ this.rcvdFrom = rcvdFrom;
this.deploymentId = deploymentId;
- this.startTopVer = startTopVer;
this.caches = caches;
}
+ public UUID receivedFrom() {
+ return rcvdFrom;
+ }
+
public String groupName() {
return grpName;
}
@@ -93,10 +94,6 @@ public class CacheGroupData implements Serializable {
return cacheCfg;
}
- public AffinityTopologyVersion startTopologyVersion() {
- return startTopVer;
- }
-
Map<String, Integer> caches() {
return caches;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4afeba75/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupDescriptor.java
index da55871..c418002 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupDescriptor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupDescriptor.java
@@ -19,13 +19,11 @@ package org.apache.ignite.internal.processors.cache;
import java.util.HashMap;
import java.util.Map;
-import java.util.Set;
+import java.util.UUID;
import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.lang.IgniteUuid;
-import org.jetbrains.annotations.Nullable;
/**
*
@@ -44,31 +42,34 @@ public class CacheGroupDescriptor {
private final CacheConfiguration cacheCfg;
/** */
- private final AffinityTopologyVersion startTopVer;
-
- /** */
@GridToStringInclude
private Map<String, Integer> caches;
+ /** */
+ private final UUID rcvdFrom;
+
CacheGroupDescriptor(String grpName,
int grpId,
+ UUID rcvdFrom,
IgniteUuid deploymentId,
CacheConfiguration cacheCfg,
- AffinityTopologyVersion startTopVer,
Map<String, Integer> caches) {
assert cacheCfg != null;
assert grpName != null;
assert grpId != 0;
- assert startTopVer != null;
this.grpName = grpName;
this.grpId = grpId;
+ this.rcvdFrom = rcvdFrom;
this.deploymentId = deploymentId;
this.cacheCfg = cacheCfg;
- this.startTopVer = startTopVer;
this.caches = caches;
}
+ public UUID receivedFrom() {
+ return rcvdFrom;
+ }
+
public IgniteUuid deploymentId() {
return deploymentId;
}
@@ -113,10 +114,6 @@ public class CacheGroupDescriptor {
return cacheCfg;
}
- public AffinityTopologyVersion startTopologyVersion() {
- return startTopVer;
- }
-
Map<String, Integer> caches() {
return caches;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4afeba75/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java
index e1e3e93..2a4e918 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java
@@ -70,9 +70,6 @@ public class CacheGroupInfrastructure {
private GridDhtPartitionTopologyImpl top;
/** */
- private final AffinityTopologyVersion grpStartVer;
-
- /** */
private final AffinityTopologyVersion locStartVer;
/** */
@@ -122,7 +119,6 @@ public class CacheGroupInfrastructure {
CacheObjectContext cacheObjCtx,
FreeList freeList,
ReuseList reuseList,
- AffinityTopologyVersion grpStartVer,
AffinityTopologyVersion locStartVer) {
assert grpId != 0 : "Invalid group ID [cache=" + ccfg.getName() + ", grpName=" + ccfg.getGroupName() + ']';
assert ccfg != null;
@@ -136,7 +132,6 @@ public class CacheGroupInfrastructure {
this.cacheObjCtx = cacheObjCtx;
this.freeList = freeList;
this.reuseList = reuseList;
- this.grpStartVer = grpStartVer;
this.locStartVer = locStartVer;
ioPlc = cacheType.ioPolicy();
@@ -257,10 +252,6 @@ public class CacheGroupInfrastructure {
return false;
}
- public AffinityTopologyVersion groupStartVersion() {
- return grpStartVer;
- }
-
public AffinityTopologyVersion localStartVersion() {
return locStartVer;
}
@@ -383,7 +374,9 @@ public class CacheGroupInfrastructure {
AffinityAssignment assignment = aff.cachedAffinity(topVer);
- GridDhtAffinityAssignmentResponse res = new GridDhtAffinityAssignmentResponse(grpId,
+ GridDhtAffinityAssignmentResponse res = new GridDhtAffinityAssignmentResponse(
+ req.futureId(),
+ grpId,
topVer,
assignment.assignment());
http://git-wip-us.apache.org/repos/asf/ignite/blob/4afeba75/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
index 7800a45..8b23e5b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
@@ -290,8 +290,8 @@ class ClusterCachesInfo {
CacheGroupDescriptor grpDesc = registerCacheGroup(exchangeActions,
ccfg,
cacheId,
- req.deploymentId(),
- topVer.nextMinorVersion());
+ req.initiatingNodeId(),
+ req.deploymentId());
DynamicCacheDescriptor startDesc = new DynamicCacheDescriptor(ctx,
ccfg,
@@ -651,8 +651,8 @@ class ClusterCachesInfo {
CacheGroupData grpData = new CacheGroupData(grpDesc.config(),
grpDesc.groupName(),
grpDesc.groupId(),
+ grpDesc.receivedFrom(),
grpDesc.deploymentId(),
- grpDesc.startTopologyVersion(),
grpDesc.caches());
cacheGrps.put(grpDesc.groupName(), grpData);
@@ -699,9 +699,9 @@ class ClusterCachesInfo {
for (CacheGroupData grpData : cachesData.cacheGroups().values()) {
CacheGroupDescriptor grpDesc = new CacheGroupDescriptor(grpData.groupName(),
grpData.groupId(),
+ grpData.receivedFrom(),
grpData.deploymentId(),
grpData.config(),
- grpData.startTopologyVersion(),
grpData.caches());
CacheGroupDescriptor old = registeredCacheGrps.put(grpDesc.groupName(), grpDesc);
@@ -857,8 +857,8 @@ class ClusterCachesInfo {
CacheGroupDescriptor grpDesc = registerCacheGroup(null,
cfg,
cacheId,
- joinData.cacheDeploymentId(),
- topVer);
+ nodeId,
+ joinData.cacheDeploymentId());
DynamicCacheDescriptor desc = new DynamicCacheDescriptor(ctx,
cfg,
@@ -897,8 +897,8 @@ class ClusterCachesInfo {
ExchangeActions exchActions,
CacheConfiguration startedCacheCfg,
Integer cacheId,
- IgniteUuid deploymentId,
- AffinityTopologyVersion topVer) {
+ UUID rcvdFrom,
+ IgniteUuid deploymentId) {
if (startedCacheCfg.getGroupName() != null) {
CacheGroupDescriptor desc = registeredCacheGrps.get(startedCacheCfg.getGroupName());
@@ -919,9 +919,9 @@ class ClusterCachesInfo {
CacheGroupDescriptor grpDesc = new CacheGroupDescriptor(
grpName,
grpId,
+ rcvdFrom,
deploymentId,
startedCacheCfg,
- topVer,
caches);
CacheGroupDescriptor old = registeredCacheGrps.put(grpName, grpDesc);
http://git-wip-us.apache.org/repos/asf/ignite/blob/4afeba75/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
index 8c9833f..977f544 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
@@ -296,6 +296,17 @@ public class ExchangeActions {
return cacheGrpsToStart != null ? cacheGrpsToStart : Collections.<CacheGroupDescriptor>emptyList();
}
+ public boolean cacheGroupStarting(int grpId) {
+ if (cacheGrpsToStart != null) {
+ for (CacheGroupDescriptor grpToStop : cacheGrpsToStart) {
+ if (grpToStop.groupId() == grpId)
+ return true;
+ }
+ }
+
+ return false;
+ }
+
void addCacheGroupToStop(CacheGroupDescriptor grpDesc) {
if (cacheGrpsToStop == null)
cacheGrpsToStop = new ArrayList<>();
http://git-wip-us.apache.org/repos/asf/ignite/blob/4afeba75/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 02d1086..fb2ac3d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -146,7 +146,8 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
if (cacheMsg instanceof GridDhtAffinityAssignmentRequest) {
assert cacheMsg.topologyVersion() != null : cacheMsg;
- AffinityTopologyVersion waitVer = cctx.affinity().localStartVersion(cacheMsg.cacheId());
+ // TODO IGNITE-5075.
+ AffinityTopologyVersion waitVer = null;//cctx.affinity().localStartVersion(((GridDhtAffinityAssignmentRequest) cacheMsg).groupId());
if (waitVer == null)
waitVer = new AffinityTopologyVersion(cctx.localNode().order());
http://git-wip-us.apache.org/repos/asf/ignite/blob/4afeba75/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index a72f21c..63b29b8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -1922,7 +1922,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
cacheObjCtx,
freeList,
reuseList,
- desc.startTopologyVersion(),
exchTopVer);
grp.start();
http://git-wip-us.apache.org/repos/asf/ignite/blob/4afeba75/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
index c008ef3..a06ff78 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
@@ -113,10 +113,10 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<GridDhtAffin
}
/**
- * @return Cache ID.
+ * @return Cache group ID.
*/
- public int cacheId() {
- return cacheId;
+ public int groupId() {
+ return grpId;
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/4afeba75/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 8ea9e46..c75f05c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -414,7 +414,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
boolean owned = locPart.own();
- assert owned : "Failed to own partition for oldest node [grp" + grp.name() +
+ assert owned : "Failed to own partition for oldest node [grp=" + grp.name() +
", part=" + locPart + ']';
if (log.isDebugEnabled())
http://git-wip-us.apache.org/repos/asf/ignite/blob/4afeba75/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index c7457c3..d15f069 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -394,13 +394,16 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
return cacheCtx != null && F.eq(cacheCtx.startTopologyVersion(), topVer);
}
+ public boolean cacheGroupStarting(int grpId) {
+ return exchActions != null && exchActions.cacheGroupStarting(grpId);
+ }
+
/**
* @param cacheId Cache ID.
* @return {@code True} if non-client cache was added during this exchange.
*/
public boolean cacheStarted(int cacheId) {
return exchActions != null && exchActions.cacheStarted(cacheId);
-
}
/**
@@ -943,20 +946,19 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
private void warnNoAffinityNodes() {
List<String> cachesWithoutNodes = null;
- for (DynamicCacheDescriptor cacheDesc : cctx.cache().cacheDescriptors()) {
- if (cacheDesc.startTopologyVersion().compareTo(topologyVersion()) <= 0 &&
- discoCache.cacheGroupAffinityNodes(cacheDesc.groupDescriptor().groupId()).isEmpty()) {
+ for (GridCacheContext ctx : cctx.cacheContexts()) {
+ if (discoCache.cacheGroupAffinityNodes(ctx.groupId()).isEmpty()) {
if (cachesWithoutNodes == null)
cachesWithoutNodes = new ArrayList<>();
- cachesWithoutNodes.add(cacheDesc.cacheName());
+ cachesWithoutNodes.add(ctx.name());
// Fire event even if there is no client cache started.
- if (cctx.gridEvents().isRecordable(EventType.EVT_CACHE_NODES_LEFT)) {
+ if (ctx.gridEvents().isRecordable(EventType.EVT_CACHE_NODES_LEFT)) {
Event evt = new CacheEvent(
- cacheDesc.cacheName(),
- cctx.localNode(),
- cctx.localNode(),
+ ctx.name(),
+ ctx.localNode(),
+ ctx.localNode(),
"All server nodes have left the cluster.",
EventType.EVT_CACHE_NODES_LEFT,
0,
@@ -973,7 +975,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
null
);
- cctx.gridEvents().record(evt);
+ ctx.gridEvents().record(evt);
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4afeba75/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index ec6e4af..3d62c2f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -44,8 +44,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
import org.apache.ignite.internal.processors.cache.GridCachePreloaderAdapter;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentRequest;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
@@ -66,7 +64,6 @@ import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.Nullable;
-import org.jsr166.ConcurrentLinkedDeque8;
import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST;
import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_UNLOADED;
http://git-wip-us.apache.org/repos/asf/ignite/blob/4afeba75/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java
index ba01844..ff20803 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java
@@ -44,6 +44,8 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest {
((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+ //cfg.setLateAffinityAssignment(false);
+
return cfg;
}
[2/7] ignite git commit: ignite-5075
Posted by sb...@apache.org.
ignite-5075
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b59733a3
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b59733a3
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b59733a3
Branch: refs/heads/ignite-5075
Commit: b59733a391b412d94869418a4a30289042e7fc8d
Parents: b324c13
Author: sboikov <sb...@gridgain.com>
Authored: Thu May 11 13:48:26 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu May 11 13:48:26 2017 +0300
----------------------------------------------------------------------
.../apache/ignite/internal/processors/cache/ClusterCachesInfo.java | 2 --
1 file changed, 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/b59733a3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
index f74343b..1afedf1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
@@ -768,7 +768,6 @@ class ClusterCachesInfo {
desc.staticallyConfigured(true);
desc.receivedFrom(nodeId);
- desc.startTopologyVersion(topVer);
DynamicCacheDescriptor old = registeredTemplates.put(cfg.getName(), desc);
@@ -789,7 +788,6 @@ class ClusterCachesInfo {
desc.staticallyConfigured(true);
desc.receivedFrom(nodeId);
- desc.startTopologyVersion(topVer);
DynamicCacheDescriptor old = registeredCaches.put(cfg.getName(), desc);
[3/7] ignite git commit: ignite-5075
Posted by sb...@apache.org.
ignite-5075
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/861b34b2
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/861b34b2
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/861b34b2
Branch: refs/heads/ignite-5075
Commit: 861b34b29a9a32f63204e1fb73f43cdcb8b049fc
Parents: b59733a
Author: sboikov <sb...@gridgain.com>
Authored: Thu May 11 14:40:00 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu May 11 14:40:00 2017 +0300
----------------------------------------------------------------------
.../processors/cache/ClusterCachesInfo.java | 24 ++++++++++++--------
1 file changed, 14 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/861b34b2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
index 1afedf1..44d41f8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
@@ -82,6 +82,9 @@ class ClusterCachesInfo {
private List<T2<DynamicCacheDescriptor, NearCacheConfiguration>> locJoinStartCaches;
/** */
+ private Map<UUID, CacheJoinNodeDiscoveryData> joiningNodesDiscoData = new HashMap<>();
+
+ /** */
private Map<UUID, CacheClientReconnectDiscoveryData> clientReconnectReqs;
/**
@@ -575,15 +578,10 @@ class ClusterCachesInfo {
}
}
else {
- for (DynamicCacheDescriptor desc : registeredCaches.values()) {
- if (desc.startTopologyVersion() == null && node.id().equals(desc.receivedFrom()))
- desc.startTopologyVersion(topVer);
- }
+ CacheJoinNodeDiscoveryData discoData = joiningNodesDiscoData.remove(node.id());
- for (DynamicCacheDescriptor desc : registeredTemplates().values()) {
- if (desc.startTopologyVersion() == null && node.id().equals(desc.receivedFrom()))
- desc.startTopologyVersion(topVer);
- }
+ if (discoData != null)
+ processJoiningNode(discoData, node.id(), topVer);
}
}
}
@@ -725,8 +723,12 @@ class ClusterCachesInfo {
else
processClientReconnectData((CacheClientReconnectDiscoveryData) joiningNodeData, data.joiningNodeId());
}
- else if (joiningNodeData instanceof CacheJoinNodeDiscoveryData)
- processJoiningNode((CacheJoinNodeDiscoveryData)joiningNodeData, data.joiningNodeId(), null);
+ else if (joiningNodeData instanceof CacheJoinNodeDiscoveryData) {
+ CacheJoinNodeDiscoveryData old =
+ joiningNodesDiscoData.put(data.joiningNodeId(), (CacheJoinNodeDiscoveryData)joiningNodeData);
+
+ assert old == null : old;
+ }
}
}
@@ -768,6 +770,7 @@ class ClusterCachesInfo {
desc.staticallyConfigured(true);
desc.receivedFrom(nodeId);
+ desc.startTopologyVersion(topVer);
DynamicCacheDescriptor old = registeredTemplates.put(cfg.getName(), desc);
@@ -788,6 +791,7 @@ class ClusterCachesInfo {
desc.staticallyConfigured(true);
desc.receivedFrom(nodeId);
+ desc.startTopologyVersion(topVer);
DynamicCacheDescriptor old = registeredCaches.put(cfg.getName(), desc);
[7/7] ignite git commit: Merge remote-tracking branch
'remotes/origin/ignite-5075-cacheStart' into ignite-5075
Posted by sb...@apache.org.
Merge remote-tracking branch 'remotes/origin/ignite-5075-cacheStart' into ignite-5075
# Conflicts:
# modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
# modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
# modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java
# modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
# modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
# modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
# modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/aee10316
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/aee10316
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/aee10316
Branch: refs/heads/ignite-5075
Commit: aee103166463a38494d73507e70ee1c55bb9f134
Parents: 4afeba7
Author: sboikov <sb...@gridgain.com>
Authored: Thu May 11 17:42:22 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu May 11 17:42:22 2017 +0300
----------------------------------------------------------------------
.../processors/cache/CacheAffinitySharedManager.java | 2 +-
.../internal/processors/cache/CacheGroupInfrastructure.java | 9 +++++++++
.../internal/processors/cache/GridCacheProcessor.java | 1 +
.../cache/distributed/dht/GridDhtPartitionTopologyImpl.java | 5 +++--
4 files changed, 14 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/aee10316/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index dc31ae9..f85d110 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -952,7 +952,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
if (grp.isLocal())
continue;
- if (canCalculateAffinity(grp.affinity(), fut)) {
+ if (fut.cacheGroupStarting(grp.groupId()) || cctx.localNodeId().equals(grp.receivedFrom())) {
List<List<ClusterNode>> assignment = grp.affinity().calculate(fut.topologyVersion(),
fut.discoveryEvent(),
fut.discoCache());
http://git-wip-us.apache.org/repos/asf/ignite/blob/aee10316/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java
index 2a4e918..04b45a0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java
@@ -105,6 +105,9 @@ public class CacheGroupInfrastructure {
/** */
private boolean storeCacheId;
+ /** */
+ private UUID rcvdFrom;
+
/**
* @param grpId Group ID.
* @param ctx Context.
@@ -112,6 +115,7 @@ public class CacheGroupInfrastructure {
*/
CacheGroupInfrastructure(GridCacheSharedContext ctx,
int grpId,
+ UUID rcvdFrom,
CacheType cacheType,
CacheConfiguration ccfg,
boolean affNode,
@@ -124,6 +128,7 @@ public class CacheGroupInfrastructure {
assert ccfg != null;
this.grpId = grpId;
+ this.rcvdFrom = rcvdFrom;
this.cacheType = cacheType;
this.ctx = ctx;
this.ccfg = ccfg;
@@ -143,6 +148,10 @@ public class CacheGroupInfrastructure {
log = ctx.kernalContext().log(getClass());
}
+ public UUID receivedFrom() {
+ return rcvdFrom;
+ }
+
public boolean storeCacheId() {
return storeCacheId;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/aee10316/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 63b29b8..be3b4fd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -1915,6 +1915,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
CacheGroupInfrastructure grp = new CacheGroupInfrastructure(sharedCtx,
desc.groupId(),
+ desc.receivedFrom(),
cacheType,
cfg,
affNode,
http://git-wip-us.apache.org/repos/asf/ignite/blob/aee10316/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index c75f05c..9168d7f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -401,9 +401,10 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
int num = grp.affinity().partitions();
if (grp.rebalanceEnabled()) {
- boolean added = exchId.topologyVersion().equals(grp.localStartVersion());
+ boolean added = exchFut.cacheGroupStarting(grp.groupId()) ||
+ (exchId.isJoined() && exchId.nodeId().equals(grp.receivedFrom()));
- boolean first = (loc.equals(oldest) && loc.id().equals(exchId.nodeId()) && exchId.isJoined()) || added;
+ boolean first = added || (loc.equals(oldest) && loc.id().equals(exchId.nodeId()) && exchId.isJoined());
if (first) {
assert exchId.isJoined() || added;
[4/7] ignite git commit: ignite-5075
Posted by sb...@apache.org.
ignite-5075
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d24b08b6
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d24b08b6
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d24b08b6
Branch: refs/heads/ignite-5075
Commit: d24b08b65aeee089894b62fa8619d0c24cbae23b
Parents: 861b34b
Author: sboikov <sb...@gridgain.com>
Authored: Thu May 11 16:18:32 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu May 11 16:18:32 2017 +0300
----------------------------------------------------------------------
.../cache/CacheAffinitySharedManager.java | 33 +++++++------
.../internal/processors/cache/CacheData.java | 14 ------
.../processors/cache/ClusterCachesInfo.java | 51 ++++++++------------
.../cache/DynamicCacheDescriptor.java | 31 +++---------
.../processors/cache/GridCacheContext.java | 15 ------
.../processors/cache/GridCacheIoManager.java | 6 ++-
.../GridCachePartitionExchangeManager.java | 4 +-
.../processors/cache/GridCacheProcessor.java | 9 ----
.../dht/GridDhtAffinityAssignmentRequest.java | 32 ++++++------
.../dht/GridDhtAffinityAssignmentResponse.java | 36 ++++++++++++--
.../dht/GridDhtAssignmentFetchFuture.java | 50 ++++++++++---------
.../dht/GridDhtPartitionTopologyImpl.java | 4 +-
.../dht/preloader/GridDhtPreloader.java | 4 +-
.../continuous/CacheContinuousQueryManager.java | 1 -
.../loadtests/hashmap/GridCacheTestContext.java | 1 -
15 files changed, 131 insertions(+), 160 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/d24b08b6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index 8c275e0..bd41ccc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -91,8 +91,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
private final Object mux = new Object();
/** Pending affinity assignment futures. */
- private final ConcurrentMap<T2<Integer, AffinityTopologyVersion>, GridDhtAssignmentFetchFuture>
- pendingAssignmentFetchFuts = new ConcurrentHashMap8<>();
+ private final ConcurrentMap<Long, GridDhtAssignmentFetchFuture> pendingAssignmentFetchFuts =
+ new ConcurrentHashMap8<>();
/** Discovery listener. */
private final GridLocalEventListener discoLsnr = new GridLocalEventListener() {
@@ -118,6 +118,16 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
}
/**
+ * @param cacheId Cache ID.
+ * @return Cache start topology version.
+ */
+ public AffinityTopologyVersion localStartVersion(int cacheId) {
+ DynamicCacheDescriptor desc = registeredCaches.get(cacheId);
+
+ return desc != null ? desc.localStartVersion() : null;
+ }
+
+ /**
* Callback invoked from discovery thread when discovery message is received.
*
* @param type Event type.
@@ -414,8 +424,6 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
if (clientCacheStarted)
initAffinity(cacheDesc, cacheCtx.affinity().affinityCache(), fut, lateAffAssign);
else if (!req.clientStartOnly()) {
- assert fut.topologyVersion().equals(cacheCtx.cacheStartTopologyVersion());
-
GridAffinityAssignmentCache aff = cacheCtx.affinity().affinityCache();
assert aff.lastVersion().equals(AffinityTopologyVersion.NONE) : aff.lastVersion();
@@ -696,7 +704,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
* @param fut Future to add.
*/
public void addDhtAssignmentFetchFuture(GridDhtAssignmentFetchFuture fut) {
- GridDhtAssignmentFetchFuture old = pendingAssignmentFetchFuts.putIfAbsent(fut.key(), fut);
+ GridDhtAssignmentFetchFuture old = pendingAssignmentFetchFuts.putIfAbsent(fut.id(), fut);
assert old == null : "More than one thread is trying to fetch partition assignments [fut=" + fut +
", allFuts=" + pendingAssignmentFetchFuts + ']';
@@ -706,9 +714,9 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
* @param fut Future to remove.
*/
public void removeDhtAssignmentFetchFuture(GridDhtAssignmentFetchFuture fut) {
- boolean rmv = pendingAssignmentFetchFuts.remove(fut.key(), fut);
+ boolean rmv = pendingAssignmentFetchFuts.remove(fut.id(), fut);
- assert rmv : "Failed to remove assignment fetch future: " + fut.key();
+ assert rmv : "Failed to remove assignment fetch future: " + fut.id();
}
/**
@@ -720,13 +728,10 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
if (log.isDebugEnabled())
log.debug("Processing affinity assignment response [node=" + nodeId + ", res=" + res + ']');
- for (GridDhtAssignmentFetchFuture fut : pendingAssignmentFetchFuts.values()) {
- if (fut.key().get1().equals(cacheId)) {
- fut.onResponse(nodeId, res);
+ GridDhtAssignmentFetchFuture fut = pendingAssignmentFetchFuts.get(res.futureId());
- break;
- }
- }
+ if (fut != null)
+ fut.onResponse(nodeId, res);
}
/**
@@ -1001,7 +1006,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
for (int i = 0; i < fetchFuts.size(); i++) {
GridDhtAssignmentFetchFuture fetchFut = fetchFuts.get(i);
- Integer cacheId = fetchFut.key().get1();
+ Integer cacheId = fetchFut.cacheId();
fetchAffinity(fut, cctx.cacheContext(cacheId).affinity().affinityCache(), fetchFut);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/d24b08b6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheData.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheData.java
index 82afdc7..0c97ab0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheData.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheData.java
@@ -42,9 +42,6 @@ public class CacheData implements Serializable {
private final CacheType cacheType;
/** */
- private final AffinityTopologyVersion startTopVer;
-
- /** */
private final IgniteUuid deploymentId;
/** */
@@ -66,7 +63,6 @@ public class CacheData implements Serializable {
* @param cacheCfg Cache configuration.
* @param cacheId Cache ID.
* @param cacheType Cache ID.
- * @param startTopVer Topology version when cache was started.
* @param deploymentId Cache deployment ID.
* @param schema Query schema.
* @param rcvdFrom Node ID cache was started from.
@@ -77,7 +73,6 @@ public class CacheData implements Serializable {
CacheData(CacheConfiguration cacheCfg,
int cacheId,
CacheType cacheType,
- AffinityTopologyVersion startTopVer,
IgniteUuid deploymentId,
QuerySchema schema,
UUID rcvdFrom,
@@ -86,14 +81,12 @@ public class CacheData implements Serializable {
byte flags) {
assert cacheCfg != null;
assert rcvdFrom != null : cacheCfg.getName();
- assert startTopVer != null : cacheCfg.getName();
assert deploymentId != null : cacheCfg.getName();
assert template || cacheId != 0 : cacheCfg.getName();
this.cacheCfg = cacheCfg;
this.cacheId = cacheId;
this.cacheType = cacheType;
- this.startTopVer = startTopVer;
this.deploymentId = deploymentId;
this.schema = schema;
this.rcvdFrom = rcvdFrom;
@@ -110,13 +103,6 @@ public class CacheData implements Serializable {
}
/**
- * @return Start topology version.
- */
- public AffinityTopologyVersion startTopologyVersion() {
- return startTopVer;
- }
-
- /**
* @return {@code True} if this is template configuration.
*/
public boolean template() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/d24b08b6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
index 44d41f8..efcf6a8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
@@ -82,9 +82,6 @@ class ClusterCachesInfo {
private List<T2<DynamicCacheDescriptor, NearCacheConfiguration>> locJoinStartCaches;
/** */
- private Map<UUID, CacheJoinNodeDiscoveryData> joiningNodesDiscoData = new HashMap<>();
-
- /** */
private Map<UUID, CacheClientReconnectDiscoveryData> clientReconnectReqs;
/**
@@ -340,19 +337,15 @@ class ClusterCachesInfo {
if (needExchange) {
req.clientStartOnly(true);
- desc.clientCacheStartVersion(topVer.nextMinorVersion());
+ desc.localStartVersion(topVer.nextMinorVersion());
exchangeActions.addClientCacheToStart(req, desc);
}
}
if (!needExchange) {
- if (desc != null) {
- if (desc.clientCacheStartVersion() != null)
- waitTopVer = desc.clientCacheStartVersion();
- else
- waitTopVer = desc.startTopologyVersion();
- }
+ if (desc != null)
+ waitTopVer = desc.localStartVersion();
}
}
else if (req.globalStateChange())
@@ -404,7 +397,7 @@ class ClusterCachesInfo {
for (DynamicCacheDescriptor desc : addedDescs) {
assert desc.template() || incMinorTopVer;
- desc.startTopologyVersion(startTopVer);
+ desc.localStartVersion(startTopVer);
}
}
@@ -545,9 +538,11 @@ class ClusterCachesInfo {
locJoinStartCaches = new ArrayList<>();
if (!disconnectedState() && joinDiscoData != null) {
- processJoiningNode(joinDiscoData, node.id(), topVer);
+ processJoiningNode(joinDiscoData, node.id());
for (DynamicCacheDescriptor desc : registeredCaches.values()) {
+ desc.localStartVersion(topVer);
+
CacheConfiguration cfg = desc.cacheConfiguration();
CacheJoinNodeDiscoveryData.CacheInfo locCfg = joinDiscoData.caches().get(cfg.getName());
@@ -564,8 +559,7 @@ class ClusterCachesInfo {
desc.deploymentId(),
desc.schema());
- desc0.startTopologyVersion(desc.startTopologyVersion());
- desc0.clientCacheStartVersion(desc.clientCacheStartVersion());
+ desc0.localStartVersion(desc.localStartVersion());
desc0.receivedFrom(desc.receivedFrom());
desc0.staticallyConfigured(desc.staticallyConfigured());
@@ -577,11 +571,15 @@ class ClusterCachesInfo {
}
}
}
- else {
- CacheJoinNodeDiscoveryData discoData = joiningNodesDiscoData.remove(node.id());
- if (discoData != null)
- processJoiningNode(discoData, node.id(), topVer);
+ for (DynamicCacheDescriptor desc : registeredCaches.values()) {
+ if (node.id().equals(desc.receivedFrom()))
+ desc.localStartVersion(topVer);
+ }
+
+ for (DynamicCacheDescriptor desc : registeredTemplates.values()) {
+ if (node.id().equals(desc.receivedFrom()))
+ desc.localStartVersion(topVer);
}
}
}
@@ -607,7 +605,6 @@ class ClusterCachesInfo {
CacheData cacheData = new CacheData(desc.cacheConfiguration(),
desc.cacheId(),
desc.cacheType(),
- desc.startTopologyVersion(),
desc.deploymentId(),
desc.schema(),
desc.receivedFrom(),
@@ -624,7 +621,6 @@ class ClusterCachesInfo {
CacheData cacheData = new CacheData(desc.cacheConfiguration(),
0,
desc.cacheType(),
- desc.startTopologyVersion(),
desc.deploymentId(),
desc.schema(),
desc.receivedFrom(),
@@ -659,7 +655,6 @@ class ClusterCachesInfo {
cacheData.deploymentId(),
cacheData.schema());
- desc.startTopologyVersion(cacheData.startTopologyVersion());
desc.receivedFrom(cacheData.receivedFrom());
desc.staticallyConfigured(cacheData.staticallyConfigured());
@@ -679,7 +674,6 @@ class ClusterCachesInfo {
cacheData.deploymentId(),
cacheData.schema());
- desc.startTopologyVersion(cacheData.startTopologyVersion());
desc.receivedFrom(cacheData.receivedFrom());
desc.staticallyConfigured(cacheData.staticallyConfigured());
@@ -723,12 +717,8 @@ class ClusterCachesInfo {
else
processClientReconnectData((CacheClientReconnectDiscoveryData) joiningNodeData, data.joiningNodeId());
}
- else if (joiningNodeData instanceof CacheJoinNodeDiscoveryData) {
- CacheJoinNodeDiscoveryData old =
- joiningNodesDiscoData.put(data.joiningNodeId(), (CacheJoinNodeDiscoveryData)joiningNodeData);
-
- assert old == null : old;
- }
+ else if (joiningNodeData instanceof CacheJoinNodeDiscoveryData)
+ processJoiningNode((CacheJoinNodeDiscoveryData)joiningNodeData, data.joiningNodeId());
}
}
@@ -754,9 +744,8 @@ class ClusterCachesInfo {
/**
* @param joinData Joined node discovery data.
* @param nodeId Joined node ID.
- * @param topVer Topology version.
*/
- private void processJoiningNode(CacheJoinNodeDiscoveryData joinData, UUID nodeId, AffinityTopologyVersion topVer) {
+ private void processJoiningNode(CacheJoinNodeDiscoveryData joinData, UUID nodeId) {
for (CacheJoinNodeDiscoveryData.CacheInfo cacheInfo : joinData.templates().values()) {
CacheConfiguration cfg = cacheInfo.config();
@@ -770,7 +759,6 @@ class ClusterCachesInfo {
desc.staticallyConfigured(true);
desc.receivedFrom(nodeId);
- desc.startTopologyVersion(topVer);
DynamicCacheDescriptor old = registeredTemplates.put(cfg.getName(), desc);
@@ -791,7 +779,6 @@ class ClusterCachesInfo {
desc.staticallyConfigured(true);
desc.receivedFrom(nodeId);
- desc.startTopologyVersion(topVer);
DynamicCacheDescriptor old = registeredCaches.put(cfg.getName(), desc);
http://git-wip-us.apache.org/repos/asf/ignite/blob/d24b08b6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
index fe859f8..130ebde 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
@@ -63,9 +63,6 @@ public class DynamicCacheDescriptor {
private boolean updatesAllowed = true;
/** */
- private AffinityTopologyVersion startTopVer;
-
- /** */
private Integer cacheId;
/** */
@@ -78,7 +75,7 @@ public class DynamicCacheDescriptor {
private volatile CacheObjectContext objCtx;
/** */
- private transient AffinityTopologyVersion clientCacheStartVer;
+ private volatile transient AffinityTopologyVersion locStartVer;
/** Mutex to control schema. */
private final Object schemaMux = new Object();
@@ -131,22 +128,6 @@ public class DynamicCacheDescriptor {
}
/**
- * @return Start topology version.
- */
- @Nullable public AffinityTopologyVersion startTopologyVersion() {
- return startTopVer;
- }
-
- /**
- * @param startTopVer Start topology version.
- */
- public void startTopologyVersion(AffinityTopologyVersion startTopVer) {
- assert startTopVer != null;
-
- this.startTopVer = startTopVer;
- }
-
- /**
* @return {@code True} if this is template configuration.
*/
public boolean template() {
@@ -253,15 +234,15 @@ public class DynamicCacheDescriptor {
/**
* @return Version when client cache on local node was started.
*/
- @Nullable AffinityTopologyVersion clientCacheStartVersion() {
- return clientCacheStartVer;
+ @Nullable AffinityTopologyVersion localStartVersion() {
+ return locStartVer;
}
/**
- * @param clientCacheStartVer Version when client cache on local node was started.
+ * @param locStartVer Version when cache on local node was started.
*/
- public void clientCacheStartVersion(AffinityTopologyVersion clientCacheStartVer) {
- this.clientCacheStartVer = clientCacheStartVer;
+ public void localStartVersion(AffinityTopologyVersion locStartVer) {
+ this.locStartVer = locStartVer;
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/d24b08b6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index 8f0d842..2466a59 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -237,9 +237,6 @@ public class GridCacheContext<K, V> implements Externalizable {
/** Topology version when cache was started on local node. */
private AffinityTopologyVersion locStartTopVer;
- /** Global cache start topology version. */
- private AffinityTopologyVersion cacheStartTopVer;
-
/** Dynamic cache deployment ID. */
private IgniteUuid dynamicDeploymentId;
@@ -292,7 +289,6 @@ public class GridCacheContext<K, V> implements Externalizable {
GridCacheSharedContext sharedCtx,
CacheConfiguration cacheCfg,
CacheType cacheType,
- AffinityTopologyVersion cacheStartTopVer,
AffinityTopologyVersion locStartTopVer,
boolean affNode,
boolean updatesAllowed,
@@ -321,7 +317,6 @@ public class GridCacheContext<K, V> implements Externalizable {
assert ctx != null;
assert sharedCtx != null;
assert cacheCfg != null;
- assert cacheStartTopVer != null : cacheCfg.getName();
assert locStartTopVer != null : cacheCfg.getName();
assert evtMgr != null;
@@ -341,7 +336,6 @@ public class GridCacheContext<K, V> implements Externalizable {
this.cacheCfg = cacheCfg;
this.cacheType = cacheType;
this.locStartTopVer = locStartTopVer;
- this.cacheStartTopVer = cacheStartTopVer;
this.affNode = affNode;
this.updatesAllowed = updatesAllowed;
this.depEnabled = ctx.deploy().enabled() && !cacheObjects().isBinaryEnabled(cacheCfg);
@@ -470,15 +464,6 @@ public class GridCacheContext<K, V> implements Externalizable {
}
/**
- * @return Cache start topology version.
- */
- public AffinityTopologyVersion cacheStartTopologyVersion() {
- assert cacheStartTopVer != null : name();
-
- return cacheStartTopVer;
- }
-
- /**
* @return Cache default {@link ExpiryPolicy}.
*/
@Nullable public ExpiryPolicy expiry() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/d24b08b6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 5e7e401..348d9d8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -146,8 +146,10 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
if (cacheMsg instanceof GridDhtAffinityAssignmentRequest) {
assert cacheMsg.topologyVersion() != null : cacheMsg;
- AffinityTopologyVersion waitVer =
- ((GridDhtAffinityAssignmentRequest)cacheMsg).waitTopologyVersion();
+ AffinityTopologyVersion waitVer = cctx.affinity().localStartVersion(cacheMsg.cacheId());
+
+ if (waitVer == null)
+ waitVer = new AffinityTopologyVersion(cctx.localNode().order());
// Need to wait for exchange to avoid race between cache start and affinity request.
fut = cctx.exchange().affinityReadyFuture(waitVer);
http://git-wip-us.apache.org/repos/asf/ignite/blob/d24b08b6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 8f52ae6..04c647f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -895,7 +895,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
boolean ready;
if (exchId != null) {
- AffinityTopologyVersion startTopVer = cacheCtx.cacheStartTopologyVersion();
+ AffinityTopologyVersion startTopVer = cacheCtx.startTopologyVersion();
ready = startTopVer.compareTo(exchId.topologyVersion()) <= 0;
}
@@ -1301,7 +1301,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId);
if (cacheCtx != null &&
- cacheCtx.cacheStartTopologyVersion().compareTo(entry.getValue().topologyVersion()) > 0)
+ cacheCtx.startTopologyVersion().compareTo(entry.getValue().topologyVersion()) > 0)
continue;
GridDhtPartitionTopology top = null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/d24b08b6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 82db451..f9b015d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -1345,7 +1345,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
private GridCacheContext createCache(CacheConfiguration<?, ?> cfg,
@Nullable CachePluginManager pluginMgr,
CacheType cacheType,
- AffinityTopologyVersion cacheStartTopVer,
AffinityTopologyVersion locStartTopVer,
CacheObjectContext cacheObjCtx,
boolean affNode,
@@ -1420,7 +1419,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
sharedCtx,
cfg,
cacheType,
- cacheStartTopVer,
locStartTopVer,
affNode,
updatesAllowed,
@@ -1553,7 +1551,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
sharedCtx,
cfg,
cacheType,
- cacheStartTopVer,
locStartTopVer,
affNode,
true,
@@ -1733,7 +1730,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
nearCfg,
cacheDesc.cacheType(),
cacheDesc.deploymentId(),
- cacheDesc.startTopologyVersion(),
exchTopVer,
cacheDesc.schema()
);
@@ -1755,7 +1751,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
t.get2(),
desc.cacheType(),
desc.deploymentId(),
- desc.startTopologyVersion(),
exchTopVer,
desc.schema()
);
@@ -1785,7 +1780,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
null,
desc.cacheType(),
desc.deploymentId(),
- desc.startTopologyVersion(),
exchTopVer,
desc.schema()
);
@@ -1801,7 +1795,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
* @param reqNearCfg Near configuration if specified for client cache start request.
* @param cacheType Cache type.
* @param deploymentId Deployment ID.
- * @param cacheStartTopVer Cache start topology version.
* @param exchTopVer Current exchange version.
* @param schema Query schema.
* @throws IgniteCheckedException If failed.
@@ -1811,7 +1804,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
@Nullable NearCacheConfiguration reqNearCfg,
CacheType cacheType,
IgniteUuid deploymentId,
- AffinityTopologyVersion cacheStartTopVer,
AffinityTopologyVersion exchTopVer,
@Nullable QuerySchema schema
) throws IgniteCheckedException {
@@ -1839,7 +1831,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
GridCacheContext cacheCtx = createCache(ccfg,
null,
cacheType,
- cacheStartTopVer,
exchTopVer,
cacheObjCtx,
affNode,
http://git-wip-us.apache.org/repos/asf/ignite/blob/d24b08b6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java
index 0b3080e..f80adc5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java
@@ -31,12 +31,12 @@ public class GridDhtAffinityAssignmentRequest extends GridCacheMessage {
/** */
private static final long serialVersionUID = 0L;
+ /** */
+ private long futId;
+
/** Topology version being queried. */
private AffinityTopologyVersion topVer;
- /** */
- private AffinityTopologyVersion waitTopVer;
-
/**
* Empty constructor.
*/
@@ -45,26 +45,26 @@ public class GridDhtAffinityAssignmentRequest extends GridCacheMessage {
}
/**
+ * @param futId Future ID.
* @param cacheId Cache ID.
* @param topVer Topology version.
- * @param waitTopVer Topology version to wait for before message processing.
*/
- public GridDhtAffinityAssignmentRequest(int cacheId,
- AffinityTopologyVersion topVer,
- AffinityTopologyVersion waitTopVer) {
+ public GridDhtAffinityAssignmentRequest(
+ long futId,
+ int cacheId,
+ AffinityTopologyVersion topVer) {
assert topVer != null;
- assert waitTopVer != null;
+ this.futId = futId;
this.cacheId = cacheId;
this.topVer = topVer;
- this.waitTopVer = waitTopVer;
}
/**
- * @return Topology version to wait for before message processing.
+ * @return Future ID.
*/
- public AffinityTopologyVersion waitTopologyVersion() {
- return waitTopVer;
+ public long futureId() {
+ return futId;
}
/** {@inheritDoc} */
@@ -110,13 +110,13 @@ public class GridDhtAffinityAssignmentRequest extends GridCacheMessage {
switch (writer.state()) {
case 3:
- if (!writer.writeMessage("topVer", topVer))
+ if (!writer.writeLong("futId", futId))
return false;
writer.incrementState();
case 4:
- if (!writer.writeMessage("waitTopVer", waitTopVer))
+ if (!writer.writeMessage("topVer", topVer))
return false;
writer.incrementState();
@@ -138,7 +138,7 @@ public class GridDhtAffinityAssignmentRequest extends GridCacheMessage {
switch (reader.state()) {
case 3:
- topVer = reader.readMessage("topVer");
+ futId = reader.readLong("futId");
if (!reader.isLastRead())
return false;
@@ -146,7 +146,7 @@ public class GridDhtAffinityAssignmentRequest extends GridCacheMessage {
reader.incrementState();
case 4:
- waitTopVer = reader.readMessage("waitTopVer");
+ topVer = reader.readMessage("topVer");
if (!reader.isLastRead())
return false;
http://git-wip-us.apache.org/repos/asf/ignite/blob/d24b08b6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
index e8094e1..5d82171 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
@@ -44,6 +44,9 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheMessage {
/** */
private static final long serialVersionUID = 0L;
+ /** */
+ private long futId;
+
/** Topology version. */
private AffinityTopologyVersion topVer;
@@ -69,19 +72,30 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheMessage {
}
/**
+ * @param futId Future ID.
* @param cacheId Cache ID.
* @param topVer Topology version.
* @param affAssignment Affinity assignment.
*/
- public GridDhtAffinityAssignmentResponse(int cacheId,
+ public GridDhtAffinityAssignmentResponse(
+ long futId,
+ int cacheId,
@NotNull AffinityTopologyVersion topVer,
List<List<ClusterNode>> affAssignment) {
+ this.futId = futId;
this.cacheId = cacheId;
this.topVer = topVer;
affAssignmentIds = ids(affAssignment);
}
+ /**
+ * @return Future ID.
+ */
+ public long futureId() {
+ return futId;
+ }
+
/** {@inheritDoc} */
@Override public boolean partitionExchangeMessage() {
return true;
@@ -181,7 +195,7 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheMessage {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 6;
+ return 7;
}
/**
@@ -239,12 +253,18 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheMessage {
writer.incrementState();
case 4:
- if (!writer.writeByteArray("idealAffAssignmentBytes", idealAffAssignmentBytes))
+ if (!writer.writeLong("futId", futId))
return false;
writer.incrementState();
case 5:
+ if (!writer.writeByteArray("idealAffAssignmentBytes", idealAffAssignmentBytes))
+ return false;
+
+ writer.incrementState();
+
+ case 6:
if (!writer.writeMessage("topVer", topVer))
return false;
@@ -275,7 +295,7 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheMessage {
reader.incrementState();
case 4:
- idealAffAssignmentBytes = reader.readByteArray("idealAffAssignmentBytes");
+ futId = reader.readLong("futId");
if (!reader.isLastRead())
return false;
@@ -283,6 +303,14 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheMessage {
reader.incrementState();
case 5:
+ idealAffAssignmentBytes = reader.readByteArray("idealAffAssignmentBytes");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 6:
topVer = reader.readMessage("topVer");
if (!reader.isLastRead())
http://git-wip-us.apache.org/repos/asf/ignite/blob/d24b08b6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
index 1d6563e..741ca5e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
@@ -23,6 +23,7 @@ import java.util.Collections;
import java.util.LinkedList;
import java.util.Queue;
import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
@@ -36,7 +37,6 @@ import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -55,6 +55,9 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<GridDhtAffin
private static IgniteLogger log;
/** */
+ private static final AtomicLong idGen = new AtomicLong();
+
+ /** */
private final GridCacheSharedContext ctx;
/** List of available nodes this future can fetch data from. */
@@ -65,11 +68,13 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<GridDhtAffin
private ClusterNode pendingNode;
/** */
- @GridToStringInclude
- private final T2<Integer, AffinityTopologyVersion> key;
+ private final long id;
/** */
- private final DynamicCacheDescriptor cacheDesc;
+ private final AffinityTopologyVersion topVer;
+
+ /** */
+ private final int cacheId;
/**
* @param ctx Context.
@@ -83,9 +88,11 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<GridDhtAffin
AffinityTopologyVersion topVer,
DiscoCache discoCache
) {
+ this.topVer = topVer;
+ this.cacheId = cacheDesc.cacheId();
this.ctx = ctx;
- this.cacheDesc = cacheDesc;
- this.key = new T2<>(cacheDesc.cacheId(), topVer);
+
+ id = idGen.getAndIncrement();
Collection<ClusterNode> availableNodes = discoCache.cacheAffinityNodes(cacheDesc.cacheId());
@@ -105,19 +112,26 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<GridDhtAffin
}
/**
- * Initializes fetch future.
+ * @return Cache ID.
*/
- public void init() {
- ctx.affinity().addDhtAssignmentFetchFuture(this);
+ public int cacheId() {
+ return cacheId;
+ }
- requestFromNextNode();
+ /**
+ * @return Future ID.
+ */
+ public long id() {
+ return id;
}
/**
- * @return Future key.
+ * Initializes fetch future.
*/
- public T2<Integer, AffinityTopologyVersion> key() {
- return key;
+ public void init() {
+ ctx.affinity().addDhtAssignmentFetchFuture(this);
+
+ requestFromNextNode();
}
/**
@@ -125,14 +139,6 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<GridDhtAffin
* @param res Response.
*/
public void onResponse(UUID nodeId, GridDhtAffinityAssignmentResponse res) {
- if (!res.topologyVersion().equals(key.get2())) {
- if (log.isDebugEnabled())
- log.debug("Received affinity assignment for wrong topology version (will ignore) " +
- "[node=" + nodeId + ", res=" + res + ", topVer=" + key.get2() + ']');
-
- return;
- }
-
GridDhtAffinityAssignmentResponse res0 = null;
synchronized (this) {
@@ -189,7 +195,7 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<GridDhtAffin
", node=" + node + ']');
ctx.io().send(node,
- new GridDhtAffinityAssignmentRequest(key.get1(), key.get2(), cacheDesc.startTopologyVersion()),
+ new GridDhtAffinityAssignmentRequest(id, cacheId, topVer),
AFFINITY_POOL);
// Close window for listener notification.
http://git-wip-us.apache.org/repos/asf/ignite/blob/d24b08b6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 5b3dfc6..58ad600 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -393,7 +393,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
int num = cctx.affinity().partitions();
if (cctx.rebalanceEnabled()) {
- boolean added = exchId.topologyVersion().equals(cctx.cacheStartTopologyVersion());
+ boolean added = exchId.topologyVersion().equals(cctx.startTopologyVersion());
boolean first = (loc.equals(oldest) && loc.id().equals(exchId.nodeId()) && exchId.isJoined()) || added;
@@ -1157,7 +1157,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
// then we keep the newer value.
if (newPart != null &&
(newPart.updateSequence() < part.updateSequence() ||
- (cctx.cacheStartTopologyVersion().compareTo(newPart.topologyVersion()) > 0))
+ (cctx.startTopologyVersion().compareTo(newPart.topologyVersion()) > 0))
) {
if (log.isDebugEnabled())
log.debug("Overriding partition map in full update map [exchId=" + exchId + ", curPart=" +
http://git-wip-us.apache.org/repos/asf/ignite/blob/d24b08b6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index 9f1b96e..57616ed 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -608,7 +608,9 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
AffinityAssignment assignment = cctx.affinity().assignment(topVer);
- GridDhtAffinityAssignmentResponse res = new GridDhtAffinityAssignmentResponse(cctx.cacheId(),
+ GridDhtAffinityAssignmentResponse res = new GridDhtAffinityAssignmentResponse(
+ req.futureId(),
+ cctx.cacheId(),
topVer,
assignment.assignment());
http://git-wip-us.apache.org/repos/asf/ignite/blob/d24b08b6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index 9fe29ef..acf351f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -345,7 +345,6 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
assert jcache != null : "Failed to get cache proxy [name=" + cctx.name() +
", locStart=" + cctx.startTopologyVersion() +
- ", cacheStart=" + cctx.cacheStartTopologyVersion() +
", locNode=" + cctx.localNode() +
", stopping=" + cctx.kernalContext().isStopping();
http://git-wip-us.apache.org/repos/asf/ignite/blob/d24b08b6/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
index 6149586..4f0d9a1 100644
--- a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
+++ b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
@@ -80,7 +80,6 @@ public class GridCacheTestContext<K, V> extends GridCacheContext<K, V> {
defaultCacheConfiguration(),
CacheType.USER,
AffinityTopologyVersion.ZERO,
- AffinityTopologyVersion.ZERO,
true,
true,
null,