You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/05/15 10:25:36 UTC
[07/14] ignite git commit: ignite-5075
ignite-5075
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5a024a5f
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5a024a5f
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5a024a5f
Branch: refs/heads/ignite-5075
Commit: 5a024a5f113d1a3edb427e67fbe0e68a9a5c3a1f
Parents: a9317a4
Author: sboikov <sb...@gridgain.com>
Authored: Fri May 12 15:50:49 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri May 12 15:50:49 2017 +0300
----------------------------------------------------------------------
.../processors/cache/ClusterCachesInfo.java | 22 ++++-----
.../cache/DynamicCacheDescriptor.java | 47 +++++++++++++++++---
.../processors/cache/GridCacheIoManager.java | 18 +++++---
3 files changed, 64 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/5a024a5f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
index efcf6a8..da36470 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
@@ -337,15 +337,17 @@ class ClusterCachesInfo {
if (needExchange) {
req.clientStartOnly(true);
- desc.localStartVersion(topVer.nextMinorVersion());
+ desc.clientCacheStartVersion(topVer.nextMinorVersion());
exchangeActions.addClientCacheToStart(req, desc);
}
}
- if (!needExchange) {
- if (desc != null)
- waitTopVer = desc.localStartVersion();
+ if (!needExchange && desc != null) {
+ if (desc.clientCacheStartVersion() != null)
+ waitTopVer = desc.clientCacheStartVersion();
+ else
+ waitTopVer = desc.startTopologyVersion();
}
}
else if (req.globalStateChange())
@@ -397,7 +399,7 @@ class ClusterCachesInfo {
for (DynamicCacheDescriptor desc : addedDescs) {
assert desc.template() || incMinorTopVer;
- desc.localStartVersion(startTopVer);
+ desc.startTopologyVersion(startTopVer);
}
}
@@ -541,8 +543,6 @@ class ClusterCachesInfo {
processJoiningNode(joinDiscoData, node.id());
for (DynamicCacheDescriptor desc : registeredCaches.values()) {
- desc.localStartVersion(topVer);
-
CacheConfiguration cfg = desc.cacheConfiguration();
CacheJoinNodeDiscoveryData.CacheInfo locCfg = joinDiscoData.caches().get(cfg.getName());
@@ -559,7 +559,9 @@ class ClusterCachesInfo {
desc.deploymentId(),
desc.schema());
- desc0.localStartVersion(desc.localStartVersion());
+ desc0.startTopologyVersion(desc.startTopologyVersion());
+ desc0.receivedFromStartVersion(desc.receivedFromStartVersion());
+ desc0.clientCacheStartVersion(desc.clientCacheStartVersion());
desc0.receivedFrom(desc.receivedFrom());
desc0.staticallyConfigured(desc.staticallyConfigured());
@@ -574,12 +576,12 @@ class ClusterCachesInfo {
for (DynamicCacheDescriptor desc : registeredCaches.values()) {
if (node.id().equals(desc.receivedFrom()))
- desc.localStartVersion(topVer);
+ desc.receivedFromStartVersion(topVer);
}
for (DynamicCacheDescriptor desc : registeredTemplates.values()) {
if (node.id().equals(desc.receivedFrom()))
- desc.localStartVersion(topVer);
+ desc.receivedFromStartVersion(topVer);
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5a024a5f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
index 5c7060c..cec1828 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
@@ -73,7 +73,13 @@ public class DynamicCacheDescriptor {
private volatile CacheObjectContext objCtx;
/** */
- private volatile transient AffinityTopologyVersion locStartVer;
+ private AffinityTopologyVersion startTopVer;
+
+ /** */
+ private AffinityTopologyVersion rcvdFromVer;
+
+ /** */
+ private AffinityTopologyVersion clientCacheStartVer;
/** Mutex to control schema. */
private final Object schemaMux = new Object();
@@ -230,17 +236,46 @@ public class DynamicCacheDescriptor {
}
/**
+ * @return Topology version at which the node that provided the cache configuration was started.
+ */
+ @Nullable public AffinityTopologyVersion receivedFromStartVersion() {
+ return rcvdFromVer;
+ }
+
+ /**
+ * @param rcvdFromVer Topology version at which the node that provided the cache configuration was started.
+ */
+ public void receivedFromStartVersion(AffinityTopologyVersion rcvdFromVer) {
+ this.rcvdFromVer = rcvdFromVer;
+ }
+
+
+ /**
+ * @return Start topology version.
+ */
+ @Nullable public AffinityTopologyVersion startTopologyVersion() {
+ return startTopVer;
+ }
+
+ /**
+ * @param startTopVer Start topology version.
+ */
+ public void startTopologyVersion(AffinityTopologyVersion startTopVer) {
+ this.startTopVer = startTopVer;
+ }
+
+ /**
* @return Version when client cache on local node was started.
*/
- @Nullable AffinityTopologyVersion localStartVersion() {
- return locStartVer;
+ @Nullable AffinityTopologyVersion clientCacheStartVersion() {
+ return clientCacheStartVer;
}
/**
- * @param locStartVer Version when cache on local node was started.
+ * @param clientCacheStartVer Version when client cache on local node was started.
*/
- public void localStartVersion(AffinityTopologyVersion locStartVer) {
- this.locStartVer = locStartVer;
+ public void clientCacheStartVersion(AffinityTopologyVersion clientCacheStartVer) {
+ this.clientCacheStartVer = clientCacheStartVer;
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/5a024a5f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index a8a4dcd..fdd29e4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -146,22 +146,26 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
if (cacheMsg instanceof GridDhtAffinityAssignmentRequest) {
assert cacheMsg.topologyVersion() != null : cacheMsg;
- DynamicCacheDescriptor cacheDesc = cctx.cache().cacheDescriptor(cacheMsg.cacheId());
+ AffinityTopologyVersion startTopVer = new AffinityTopologyVersion(cctx.localNode().order());
- AffinityTopologyVersion waitVer = cacheDesc != null ? cacheDesc.localStartVersion() : null;
+ DynamicCacheDescriptor cacheDesc = cctx.cache().cacheDescriptor(cacheMsg.cacheId());
- if (waitVer == null)
- waitVer = new AffinityTopologyVersion(cctx.localNode().order());
+ if (cacheDesc != null) {
+ if (cacheDesc.startTopologyVersion() != null)
+ startTopVer = cacheDesc.startTopologyVersion();
+ else if (cacheDesc.receivedFromStartVersion() != null)
+ startTopVer = cacheDesc.receivedFromStartVersion();
+ }
// Need to wait for exchange to avoid race between cache start and affinity request.
- fut = cctx.exchange().affinityReadyFuture(waitVer);
+ fut = cctx.exchange().affinityReadyFuture(startTopVer);
if (fut != null && !fut.isDone()) {
if (log.isDebugEnabled()) {
log.debug("Wait for exchange before processing message [msg=" + msg +
", node=" + nodeId +
- ", waitVer=" + waitVer +
- ", cacheDesc=" + cctx.cache().cacheDescriptor(cacheMsg.cacheId()) + ']');
+ ", waitVer=" + startTopVer +
+ ", cacheDesc=" + cacheDesc + ']');
}
fut.listen(new CI1<IgniteInternalFuture<?>>() {