You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/05/29 14:46:35 UTC

[28/50] incubator-ignite git commit: # ignite-23

# ignite-23


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/a837fe1e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/a837fe1e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/a837fe1e

Branch: refs/heads/ignite-929
Commit: a837fe1e358191fee15dc193090634b2e9a11b6a
Parents: 913b0ef
Author: sboikov <se...@inria.fr>
Authored: Mon May 25 21:38:58 2015 +0300
Committer: sboikov <se...@inria.fr>
Committed: Mon May 25 21:38:58 2015 +0300

----------------------------------------------------------------------
 .../affinity/GridAffinityAssignmentCache.java   |  59 ++++++-----
 .../cache/GridCacheAffinityManager.java         |   9 +-
 .../GridDhtPartitionsExchangeFuture.java        |  17 ++-
 ...teCacheClientNodePartitionsExchangeTest.java | 105 +++++++++++++++----
 4 files changed, 137 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a837fe1e/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
index 2992a6c..0969a57 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
@@ -32,6 +32,8 @@ import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.*;
 
+import static org.apache.ignite.internal.events.DiscoveryCustomEvent.*;
+
 /**
  * Affinity cached function.
  */
@@ -103,34 +105,6 @@ public class GridAffinityAssignmentCache {
     }
 
     /**
-     * Copies previous affinity assignment when client node joins on leaves.
-     *
-     * @param node Node.
-     * @param topVer Topology version.
-     */
-    public void clientNodeTopologyChange(ClusterNode node, AffinityTopologyVersion topVer) {
-        GridAffinityAssignment assignment = head.get();
-
-        assert assignment.primaryPartitions(node.id()).isEmpty() : node;
-        assert assignment.backupPartitions(node.id()).isEmpty() : node;
-
-        GridAffinityAssignment assignmentCpy = new GridAffinityAssignment(topVer, assignment.assignment());
-
-        affCache.put(topVer, assignmentCpy);
-        head.set(assignmentCpy);
-
-        for (Map.Entry<AffinityTopologyVersion, AffinityReadyFuture> entry : readyFuts.entrySet()) {
-            if (entry.getKey().compareTo(topVer) <= 0) {
-                if (log.isDebugEnabled())
-                    log.debug("Completing topology ready future (use previous affinity) " +
-                        "[locNodeId=" + ctx.localNodeId() + ", futVer=" + entry.getKey() + ", topVer=" + topVer + ']');
-
-                entry.getValue().onDone(topVer);
-            }
-        }
-    }
-
-    /**
      * Initializes affinity with given topology version and assignment. The assignment is calculated on remote nodes
      * and brought to local node on partition map exchange.
      *
@@ -249,6 +223,35 @@ public class GridAffinityAssignmentCache {
     }
 
     /**
+     * Copies previous affinity assignment when discovery event does not cause affinity assignment changes
+     * (e.g. client node joins or leaves).
+     *
+     * @param evt Event.
+     * @param topVer Topology version.
+     */
+    public void clientEventTopologyChange(DiscoveryEvent evt, AffinityTopologyVersion topVer) {
+        GridAffinityAssignment aff = head.get();
+
+        assert evt.type() == EVT_DISCOVERY_CUSTOM_EVT  || aff.primaryPartitions(evt.eventNode().id()).isEmpty() : evt;
+        assert evt.type() == EVT_DISCOVERY_CUSTOM_EVT  || aff.backupPartitions(evt.eventNode().id()).isEmpty() : evt;
+
+        GridAffinityAssignment assignmentCpy = new GridAffinityAssignment(topVer, aff.assignment());
+
+        affCache.put(topVer, assignmentCpy);
+        head.set(assignmentCpy);
+
+        for (Map.Entry<AffinityTopologyVersion, AffinityReadyFuture> entry : readyFuts.entrySet()) {
+            if (entry.getKey().compareTo(topVer) <= 0) {
+                if (log.isDebugEnabled())
+                    log.debug("Completing topology ready future (use previous affinity) " +
+                            "[locNodeId=" + ctx.localNodeId() + ", futVer=" + entry.getKey() + ", topVer=" + topVer + ']');
+
+                entry.getValue().onDone(topVer);
+            }
+        }
+    }
+
+    /**
      * @return Last calculated affinity version.
      */
     public AffinityTopologyVersion lastVersion() {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a837fe1e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
index 20fca7e..ea17df1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
@@ -149,15 +149,16 @@ public class GridCacheAffinityManager extends GridCacheManagerAdapter {
     }
 
     /**
-     * Copies previous affinity assignment when client node joins on leaves.
+     * Copies previous affinity assignment when discovery event does not cause affinity assignment changes
+     * (e.g. client node joins or leaves).
      *
-     * @param node Node.
+     * @param evt Event.
      * @param topVer Topology version.
      */
-    public void clientNodeTopologyChange(ClusterNode node, AffinityTopologyVersion topVer) {
+    public void clientEventTopologyChange(DiscoveryEvent evt, AffinityTopologyVersion topVer) {
         assert !cctx.isLocal();
 
-        aff.clientNodeTopologyChange(node, topVer);
+        aff.clientEventTopologyChange(evt, topVer);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a837fe1e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 2ff445f..7963c56 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -482,18 +482,29 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
                 else {
                     assert discoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT : discoEvt;
 
-                    clientNodeEvt = false;
+                    boolean clientOnlyStart = true;
+
+                    for (DynamicCacheChangeRequest req : reqs) {
+                        if (!req.clientStartOnly()) {
+                            clientOnlyStart = false;
+
+                            break;
+                        }
+                    }
+
+                    clientNodeEvt = clientOnlyStart;
                 }
 
                 if (clientNodeEvt) {
                     ClusterNode node = discoEvt.eventNode();
 
-                    if (!node.isLocal()) {  // Client need to initialize affinity for local join event.
+                    // Client needs to initialize affinity for local join event or for started client caches.
+                    if (!node.isLocal()) {
                         for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
                             if (cacheCtx.isLocal())
                                 continue;
 
-                            cacheCtx.affinity().clientNodeTopologyChange(node, exchId.topologyVersion());
+                            cacheCtx.affinity().clientEventTopologyChange(discoEvt, exchId.topologyVersion());
 
                             GridDhtPartitionTopology top = cacheCtx.topology();
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a837fe1e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodePartitionsExchangeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodePartitionsExchangeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodePartitionsExchangeTest.java
index 726ff22..162aa81 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodePartitionsExchangeTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodePartitionsExchangeTest.java
@@ -373,23 +373,32 @@ public class IgniteCacheClientNodePartitionsExchangeTest extends GridCommonAbstr
      * @throws Exception If failed.
      */
     private void waitForTopologyUpdate(int expNodes, int topVer) throws Exception {
+        final AffinityTopologyVersion ver = new AffinityTopologyVersion(topVer, 0);
+
+        waitForTopologyUpdate(expNodes, ver);
+    }
+
+    /**
+     * @param expNodes Expected number of nodes.
+     * @param topVer Expected topology version.
+     * @throws Exception If failed.
+     */
+    private void waitForTopologyUpdate(int expNodes, final AffinityTopologyVersion topVer) throws Exception {
         List<Ignite> nodes = G.allGrids();
 
         assertEquals(expNodes, nodes.size());
 
-        final AffinityTopologyVersion ver = new AffinityTopologyVersion(topVer, 0);
-
         for (Ignite ignite : nodes) {
             final IgniteKernal kernal = (IgniteKernal)ignite;
 
             GridTestUtils.waitForCondition(new GridAbsPredicate() {
                 @Override public boolean apply() {
-                    return ver.equals(kernal.context().cache().context().exchange().readyAffinityVersion());
+                    return topVer.equals(kernal.context().cache().context().exchange().readyAffinityVersion());
                 }
             }, 10_000);
 
             assertEquals("Unexpected affinity version for " + ignite.name(),
-                ver,
+                topVer,
                 kernal.context().cache().context().exchange().readyAffinityVersion());
         }
 
@@ -417,7 +426,7 @@ public class IgniteCacheClientNodePartitionsExchangeTest extends GridCommonAbstr
                 GridDhtPartitionTopology top = cache.context().topology();
 
                 assertEquals("Unexpected topology version [node=" + ignite.name() + ", cache=" + cache.name() + ']',
-                    ver,
+                    topVer,
                     top.topologyVersion());
             }
         }
@@ -429,35 +438,52 @@ public class IgniteCacheClientNodePartitionsExchangeTest extends GridCommonAbstr
      * @throws Exception If failed.
      */
     public void testClientOnlyCacheStart() throws Exception {
-        clientOnlyCacheStart(false);
+        clientOnlyCacheStart(false, false);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testNearOnlyCacheStart() throws Exception {
-        clientOnlyCacheStart(true);
+        clientOnlyCacheStart(true, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testClientOnlyCacheStartFromServerNode() throws Exception {
+        clientOnlyCacheStart(false, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNearOnlyCacheStartFromServerNode() throws Exception {
+        clientOnlyCacheStart(true, true);
     }
 
     /**
      * @param nearCache If {@code true} creates near cache on client.
      * @throws Exception If failed.
      */
-    public void clientOnlyCacheStart(boolean nearCache) throws Exception {
+    private void clientOnlyCacheStart(boolean nearCache, boolean srvNode) throws Exception {
         Ignite ignite0 = startGrid(0);
         Ignite ignite1 = startGrid(1);
 
         waitForTopologyUpdate(2, 2);
 
-        final String CACHE_NAME = "cache1";
+        final String CACHE_NAME1 = "cache1";
 
         CacheConfiguration ccfg = new CacheConfiguration();
 
-        ccfg.setName(CACHE_NAME);
+        ccfg.setName(CACHE_NAME1);
+
+        if (srvNode)
+            ccfg.setNodeFilter(new TestFilter(getTestGridName(2)));
 
         ignite0.createCache(ccfg);
 
-        client = true;
+        client = !srvNode;
 
         Ignite ignite2 = startGrid(2);
 
@@ -474,9 +500,11 @@ public class IgniteCacheClientNodePartitionsExchangeTest extends GridCommonAbstr
         assertNull(((IgniteKernal)ignite2).context().cache().context().cache().internalCache("cache1"));
 
         if (nearCache)
-            ignite2.getOrCreateNearCache(CACHE_NAME, new NearCacheConfiguration<>());
+            ignite2.getOrCreateNearCache(CACHE_NAME1, new NearCacheConfiguration<>());
         else
-            ignite2.cache(CACHE_NAME);
+            ignite2.cache(CACHE_NAME1);
+
+        waitForTopologyUpdate(3, new AffinityTopologyVersion(3, 1));
 
         GridCacheAdapter cache = ((IgniteKernal)ignite2).context().cache().context().cache().internalCache("cache1");
 
@@ -484,10 +512,10 @@ public class IgniteCacheClientNodePartitionsExchangeTest extends GridCommonAbstr
         assertEquals(nearCache, cache.context().isNear());
 
         assertEquals(0, spi0.partitionsSingleMessages());
-        assertEquals(0, spi0.partitionsFullMessages());
+        assertEquals(1, spi0.partitionsFullMessages());
         assertEquals(0, spi1.partitionsSingleMessages());
         assertEquals(0, spi1.partitionsFullMessages());
-        assertEquals(0, spi2.partitionsSingleMessages());
+        assertEquals(1, spi2.partitionsSingleMessages());
         assertEquals(0, spi2.partitionsFullMessages());
 
         ClusterNode clientNode = ((IgniteKernal)ignite2).localNode();
@@ -495,9 +523,50 @@ public class IgniteCacheClientNodePartitionsExchangeTest extends GridCommonAbstr
         for (Ignite ignite : Ignition.allGrids()) {
             GridDiscoveryManager disco = ((IgniteKernal)ignite).context().discovery();
 
-            assertTrue(disco.cacheNode(clientNode, CACHE_NAME));
-            assertFalse(disco.cacheAffinityNode(clientNode, CACHE_NAME));
-            assertEquals(nearCache, disco.cacheNearNode(clientNode, CACHE_NAME));
+            assertTrue(disco.cacheNode(clientNode, CACHE_NAME1));
+            assertFalse(disco.cacheAffinityNode(clientNode, CACHE_NAME1));
+            assertEquals(nearCache, disco.cacheNearNode(clientNode, CACHE_NAME1));
+        }
+
+        spi0.reset();
+        spi1.reset();
+        spi2.reset();
+
+        final String CACHE_NAME2 = "cache2";
+
+        ccfg = new CacheConfiguration();
+
+        ccfg.setName(CACHE_NAME2);
+
+        ignite2.createCache(ccfg);
+
+        waitForTopologyUpdate(3, new AffinityTopologyVersion(3, 2));
+
+        assertEquals(0, spi0.partitionsSingleMessages());
+        assertEquals(2, spi0.partitionsFullMessages());
+        assertEquals(1, spi1.partitionsSingleMessages());
+        assertEquals(0, spi1.partitionsFullMessages());
+        assertEquals(1, spi2.partitionsSingleMessages());
+        assertEquals(0, spi2.partitionsFullMessages());
+    }
+
+    /**
+     * Node filter that rejects the node whose grid name equals the configured name.
+     */
+    private static class TestFilter implements IgnitePredicate<ClusterNode> {
+        /** */
+        private String exclNodeName;
+
+        /**
+         * @param exclNodeName Node name to exclude.
+         */
+        public TestFilter(String exclNodeName) {
+            this.exclNodeName = exclNodeName;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean apply(ClusterNode clusterNode) {
+            return !exclNodeName.equals(clusterNode.attribute(IgniteNodeAttributes.ATTR_GRID_NAME));
         }
     }