You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/05/15 10:25:36 UTC

[07/14] ignite git commit: ignite-5075

ignite-5075


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5a024a5f
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5a024a5f
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5a024a5f

Branch: refs/heads/ignite-5075
Commit: 5a024a5f113d1a3edb427e67fbe0e68a9a5c3a1f
Parents: a9317a4
Author: sboikov <sb...@gridgain.com>
Authored: Fri May 12 15:50:49 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri May 12 15:50:49 2017 +0300

----------------------------------------------------------------------
 .../processors/cache/ClusterCachesInfo.java     | 22 ++++-----
 .../cache/DynamicCacheDescriptor.java           | 47 +++++++++++++++++---
 .../processors/cache/GridCacheIoManager.java    | 18 +++++---
 3 files changed, 64 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/5a024a5f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
index efcf6a8..da36470 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
@@ -337,15 +337,17 @@ class ClusterCachesInfo {
                     if (needExchange) {
                         req.clientStartOnly(true);
 
-                        desc.localStartVersion(topVer.nextMinorVersion());
+                        desc.clientCacheStartVersion(topVer.nextMinorVersion());
 
                         exchangeActions.addClientCacheToStart(req, desc);
                     }
                 }
 
-                if (!needExchange) {
-                    if (desc != null)
-                        waitTopVer = desc.localStartVersion();
+                if (!needExchange && desc != null) {
+                    if (desc.clientCacheStartVersion() != null)
+                        waitTopVer = desc.clientCacheStartVersion();
+                    else
+                        waitTopVer = desc.startTopologyVersion();
                 }
             }
             else if (req.globalStateChange())
@@ -397,7 +399,7 @@ class ClusterCachesInfo {
             for (DynamicCacheDescriptor desc : addedDescs) {
                 assert desc.template() || incMinorTopVer;
 
-                desc.localStartVersion(startTopVer);
+                desc.startTopologyVersion(startTopVer);
             }
         }
 
@@ -541,8 +543,6 @@ class ClusterCachesInfo {
                     processJoiningNode(joinDiscoData, node.id());
 
                     for (DynamicCacheDescriptor desc : registeredCaches.values()) {
-                        desc.localStartVersion(topVer);
-
                         CacheConfiguration cfg = desc.cacheConfiguration();
 
                         CacheJoinNodeDiscoveryData.CacheInfo locCfg = joinDiscoData.caches().get(cfg.getName());
@@ -559,7 +559,9 @@ class ClusterCachesInfo {
                                 desc.deploymentId(),
                                 desc.schema());
 
-                            desc0.localStartVersion(desc.localStartVersion());
+                            desc0.startTopologyVersion(desc.startTopologyVersion());
+                            desc0.receivedFromStartVersion(desc.receivedFromStartVersion());
+                            desc0.clientCacheStartVersion(desc.clientCacheStartVersion());
                             desc0.receivedFrom(desc.receivedFrom());
                             desc0.staticallyConfigured(desc.staticallyConfigured());
 
@@ -574,12 +576,12 @@ class ClusterCachesInfo {
 
             for (DynamicCacheDescriptor desc : registeredCaches.values()) {
                 if (node.id().equals(desc.receivedFrom()))
-                    desc.localStartVersion(topVer);
+                    desc.receivedFromStartVersion(topVer);
             }
 
             for (DynamicCacheDescriptor desc : registeredTemplates.values()) {
                 if (node.id().equals(desc.receivedFrom()))
-                    desc.localStartVersion(topVer);
+                    desc.receivedFromStartVersion(topVer);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/5a024a5f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
index 5c7060c..cec1828 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
@@ -73,7 +73,13 @@ public class DynamicCacheDescriptor {
     private volatile CacheObjectContext objCtx;
 
     /** */
-    private volatile transient AffinityTopologyVersion locStartVer;
+    private AffinityTopologyVersion startTopVer;
+
+    /** */
+    private AffinityTopologyVersion rcvdFromVer;
+
+    /** */
+    private AffinityTopologyVersion clientCacheStartVer;
 
     /** Mutex to control schema. */
     private final Object schemaMux = new Object();
@@ -230,17 +236,46 @@ public class DynamicCacheDescriptor {
     }
 
     /**
+     * @return Topology version when node provided cache configuration was started.
+     */
+    @Nullable public AffinityTopologyVersion receivedFromStartVersion() {
+        return rcvdFromVer;
+    }
+
+    /**
+     * @param rcvdFromVer Topology version when node provided cache configuration was started.
+     */
+    public void receivedFromStartVersion(AffinityTopologyVersion rcvdFromVer) {
+        this.rcvdFromVer = rcvdFromVer;
+    }
+
+
+    /**
+     * @return Start topology version.
+     */
+    @Nullable public AffinityTopologyVersion startTopologyVersion() {
+        return startTopVer;
+    }
+
+    /**
+     * @param startTopVer Start topology version.
+     */
+    public void startTopologyVersion(AffinityTopologyVersion startTopVer) {
+        this.startTopVer = startTopVer;
+    }
+
+    /**
      * @return Version when client cache on local node was started.
      */
-    @Nullable AffinityTopologyVersion localStartVersion() {
-        return locStartVer;
+    @Nullable AffinityTopologyVersion clientCacheStartVersion() {
+        return clientCacheStartVer;
     }
 
     /**
-     * @param locStartVer Version when cache on local node was started.
+     * @param clientCacheStartVer Version when client cache on local node was started.
      */
-    public void localStartVersion(AffinityTopologyVersion locStartVer) {
-        this.locStartVer = locStartVer;
+    public void clientCacheStartVersion(AffinityTopologyVersion clientCacheStartVer) {
+        this.clientCacheStartVer = clientCacheStartVer;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/5a024a5f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index a8a4dcd..fdd29e4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -146,22 +146,26 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
                 if (cacheMsg instanceof GridDhtAffinityAssignmentRequest) {
                     assert cacheMsg.topologyVersion() != null : cacheMsg;
 
-                    DynamicCacheDescriptor cacheDesc = cctx.cache().cacheDescriptor(cacheMsg.cacheId());
+                    AffinityTopologyVersion startTopVer = new AffinityTopologyVersion(cctx.localNode().order());
 
-                    AffinityTopologyVersion waitVer = cacheDesc != null ? cacheDesc.localStartVersion() : null;
+                    DynamicCacheDescriptor cacheDesc = cctx.cache().cacheDescriptor(cacheMsg.cacheId());
 
-                    if (waitVer == null)
-                        waitVer = new AffinityTopologyVersion(cctx.localNode().order());
+                    if (cacheDesc != null) {
+                        if (cacheDesc.startTopologyVersion() != null)
+                            startTopVer = cacheDesc.startTopologyVersion();
+                        else if (cacheDesc.receivedFromStartVersion() != null)
+                            startTopVer = cacheDesc.receivedFromStartVersion();
+                    }
 
                     // Need to wait for exchange to avoid race between cache start and affinity request.
-                    fut = cctx.exchange().affinityReadyFuture(waitVer);
+                    fut = cctx.exchange().affinityReadyFuture(startTopVer);
 
                     if (fut != null && !fut.isDone()) {
                         if (log.isDebugEnabled()) {
                             log.debug("Wait for exchange before processing message [msg=" + msg +
                                 ", node=" + nodeId +
-                                ", waitVer=" + waitVer +
-                                ", cacheDesc=" + cctx.cache().cacheDescriptor(cacheMsg.cacheId()) + ']');
+                                ", waitVer=" + startTopVer +
+                                ", cacheDesc=" + cacheDesc + ']');
                         }
 
                         fut.listen(new CI1<IgniteInternalFuture<?>>() {