You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2017/12/12 16:12:05 UTC

ignite git commit: IGNITE-7177 Correctly handle custom messages which do not change affinity

Repository: ignite
Updated Branches:
  refs/heads/master c10aa0c4b -> 7cf049e86


IGNITE-7177 Correctly handle custom messages which do not change affinity


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7cf049e8
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7cf049e8
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7cf049e8

Branch: refs/heads/master
Commit: 7cf049e86297427054d07298b33cc6874aa3c1c0
Parents: c10aa0c
Author: Alexey Goncharuk <al...@gmail.com>
Authored: Tue Dec 12 19:11:46 2017 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Tue Dec 12 19:11:46 2017 +0300

----------------------------------------------------------------------
 .../cache/CacheAffinitySharedManager.java       | 31 +++++++++++++++-----
 .../GridDhtPartitionsExchangeFuture.java        | 14 +++++++--
 2 files changed, 34 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/7cf049e8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index 8441a5e..6347d03 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -684,6 +684,28 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
     }
 
     /**
+     * @param fut Exchange future.
+     * @param crd Coordinator flag.
+     * @param exchActions Exchange actions.
+     */
+    public void onCustomMessageNoAffinityChange(
+        GridDhtPartitionsExchangeFuture fut,
+        boolean crd,
+        @Nullable final ExchangeActions exchActions
+    ) {
+        final ExchangeDiscoveryEvents evts = fut.context().events();
+
+        forAllCacheGroups(crd, new IgniteInClosureX<GridAffinityAssignmentCache>() {
+            @Override public void applyx(GridAffinityAssignmentCache aff) {
+                if (exchActions != null && exchActions.cacheGroupStopping(aff.groupId()))
+                    return;
+
+                aff.clientEventTopologyChange(evts.lastEvent(), evts.topologyVersion());
+            }
+        });
+    }
+
+    /**
      * Called on exchange initiated for cache start/stop request.
      *
      * @param fut Exchange future.
@@ -703,14 +725,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
         caches.updateCachesInfo(exchActions);
 
         // Affinity did not change for existing caches.
-        forAllCacheGroups(crd, new IgniteInClosureX<GridAffinityAssignmentCache>() {
-            @Override public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException {
-                if (exchActions.cacheGroupStopping(aff.groupId()))
-                    return;
-
-                aff.clientEventTopologyChange(evts.lastEvent(), evts.topologyVersion());
-            }
-        });
+        onCustomMessageNoAffinityChange(fut, crd, exchActions);
 
         for (ExchangeActions.CacheActionData action : exchActions.cacheStartRequests()) {
             DynamicCacheDescriptor cacheDesc = action.descriptor();

http://git-wip-us.apache.org/repos/asf/ignite/blob/7cf049e8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index d29293e..a8b195d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -588,9 +588,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                     exchange = onCacheChangeRequest(crdNode);
                 }
                 else if (msg instanceof SnapshotDiscoveryMessage) {
-                    exchange = CU.clientNode(firstDiscoEvt.eventNode()) ?
-                        onClientNodeEvent(crdNode) :
-                        onServerNodeEvent(crdNode);
+                    exchange = onCustomMessageNoAffinityChange(crdNode);
                 }
                 else {
                     assert affChangeMsg != null : this;
@@ -894,6 +892,16 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
     /**
      * @param crd Coordinator flag.
+     * @return Exchange type.
+     */
+    private ExchangeType onCustomMessageNoAffinityChange(boolean crd) {
+        cctx.affinity().onCustomMessageNoAffinityChange(this, crd, exchActions);
+
+        return cctx.kernalContext().clientNode() ? ExchangeType.CLIENT : ExchangeType.ALL;
+    }
+
+    /**
+     * @param crd Coordinator flag.
      * @throws IgniteCheckedException If failed.
      * @return Exchange type.
      */