You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/05/11 14:42:33 UTC
[4/7] ignite git commit: ignite-5075
ignite-5075
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d24b08b6
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d24b08b6
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d24b08b6
Branch: refs/heads/ignite-5075
Commit: d24b08b65aeee089894b62fa8619d0c24cbae23b
Parents: 861b34b
Author: sboikov <sb...@gridgain.com>
Authored: Thu May 11 16:18:32 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu May 11 16:18:32 2017 +0300
----------------------------------------------------------------------
.../cache/CacheAffinitySharedManager.java | 33 +++++++------
.../internal/processors/cache/CacheData.java | 14 ------
.../processors/cache/ClusterCachesInfo.java | 51 ++++++++------------
.../cache/DynamicCacheDescriptor.java | 31 +++---------
.../processors/cache/GridCacheContext.java | 15 ------
.../processors/cache/GridCacheIoManager.java | 6 ++-
.../GridCachePartitionExchangeManager.java | 4 +-
.../processors/cache/GridCacheProcessor.java | 9 ----
.../dht/GridDhtAffinityAssignmentRequest.java | 32 ++++++------
.../dht/GridDhtAffinityAssignmentResponse.java | 36 ++++++++++++--
.../dht/GridDhtAssignmentFetchFuture.java | 50 ++++++++++---------
.../dht/GridDhtPartitionTopologyImpl.java | 4 +-
.../dht/preloader/GridDhtPreloader.java | 4 +-
.../continuous/CacheContinuousQueryManager.java | 1 -
.../loadtests/hashmap/GridCacheTestContext.java | 1 -
15 files changed, 131 insertions(+), 160 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/d24b08b6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index 8c275e0..bd41ccc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -91,8 +91,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
private final Object mux = new Object();
/** Pending affinity assignment futures. */
- private final ConcurrentMap<T2<Integer, AffinityTopologyVersion>, GridDhtAssignmentFetchFuture>
- pendingAssignmentFetchFuts = new ConcurrentHashMap8<>();
+ private final ConcurrentMap<Long, GridDhtAssignmentFetchFuture> pendingAssignmentFetchFuts =
+ new ConcurrentHashMap8<>();
/** Discovery listener. */
private final GridLocalEventListener discoLsnr = new GridLocalEventListener() {
@@ -118,6 +118,16 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
}
/**
+ * @param cacheId Cache ID.
+ * @return Cache start topology version.
+ */
+ public AffinityTopologyVersion localStartVersion(int cacheId) {
+ DynamicCacheDescriptor desc = registeredCaches.get(cacheId);
+
+ return desc != null ? desc.localStartVersion() : null;
+ }
+
+ /**
* Callback invoked from discovery thread when discovery message is received.
*
* @param type Event type.
@@ -414,8 +424,6 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
if (clientCacheStarted)
initAffinity(cacheDesc, cacheCtx.affinity().affinityCache(), fut, lateAffAssign);
else if (!req.clientStartOnly()) {
- assert fut.topologyVersion().equals(cacheCtx.cacheStartTopologyVersion());
-
GridAffinityAssignmentCache aff = cacheCtx.affinity().affinityCache();
assert aff.lastVersion().equals(AffinityTopologyVersion.NONE) : aff.lastVersion();
@@ -696,7 +704,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
* @param fut Future to add.
*/
public void addDhtAssignmentFetchFuture(GridDhtAssignmentFetchFuture fut) {
- GridDhtAssignmentFetchFuture old = pendingAssignmentFetchFuts.putIfAbsent(fut.key(), fut);
+ GridDhtAssignmentFetchFuture old = pendingAssignmentFetchFuts.putIfAbsent(fut.id(), fut);
assert old == null : "More than one thread is trying to fetch partition assignments [fut=" + fut +
", allFuts=" + pendingAssignmentFetchFuts + ']';
@@ -706,9 +714,9 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
* @param fut Future to remove.
*/
public void removeDhtAssignmentFetchFuture(GridDhtAssignmentFetchFuture fut) {
- boolean rmv = pendingAssignmentFetchFuts.remove(fut.key(), fut);
+ boolean rmv = pendingAssignmentFetchFuts.remove(fut.id(), fut);
- assert rmv : "Failed to remove assignment fetch future: " + fut.key();
+ assert rmv : "Failed to remove assignment fetch future: " + fut.id();
}
/**
@@ -720,13 +728,10 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
if (log.isDebugEnabled())
log.debug("Processing affinity assignment response [node=" + nodeId + ", res=" + res + ']');
- for (GridDhtAssignmentFetchFuture fut : pendingAssignmentFetchFuts.values()) {
- if (fut.key().get1().equals(cacheId)) {
- fut.onResponse(nodeId, res);
+ GridDhtAssignmentFetchFuture fut = pendingAssignmentFetchFuts.get(res.futureId());
- break;
- }
- }
+ if (fut != null)
+ fut.onResponse(nodeId, res);
}
/**
@@ -1001,7 +1006,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
for (int i = 0; i < fetchFuts.size(); i++) {
GridDhtAssignmentFetchFuture fetchFut = fetchFuts.get(i);
- Integer cacheId = fetchFut.key().get1();
+ Integer cacheId = fetchFut.cacheId();
fetchAffinity(fut, cctx.cacheContext(cacheId).affinity().affinityCache(), fetchFut);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/d24b08b6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheData.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheData.java
index 82afdc7..0c97ab0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheData.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheData.java
@@ -42,9 +42,6 @@ public class CacheData implements Serializable {
private final CacheType cacheType;
/** */
- private final AffinityTopologyVersion startTopVer;
-
- /** */
private final IgniteUuid deploymentId;
/** */
@@ -66,7 +63,6 @@ public class CacheData implements Serializable {
* @param cacheCfg Cache configuration.
* @param cacheId Cache ID.
* @param cacheType Cache ID.
- * @param startTopVer Topology version when cache was started.
* @param deploymentId Cache deployment ID.
* @param schema Query schema.
* @param rcvdFrom Node ID cache was started from.
@@ -77,7 +73,6 @@ public class CacheData implements Serializable {
CacheData(CacheConfiguration cacheCfg,
int cacheId,
CacheType cacheType,
- AffinityTopologyVersion startTopVer,
IgniteUuid deploymentId,
QuerySchema schema,
UUID rcvdFrom,
@@ -86,14 +81,12 @@ public class CacheData implements Serializable {
byte flags) {
assert cacheCfg != null;
assert rcvdFrom != null : cacheCfg.getName();
- assert startTopVer != null : cacheCfg.getName();
assert deploymentId != null : cacheCfg.getName();
assert template || cacheId != 0 : cacheCfg.getName();
this.cacheCfg = cacheCfg;
this.cacheId = cacheId;
this.cacheType = cacheType;
- this.startTopVer = startTopVer;
this.deploymentId = deploymentId;
this.schema = schema;
this.rcvdFrom = rcvdFrom;
@@ -110,13 +103,6 @@ public class CacheData implements Serializable {
}
/**
- * @return Start topology version.
- */
- public AffinityTopologyVersion startTopologyVersion() {
- return startTopVer;
- }
-
- /**
* @return {@code True} if this is template configuration.
*/
public boolean template() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/d24b08b6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
index 44d41f8..efcf6a8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
@@ -82,9 +82,6 @@ class ClusterCachesInfo {
private List<T2<DynamicCacheDescriptor, NearCacheConfiguration>> locJoinStartCaches;
/** */
- private Map<UUID, CacheJoinNodeDiscoveryData> joiningNodesDiscoData = new HashMap<>();
-
- /** */
private Map<UUID, CacheClientReconnectDiscoveryData> clientReconnectReqs;
/**
@@ -340,19 +337,15 @@ class ClusterCachesInfo {
if (needExchange) {
req.clientStartOnly(true);
- desc.clientCacheStartVersion(topVer.nextMinorVersion());
+ desc.localStartVersion(topVer.nextMinorVersion());
exchangeActions.addClientCacheToStart(req, desc);
}
}
if (!needExchange) {
- if (desc != null) {
- if (desc.clientCacheStartVersion() != null)
- waitTopVer = desc.clientCacheStartVersion();
- else
- waitTopVer = desc.startTopologyVersion();
- }
+ if (desc != null)
+ waitTopVer = desc.localStartVersion();
}
}
else if (req.globalStateChange())
@@ -404,7 +397,7 @@ class ClusterCachesInfo {
for (DynamicCacheDescriptor desc : addedDescs) {
assert desc.template() || incMinorTopVer;
- desc.startTopologyVersion(startTopVer);
+ desc.localStartVersion(startTopVer);
}
}
@@ -545,9 +538,11 @@ class ClusterCachesInfo {
locJoinStartCaches = new ArrayList<>();
if (!disconnectedState() && joinDiscoData != null) {
- processJoiningNode(joinDiscoData, node.id(), topVer);
+ processJoiningNode(joinDiscoData, node.id());
for (DynamicCacheDescriptor desc : registeredCaches.values()) {
+ desc.localStartVersion(topVer);
+
CacheConfiguration cfg = desc.cacheConfiguration();
CacheJoinNodeDiscoveryData.CacheInfo locCfg = joinDiscoData.caches().get(cfg.getName());
@@ -564,8 +559,7 @@ class ClusterCachesInfo {
desc.deploymentId(),
desc.schema());
- desc0.startTopologyVersion(desc.startTopologyVersion());
- desc0.clientCacheStartVersion(desc.clientCacheStartVersion());
+ desc0.localStartVersion(desc.localStartVersion());
desc0.receivedFrom(desc.receivedFrom());
desc0.staticallyConfigured(desc.staticallyConfigured());
@@ -577,11 +571,15 @@ class ClusterCachesInfo {
}
}
}
- else {
- CacheJoinNodeDiscoveryData discoData = joiningNodesDiscoData.remove(node.id());
- if (discoData != null)
- processJoiningNode(discoData, node.id(), topVer);
+ for (DynamicCacheDescriptor desc : registeredCaches.values()) {
+ if (node.id().equals(desc.receivedFrom()))
+ desc.localStartVersion(topVer);
+ }
+
+ for (DynamicCacheDescriptor desc : registeredTemplates.values()) {
+ if (node.id().equals(desc.receivedFrom()))
+ desc.localStartVersion(topVer);
}
}
}
@@ -607,7 +605,6 @@ class ClusterCachesInfo {
CacheData cacheData = new CacheData(desc.cacheConfiguration(),
desc.cacheId(),
desc.cacheType(),
- desc.startTopologyVersion(),
desc.deploymentId(),
desc.schema(),
desc.receivedFrom(),
@@ -624,7 +621,6 @@ class ClusterCachesInfo {
CacheData cacheData = new CacheData(desc.cacheConfiguration(),
0,
desc.cacheType(),
- desc.startTopologyVersion(),
desc.deploymentId(),
desc.schema(),
desc.receivedFrom(),
@@ -659,7 +655,6 @@ class ClusterCachesInfo {
cacheData.deploymentId(),
cacheData.schema());
- desc.startTopologyVersion(cacheData.startTopologyVersion());
desc.receivedFrom(cacheData.receivedFrom());
desc.staticallyConfigured(cacheData.staticallyConfigured());
@@ -679,7 +674,6 @@ class ClusterCachesInfo {
cacheData.deploymentId(),
cacheData.schema());
- desc.startTopologyVersion(cacheData.startTopologyVersion());
desc.receivedFrom(cacheData.receivedFrom());
desc.staticallyConfigured(cacheData.staticallyConfigured());
@@ -723,12 +717,8 @@ class ClusterCachesInfo {
else
processClientReconnectData((CacheClientReconnectDiscoveryData) joiningNodeData, data.joiningNodeId());
}
- else if (joiningNodeData instanceof CacheJoinNodeDiscoveryData) {
- CacheJoinNodeDiscoveryData old =
- joiningNodesDiscoData.put(data.joiningNodeId(), (CacheJoinNodeDiscoveryData)joiningNodeData);
-
- assert old == null : old;
- }
+ else if (joiningNodeData instanceof CacheJoinNodeDiscoveryData)
+ processJoiningNode((CacheJoinNodeDiscoveryData)joiningNodeData, data.joiningNodeId());
}
}
@@ -754,9 +744,8 @@ class ClusterCachesInfo {
/**
* @param joinData Joined node discovery data.
* @param nodeId Joined node ID.
- * @param topVer Topology version.
*/
- private void processJoiningNode(CacheJoinNodeDiscoveryData joinData, UUID nodeId, AffinityTopologyVersion topVer) {
+ private void processJoiningNode(CacheJoinNodeDiscoveryData joinData, UUID nodeId) {
for (CacheJoinNodeDiscoveryData.CacheInfo cacheInfo : joinData.templates().values()) {
CacheConfiguration cfg = cacheInfo.config();
@@ -770,7 +759,6 @@ class ClusterCachesInfo {
desc.staticallyConfigured(true);
desc.receivedFrom(nodeId);
- desc.startTopologyVersion(topVer);
DynamicCacheDescriptor old = registeredTemplates.put(cfg.getName(), desc);
@@ -791,7 +779,6 @@ class ClusterCachesInfo {
desc.staticallyConfigured(true);
desc.receivedFrom(nodeId);
- desc.startTopologyVersion(topVer);
DynamicCacheDescriptor old = registeredCaches.put(cfg.getName(), desc);
http://git-wip-us.apache.org/repos/asf/ignite/blob/d24b08b6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
index fe859f8..130ebde 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
@@ -63,9 +63,6 @@ public class DynamicCacheDescriptor {
private boolean updatesAllowed = true;
/** */
- private AffinityTopologyVersion startTopVer;
-
- /** */
private Integer cacheId;
/** */
@@ -78,7 +75,7 @@ public class DynamicCacheDescriptor {
private volatile CacheObjectContext objCtx;
/** */
- private transient AffinityTopologyVersion clientCacheStartVer;
+ private volatile transient AffinityTopologyVersion locStartVer;
/** Mutex to control schema. */
private final Object schemaMux = new Object();
@@ -131,22 +128,6 @@ public class DynamicCacheDescriptor {
}
/**
- * @return Start topology version.
- */
- @Nullable public AffinityTopologyVersion startTopologyVersion() {
- return startTopVer;
- }
-
- /**
- * @param startTopVer Start topology version.
- */
- public void startTopologyVersion(AffinityTopologyVersion startTopVer) {
- assert startTopVer != null;
-
- this.startTopVer = startTopVer;
- }
-
- /**
* @return {@code True} if this is template configuration.
*/
public boolean template() {
@@ -253,15 +234,15 @@ public class DynamicCacheDescriptor {
/**
* @return Version when client cache on local node was started.
*/
- @Nullable AffinityTopologyVersion clientCacheStartVersion() {
- return clientCacheStartVer;
+ @Nullable AffinityTopologyVersion localStartVersion() {
+ return locStartVer;
}
/**
- * @param clientCacheStartVer Version when client cache on local node was started.
+ * @param locStartVer Version when cache on local node was started.
*/
- public void clientCacheStartVersion(AffinityTopologyVersion clientCacheStartVer) {
- this.clientCacheStartVer = clientCacheStartVer;
+ public void localStartVersion(AffinityTopologyVersion locStartVer) {
+ this.locStartVer = locStartVer;
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/d24b08b6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index 8f0d842..2466a59 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -237,9 +237,6 @@ public class GridCacheContext<K, V> implements Externalizable {
/** Topology version when cache was started on local node. */
private AffinityTopologyVersion locStartTopVer;
- /** Global cache start topology version. */
- private AffinityTopologyVersion cacheStartTopVer;
-
/** Dynamic cache deployment ID. */
private IgniteUuid dynamicDeploymentId;
@@ -292,7 +289,6 @@ public class GridCacheContext<K, V> implements Externalizable {
GridCacheSharedContext sharedCtx,
CacheConfiguration cacheCfg,
CacheType cacheType,
- AffinityTopologyVersion cacheStartTopVer,
AffinityTopologyVersion locStartTopVer,
boolean affNode,
boolean updatesAllowed,
@@ -321,7 +317,6 @@ public class GridCacheContext<K, V> implements Externalizable {
assert ctx != null;
assert sharedCtx != null;
assert cacheCfg != null;
- assert cacheStartTopVer != null : cacheCfg.getName();
assert locStartTopVer != null : cacheCfg.getName();
assert evtMgr != null;
@@ -341,7 +336,6 @@ public class GridCacheContext<K, V> implements Externalizable {
this.cacheCfg = cacheCfg;
this.cacheType = cacheType;
this.locStartTopVer = locStartTopVer;
- this.cacheStartTopVer = cacheStartTopVer;
this.affNode = affNode;
this.updatesAllowed = updatesAllowed;
this.depEnabled = ctx.deploy().enabled() && !cacheObjects().isBinaryEnabled(cacheCfg);
@@ -470,15 +464,6 @@ public class GridCacheContext<K, V> implements Externalizable {
}
/**
- * @return Cache start topology version.
- */
- public AffinityTopologyVersion cacheStartTopologyVersion() {
- assert cacheStartTopVer != null : name();
-
- return cacheStartTopVer;
- }
-
- /**
* @return Cache default {@link ExpiryPolicy}.
*/
@Nullable public ExpiryPolicy expiry() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/d24b08b6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 5e7e401..348d9d8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -146,8 +146,10 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
if (cacheMsg instanceof GridDhtAffinityAssignmentRequest) {
assert cacheMsg.topologyVersion() != null : cacheMsg;
- AffinityTopologyVersion waitVer =
- ((GridDhtAffinityAssignmentRequest)cacheMsg).waitTopologyVersion();
+ AffinityTopologyVersion waitVer = cctx.affinity().localStartVersion(cacheMsg.cacheId());
+
+ if (waitVer == null)
+ waitVer = new AffinityTopologyVersion(cctx.localNode().order());
// Need to wait for exchange to avoid race between cache start and affinity request.
fut = cctx.exchange().affinityReadyFuture(waitVer);
http://git-wip-us.apache.org/repos/asf/ignite/blob/d24b08b6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 8f52ae6..04c647f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -895,7 +895,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
boolean ready;
if (exchId != null) {
- AffinityTopologyVersion startTopVer = cacheCtx.cacheStartTopologyVersion();
+ AffinityTopologyVersion startTopVer = cacheCtx.startTopologyVersion();
ready = startTopVer.compareTo(exchId.topologyVersion()) <= 0;
}
@@ -1301,7 +1301,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId);
if (cacheCtx != null &&
- cacheCtx.cacheStartTopologyVersion().compareTo(entry.getValue().topologyVersion()) > 0)
+ cacheCtx.startTopologyVersion().compareTo(entry.getValue().topologyVersion()) > 0)
continue;
GridDhtPartitionTopology top = null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/d24b08b6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 82db451..f9b015d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -1345,7 +1345,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
private GridCacheContext createCache(CacheConfiguration<?, ?> cfg,
@Nullable CachePluginManager pluginMgr,
CacheType cacheType,
- AffinityTopologyVersion cacheStartTopVer,
AffinityTopologyVersion locStartTopVer,
CacheObjectContext cacheObjCtx,
boolean affNode,
@@ -1420,7 +1419,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
sharedCtx,
cfg,
cacheType,
- cacheStartTopVer,
locStartTopVer,
affNode,
updatesAllowed,
@@ -1553,7 +1551,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
sharedCtx,
cfg,
cacheType,
- cacheStartTopVer,
locStartTopVer,
affNode,
true,
@@ -1733,7 +1730,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
nearCfg,
cacheDesc.cacheType(),
cacheDesc.deploymentId(),
- cacheDesc.startTopologyVersion(),
exchTopVer,
cacheDesc.schema()
);
@@ -1755,7 +1751,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
t.get2(),
desc.cacheType(),
desc.deploymentId(),
- desc.startTopologyVersion(),
exchTopVer,
desc.schema()
);
@@ -1785,7 +1780,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
null,
desc.cacheType(),
desc.deploymentId(),
- desc.startTopologyVersion(),
exchTopVer,
desc.schema()
);
@@ -1801,7 +1795,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
* @param reqNearCfg Near configuration if specified for client cache start request.
* @param cacheType Cache type.
* @param deploymentId Deployment ID.
- * @param cacheStartTopVer Cache start topology version.
* @param exchTopVer Current exchange version.
* @param schema Query schema.
* @throws IgniteCheckedException If failed.
@@ -1811,7 +1804,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
@Nullable NearCacheConfiguration reqNearCfg,
CacheType cacheType,
IgniteUuid deploymentId,
- AffinityTopologyVersion cacheStartTopVer,
AffinityTopologyVersion exchTopVer,
@Nullable QuerySchema schema
) throws IgniteCheckedException {
@@ -1839,7 +1831,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
GridCacheContext cacheCtx = createCache(ccfg,
null,
cacheType,
- cacheStartTopVer,
exchTopVer,
cacheObjCtx,
affNode,
http://git-wip-us.apache.org/repos/asf/ignite/blob/d24b08b6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java
index 0b3080e..f80adc5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java
@@ -31,12 +31,12 @@ public class GridDhtAffinityAssignmentRequest extends GridCacheMessage {
/** */
private static final long serialVersionUID = 0L;
+ /** */
+ private long futId;
+
/** Topology version being queried. */
private AffinityTopologyVersion topVer;
- /** */
- private AffinityTopologyVersion waitTopVer;
-
/**
* Empty constructor.
*/
@@ -45,26 +45,26 @@ public class GridDhtAffinityAssignmentRequest extends GridCacheMessage {
}
/**
+ * @param futId Future ID.
* @param cacheId Cache ID.
* @param topVer Topology version.
- * @param waitTopVer Topology version to wait for before message processing.
*/
- public GridDhtAffinityAssignmentRequest(int cacheId,
- AffinityTopologyVersion topVer,
- AffinityTopologyVersion waitTopVer) {
+ public GridDhtAffinityAssignmentRequest(
+ long futId,
+ int cacheId,
+ AffinityTopologyVersion topVer) {
assert topVer != null;
- assert waitTopVer != null;
+ this.futId = futId;
this.cacheId = cacheId;
this.topVer = topVer;
- this.waitTopVer = waitTopVer;
}
/**
- * @return Topology version to wait for before message processing.
+ * @return Future ID.
*/
- public AffinityTopologyVersion waitTopologyVersion() {
- return waitTopVer;
+ public long futureId() {
+ return futId;
}
/** {@inheritDoc} */
@@ -110,13 +110,13 @@ public class GridDhtAffinityAssignmentRequest extends GridCacheMessage {
switch (writer.state()) {
case 3:
- if (!writer.writeMessage("topVer", topVer))
+ if (!writer.writeLong("futId", futId))
return false;
writer.incrementState();
case 4:
- if (!writer.writeMessage("waitTopVer", waitTopVer))
+ if (!writer.writeMessage("topVer", topVer))
return false;
writer.incrementState();
@@ -138,7 +138,7 @@ public class GridDhtAffinityAssignmentRequest extends GridCacheMessage {
switch (reader.state()) {
case 3:
- topVer = reader.readMessage("topVer");
+ futId = reader.readLong("futId");
if (!reader.isLastRead())
return false;
@@ -146,7 +146,7 @@ public class GridDhtAffinityAssignmentRequest extends GridCacheMessage {
reader.incrementState();
case 4:
- waitTopVer = reader.readMessage("waitTopVer");
+ topVer = reader.readMessage("topVer");
if (!reader.isLastRead())
return false;
http://git-wip-us.apache.org/repos/asf/ignite/blob/d24b08b6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
index e8094e1..5d82171 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
@@ -44,6 +44,9 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheMessage {
/** */
private static final long serialVersionUID = 0L;
+ /** */
+ private long futId;
+
/** Topology version. */
private AffinityTopologyVersion topVer;
@@ -69,19 +72,30 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheMessage {
}
/**
+ * @param futId Future ID.
* @param cacheId Cache ID.
* @param topVer Topology version.
* @param affAssignment Affinity assignment.
*/
- public GridDhtAffinityAssignmentResponse(int cacheId,
+ public GridDhtAffinityAssignmentResponse(
+ long futId,
+ int cacheId,
@NotNull AffinityTopologyVersion topVer,
List<List<ClusterNode>> affAssignment) {
+ this.futId = futId;
this.cacheId = cacheId;
this.topVer = topVer;
affAssignmentIds = ids(affAssignment);
}
+ /**
+ * @return Future ID.
+ */
+ public long futureId() {
+ return futId;
+ }
+
/** {@inheritDoc} */
@Override public boolean partitionExchangeMessage() {
return true;
@@ -181,7 +195,7 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheMessage {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 6;
+ return 7;
}
/**
@@ -239,12 +253,18 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheMessage {
writer.incrementState();
case 4:
- if (!writer.writeByteArray("idealAffAssignmentBytes", idealAffAssignmentBytes))
+ if (!writer.writeLong("futId", futId))
return false;
writer.incrementState();
case 5:
+ if (!writer.writeByteArray("idealAffAssignmentBytes", idealAffAssignmentBytes))
+ return false;
+
+ writer.incrementState();
+
+ case 6:
if (!writer.writeMessage("topVer", topVer))
return false;
@@ -275,7 +295,7 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheMessage {
reader.incrementState();
case 4:
- idealAffAssignmentBytes = reader.readByteArray("idealAffAssignmentBytes");
+ futId = reader.readLong("futId");
if (!reader.isLastRead())
return false;
@@ -283,6 +303,14 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheMessage {
reader.incrementState();
case 5:
+ idealAffAssignmentBytes = reader.readByteArray("idealAffAssignmentBytes");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 6:
topVer = reader.readMessage("topVer");
if (!reader.isLastRead())
http://git-wip-us.apache.org/repos/asf/ignite/blob/d24b08b6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
index 1d6563e..741ca5e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
@@ -23,6 +23,7 @@ import java.util.Collections;
import java.util.LinkedList;
import java.util.Queue;
import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
@@ -36,7 +37,6 @@ import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -55,6 +55,9 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<GridDhtAffin
private static IgniteLogger log;
/** */
+ private static final AtomicLong idGen = new AtomicLong();
+
+ /** */
private final GridCacheSharedContext ctx;
/** List of available nodes this future can fetch data from. */
@@ -65,11 +68,13 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<GridDhtAffin
private ClusterNode pendingNode;
/** */
- @GridToStringInclude
- private final T2<Integer, AffinityTopologyVersion> key;
+ private final long id;
/** */
- private final DynamicCacheDescriptor cacheDesc;
+ private final AffinityTopologyVersion topVer;
+
+ /** */
+ private final int cacheId;
/**
* @param ctx Context.
@@ -83,9 +88,11 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<GridDhtAffin
AffinityTopologyVersion topVer,
DiscoCache discoCache
) {
+ this.topVer = topVer;
+ this.cacheId = cacheDesc.cacheId();
this.ctx = ctx;
- this.cacheDesc = cacheDesc;
- this.key = new T2<>(cacheDesc.cacheId(), topVer);
+
+ id = idGen.getAndIncrement();
Collection<ClusterNode> availableNodes = discoCache.cacheAffinityNodes(cacheDesc.cacheId());
@@ -105,19 +112,26 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<GridDhtAffin
}
/**
- * Initializes fetch future.
+ * @return Cache ID.
*/
- public void init() {
- ctx.affinity().addDhtAssignmentFetchFuture(this);
+ public int cacheId() {
+ return cacheId;
+ }
- requestFromNextNode();
+ /**
+ * @return Future ID.
+ */
+ public long id() {
+ return id;
}
/**
- * @return Future key.
+ * Initializes fetch future.
*/
- public T2<Integer, AffinityTopologyVersion> key() {
- return key;
+ public void init() {
+ ctx.affinity().addDhtAssignmentFetchFuture(this);
+
+ requestFromNextNode();
}
/**
@@ -125,14 +139,6 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<GridDhtAffin
* @param res Response.
*/
public void onResponse(UUID nodeId, GridDhtAffinityAssignmentResponse res) {
- if (!res.topologyVersion().equals(key.get2())) {
- if (log.isDebugEnabled())
- log.debug("Received affinity assignment for wrong topology version (will ignore) " +
- "[node=" + nodeId + ", res=" + res + ", topVer=" + key.get2() + ']');
-
- return;
- }
-
GridDhtAffinityAssignmentResponse res0 = null;
synchronized (this) {
@@ -189,7 +195,7 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<GridDhtAffin
", node=" + node + ']');
ctx.io().send(node,
- new GridDhtAffinityAssignmentRequest(key.get1(), key.get2(), cacheDesc.startTopologyVersion()),
+ new GridDhtAffinityAssignmentRequest(id, cacheId, topVer),
AFFINITY_POOL);
// Close window for listener notification.
http://git-wip-us.apache.org/repos/asf/ignite/blob/d24b08b6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 5b3dfc6..58ad600 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -393,7 +393,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
int num = cctx.affinity().partitions();
if (cctx.rebalanceEnabled()) {
- boolean added = exchId.topologyVersion().equals(cctx.cacheStartTopologyVersion());
+ boolean added = exchId.topologyVersion().equals(cctx.startTopologyVersion());
boolean first = (loc.equals(oldest) && loc.id().equals(exchId.nodeId()) && exchId.isJoined()) || added;
@@ -1157,7 +1157,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
// then we keep the newer value.
if (newPart != null &&
(newPart.updateSequence() < part.updateSequence() ||
- (cctx.cacheStartTopologyVersion().compareTo(newPart.topologyVersion()) > 0))
+ (cctx.startTopologyVersion().compareTo(newPart.topologyVersion()) > 0))
) {
if (log.isDebugEnabled())
log.debug("Overriding partition map in full update map [exchId=" + exchId + ", curPart=" +
http://git-wip-us.apache.org/repos/asf/ignite/blob/d24b08b6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index 9f1b96e..57616ed 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -608,7 +608,9 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
AffinityAssignment assignment = cctx.affinity().assignment(topVer);
- GridDhtAffinityAssignmentResponse res = new GridDhtAffinityAssignmentResponse(cctx.cacheId(),
+ GridDhtAffinityAssignmentResponse res = new GridDhtAffinityAssignmentResponse(
+ req.futureId(),
+ cctx.cacheId(),
topVer,
assignment.assignment());
http://git-wip-us.apache.org/repos/asf/ignite/blob/d24b08b6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index 9fe29ef..acf351f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -345,7 +345,6 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
assert jcache != null : "Failed to get cache proxy [name=" + cctx.name() +
", locStart=" + cctx.startTopologyVersion() +
- ", cacheStart=" + cctx.cacheStartTopologyVersion() +
", locNode=" + cctx.localNode() +
", stopping=" + cctx.kernalContext().isStopping();
http://git-wip-us.apache.org/repos/asf/ignite/blob/d24b08b6/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
index 6149586..4f0d9a1 100644
--- a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
+++ b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
@@ -80,7 +80,6 @@ public class GridCacheTestContext<K, V> extends GridCacheContext<K, V> {
defaultCacheConfiguration(),
CacheType.USER,
AffinityTopologyVersion.ZERO,
- AffinityTopologyVersion.ZERO,
true,
true,
null,