You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/07/04 08:19:41 UTC

[18/28] ignite git commit: GG-12330 Do not destroy single cache if cache group will be destroyed

GG-12330 Do not destroy single cache if cache group will be destroyed


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/18bbb14e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/18bbb14e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/18bbb14e

Branch: refs/heads/ignite-2.1.2-exchange
Commit: 18bbb14e35eecaddb86ec4361f18a652b9559836
Parents: c697d45
Author: Pavel Kovalenko <jo...@gmail.com>
Authored: Fri Jun 30 21:22:28 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Jun 30 21:22:28 2017 +0300

----------------------------------------------------------------------
 .../cache/CacheAffinitySharedManager.java       |  26 ++---
 .../processors/cache/ClusterCachesInfo.java     |  13 ++-
 .../cache/DynamicCacheChangeRequest.java        |   1 +
 .../processors/cache/ExchangeActions.java       | 109 +++++++++++++++----
 .../processors/cache/GridCacheProcessor.java    |  58 +++++-----
 5 files changed, 141 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/18bbb14e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index 5984ef5..9516f84 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -772,10 +772,10 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
         for (ExchangeActions.ActionData action : exchActions.cacheStopRequests())
             cctx.cache().blockGateway(action.request().cacheName(), true, action.request().restart());
 
-        for (CacheGroupDescriptor grpDesc : exchActions.cacheGroupsToStop()) {
-            cctx.exchange().clearClientTopology(grpDesc.groupId());
+        for (ExchangeActions.CacheGroupActionData action : exchActions.cacheGroupsToStop()) {
+            cctx.exchange().clearClientTopology(action.descriptor().groupId());
 
-            CacheGroupContext gctx = cctx.cache().cacheGroup(grpDesc.groupId());
+            CacheGroupContext gctx = cctx.cache().cacheGroup(action.descriptor().groupId());
 
             if (gctx != null) {
                 IgniteCheckedException ex;
@@ -797,11 +797,11 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
         Set<Integer> stoppedGrps = null;
 
         if (crd && lateAffAssign) {
-            for (CacheGroupDescriptor grpDesc : exchActions.cacheGroupsToStop()) {
-                if (grpDesc.config().getCacheMode() != LOCAL) {
-                    CacheGroupHolder cacheGrp = grpHolders.remove(grpDesc.groupId());
+            for (ExchangeActions.CacheGroupActionData data : exchActions.cacheGroupsToStop()) {
+                if (data.descriptor().config().getCacheMode() != LOCAL) {
+                    CacheGroupHolder cacheGrp = grpHolders.remove(data.descriptor().groupId());
 
-                    assert cacheGrp != null : grpDesc;
+                    assert cacheGrp != null : data.descriptor();
 
                     if (stoppedGrps == null)
                         stoppedGrps = new HashSet<>();
@@ -2270,7 +2270,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                 if (!registeredGrps.containsKey(grpDesc.groupId()))
                     registeredGrps.put(grpDesc.groupId(), grpDesc);
 
-                if (!registeredCaches.containsKey(desc.cacheName()))
+                if (!registeredCaches.containsKey(desc.cacheId()))
                     registeredCaches.put(desc.cacheId(), desc);
             }
         }
@@ -2279,14 +2279,14 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
          * @param exchActions Exchange actions.
          */
         void updateCachesInfo(ExchangeActions exchActions) {
-            for (CacheGroupDescriptor stopDesc : exchActions.cacheGroupsToStop()) {
-                CacheGroupDescriptor rmvd = registeredGrps.remove(stopDesc.groupId());
+            for (ExchangeActions.CacheGroupActionData stopAction : exchActions.cacheGroupsToStop()) {
+                CacheGroupDescriptor rmvd = registeredGrps.remove(stopAction.descriptor().groupId());
 
-                assert rmvd != null : stopDesc.cacheOrGroupName();
+                assert rmvd != null : stopAction.descriptor().cacheOrGroupName();
             }
 
-            for (CacheGroupDescriptor startDesc : exchActions.cacheGroupsToStart()) {
-                CacheGroupDescriptor old = registeredGrps.put(startDesc.groupId(), startDesc);
+            for (ExchangeActions.CacheGroupActionData startAction : exchActions.cacheGroupsToStart()) {
+                CacheGroupDescriptor old = registeredGrps.put(startAction.descriptor().groupId(), startAction.descriptor());
 
                 assert old == null : old;
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/18bbb14e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
index 15345d9..4e9dcf1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
@@ -539,7 +539,18 @@ class ClusterCachesInfo {
 
                         ctx.discovery().removeCacheGroup(grpDesc);
 
-                        exchangeActions.addCacheGroupToStop(grpDesc);
+                        exchangeActions.addCacheGroupToStop(grpDesc, req.destroy());
+
+                        assert exchangeActions.checkStopRequestConsistency(grpDesc.groupId());
+
+                        // If the whole group is going to be destroyed, there is no need to destroy each cache
+                        // individually because the group will be stopped anyway.
+                        if (req.destroy()) {
+                            for (ExchangeActions.ActionData action : exchangeActions.cacheStopRequests()) {
+                                if (action.descriptor().groupId() == grpDesc.groupId())
+                                    action.request().destroy(false);
+                            }
+                        }
                     }
                 }
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/18bbb14e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
index 2c9e7f2..6d5eaf3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
@@ -427,6 +427,7 @@ public class DynamicCacheChangeRequest implements Serializable {
             ", nodeId=" + initiatingNodeId +
             ", clientStartOnly=" + clientStartOnly +
             ", stop=" + stop +
+            ", destroy=" + destroy +
             ']';
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/18bbb14e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
index 1a6b068..9caf9aa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
@@ -35,10 +35,10 @@ import org.jetbrains.annotations.Nullable;
  */
 public class ExchangeActions {
     /** */
-    private List<CacheGroupDescriptor> cacheGrpsToStart;
+    private List<CacheGroupActionData> cacheGrpsToStart;
 
     /** */
-    private List<CacheGroupDescriptor> cacheGrpsToStop;
+    private List<CacheGroupActionData> cacheGrpsToStop;
 
     /** */
     private Map<String, ActionData> cachesToStart;
@@ -53,6 +53,29 @@ public class ExchangeActions {
     private ClusterState newState;
 
     /**
+     * @param grpId Group ID.
+     * @return Always {@code true}, fails with assert error if inconsistent.
+     */
+    boolean checkStopRequestConsistency(int grpId) {
+        Boolean destroy = null;
+
+        // Check that the caches of this group are either all stopped without destroy or all destroyed.
+        for (ExchangeActions.ActionData action : cacheStopRequests()) {
+            if (action.descriptor().groupId() == grpId) {
+                if (destroy == null)
+                    destroy = action.request().destroy();
+                else {
+                    assert action.request().destroy() == destroy
+                        : "Both cache stop only and cache destroy request associated with one group in batch "
+                        + cacheStopRequests();
+                }
+            }
+        }
+
+        return true;
+    }
+
+    /**
      * @return {@code True} if server nodes should not participate in exchange.
      */
     public boolean clientOnlyExchange() {
@@ -237,14 +260,14 @@ public class ExchangeActions {
         if (cacheGrpsToStart == null)
             cacheGrpsToStart = new ArrayList<>();
 
-        cacheGrpsToStart.add(grpDesc);
+        cacheGrpsToStart.add(new CacheGroupActionData(grpDesc));
     }
 
     /**
      * @return Cache groups to start.
      */
-    public List<CacheGroupDescriptor> cacheGroupsToStart() {
-        return cacheGrpsToStart != null ? cacheGrpsToStart : Collections.<CacheGroupDescriptor>emptyList();
+    public List<CacheGroupActionData> cacheGroupsToStart() {
+        return cacheGrpsToStart != null ? cacheGrpsToStart : Collections.<CacheGroupActionData>emptyList();
     }
 
     /**
@@ -253,8 +276,8 @@ public class ExchangeActions {
      */
     public boolean cacheGroupStarting(int grpId) {
         if (cacheGrpsToStart != null) {
-            for (CacheGroupDescriptor grp : cacheGrpsToStart) {
-                if (grp.groupId() == grpId)
+            for (CacheGroupActionData grp : cacheGrpsToStart) {
+                if (grp.desc.groupId() == grpId)
                     return true;
             }
         }
@@ -264,21 +287,22 @@ public class ExchangeActions {
 
     /**
      * @param grpDesc Group descriptor.
+     * @param destroy Destroy flag.
      */
-    public void addCacheGroupToStop(CacheGroupDescriptor grpDesc) {
+    public void addCacheGroupToStop(CacheGroupDescriptor grpDesc, boolean destroy) {
         assert grpDesc != null;
 
         if (cacheGrpsToStop == null)
             cacheGrpsToStop = new ArrayList<>();
 
-        cacheGrpsToStop.add(grpDesc);
+        cacheGrpsToStop.add(new CacheGroupActionData(grpDesc, destroy));
     }
 
     /**
      * @return Cache groups to start.
      */
-    public List<CacheGroupDescriptor> cacheGroupsToStop() {
-        return cacheGrpsToStop != null ? cacheGrpsToStop : Collections.<CacheGroupDescriptor>emptyList();
+    public List<CacheGroupActionData> cacheGroupsToStop() {
+        return cacheGrpsToStop != null ? cacheGrpsToStop : Collections.<CacheGroupActionData>emptyList();
     }
 
     /**
@@ -287,8 +311,8 @@ public class ExchangeActions {
      */
     public boolean cacheGroupStopping(int grpId) {
         if (cacheGrpsToStop != null) {
-            for (CacheGroupDescriptor grp : cacheGrpsToStop) {
-                if (grp.groupId() == grpId)
+            for (CacheGroupActionData grp : cacheGrpsToStop) {
+                if (grp.desc.groupId() == grpId)
                     return true;
             }
         }
@@ -312,10 +336,10 @@ public class ExchangeActions {
      */
     static class ActionData {
         /** */
-        private DynamicCacheChangeRequest req;
+        private final DynamicCacheChangeRequest req;
 
         /** */
-        private DynamicCacheDescriptor desc;
+        private final DynamicCacheDescriptor desc;
 
         /**
          * @param req Request.
@@ -344,16 +368,59 @@ public class ExchangeActions {
         }
     }
 
+    /**
+     * Cache group action data: the group descriptor together with the destroy flag.
+     */
+    static class CacheGroupActionData {
+        /** */
+        private final CacheGroupDescriptor desc;
+
+        /** */
+        private final boolean destroy;
+
+        /**
+         * @param desc Group descriptor
+         * @param destroy Destroy flag
+         */
+        CacheGroupActionData(CacheGroupDescriptor desc, boolean destroy) {
+            assert desc != null;
+
+            this.desc = desc;
+            this.destroy = destroy;
+        }
+
+        /**
+         * @param desc Group descriptor
+         */
+        CacheGroupActionData(CacheGroupDescriptor desc) {
+            this(desc, false);
+        }
+
+        /**
+         * @return Group descriptor
+         */
+        public CacheGroupDescriptor descriptor() {
+            return desc;
+        }
+
+        /**
+         * @return Destroy flag
+         */
+        public boolean destroy() {
+            return destroy;
+        }
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
-        Object startGrps = F.viewReadOnly(cacheGrpsToStart, new C1<CacheGroupDescriptor, String>() {
-            @Override public String apply(CacheGroupDescriptor desc) {
-                return desc.cacheOrGroupName();
+        Object startGrps = F.viewReadOnly(cacheGrpsToStart, new C1<CacheGroupActionData, String>() {
+            @Override public String apply(CacheGroupActionData data) {
+                return data.desc.cacheOrGroupName();
             }
         });
-        Object stopGrps = F.viewReadOnly(cacheGrpsToStop, new C1<CacheGroupDescriptor, String>() {
-            @Override public String apply(CacheGroupDescriptor desc) {
-                return desc.cacheOrGroupName();
+        Object stopGrps = F.viewReadOnly(cacheGrpsToStop, new C1<CacheGroupActionData, String>() {
+            @Override public String apply(CacheGroupActionData data) {
+                return data.desc.cacheOrGroupName() + ", destroy=" + data.destroy;
             }
         });
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/18bbb14e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index c425bfb..bb6d068 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -2110,40 +2110,34 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     public void onExchangeDone(
         AffinityTopologyVersion topVer,
         @Nullable ExchangeActions exchActions,
-        Throwable err
+        @Nullable Throwable err
     ) {
         initCacheProxies(topVer, err);
 
-        if (exchActions != null && exchActions.systemCachesStarting() && exchActions.newClusterState() == null)
-            ctx.dataStructures().restoreStructuresState(ctx);
-
-        if (exchActions != null && err == null) {
-            Collection<IgniteBiTuple<CacheGroupContext, Boolean>> stoppedGrps = null;
-
-            boolean forceCheckpoint = false;
-
-            for (ExchangeActions.ActionData action : exchActions.cacheStopRequests()) {
-                GridCacheContext<?, ?> stopCtx;
-                boolean destroy;
+        if (exchActions == null)
+            return;
 
-                if (!forceCheckpoint){
-                    try {
-                        sharedCtx.database().waitForCheckpoint("caches stop");
-                    }
-                    catch (IgniteCheckedException e) {
-                        U.error(log, "Failed to wait for checkpoint finish during cache stop.", e);
-                    }
+        if (exchActions.systemCachesStarting() && exchActions.newClusterState() == null)
+            ctx.dataStructures().restoreStructuresState(ctx);
 
-                    forceCheckpoint = true;
+        if (err == null) {
+            // Force checkpoint if there is any cache stop request
+            if (exchActions.cacheStopRequests().size() > 0) {
+                try {
+                    sharedCtx.database().waitForCheckpoint("caches stop");
                 }
+                catch (IgniteCheckedException e) {
+                    U.error(log, "Failed to wait for checkpoint finish during cache stop.", e);
+                }
+            }
 
+            for (ExchangeActions.ActionData action : exchActions.cacheStopRequests()) {
                 stopGateway(action.request());
 
                 sharedCtx.database().checkpointReadLock();
 
                 try {
-                    stopCtx = prepareCacheStop(action.request().cacheName(), action.request().destroy());
-                    destroy = action.request().destroy();
+                    prepareCacheStop(action.request().cacheName(), action.request().destroy());
 
                     if (exchActions.newClusterState() == null)
                         ctx.state().onCacheStop(action.request());
@@ -2151,20 +2145,22 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                 finally {
                     sharedCtx.database().checkpointReadUnlock();
                 }
+            }
+
+            List<IgniteBiTuple<CacheGroupContext, Boolean>> stoppedGroups = new ArrayList<>();
 
-                if (stopCtx != null && !stopCtx.group().hasCaches()) {
-                    if (stoppedGrps == null)
-                        stoppedGrps = new ArrayList<>();
+            for (ExchangeActions.CacheGroupActionData action : exchActions.cacheGroupsToStop()) {
+                Integer groupId = action.descriptor().groupId();
 
-                    stoppedGrps.add(F.t(stopCtx.group(), destroy));
+                if (cacheGrps.containsKey(groupId)) {
+                    stoppedGroups.add(F.t(cacheGrps.get(groupId), action.destroy()));
+
+                    stopCacheGroup(groupId);
                 }
             }
 
-            for (CacheGroupDescriptor grpDesc : exchActions.cacheGroupsToStop())
-                stopCacheGroup(grpDesc.groupId());
-
-            if (stoppedGrps != null && !sharedCtx.kernalContext().clientNode())
-                sharedCtx.database().onCacheGroupsStopped(stoppedGrps);
+            if (!sharedCtx.kernalContext().clientNode())
+                sharedCtx.database().onCacheGroupsStopped(stoppedGroups);
         }
     }