You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/05/29 14:46:35 UTC
[28/50] incubator-ignite git commit: # ignite-23
# ignite-23
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/a837fe1e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/a837fe1e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/a837fe1e
Branch: refs/heads/ignite-929
Commit: a837fe1e358191fee15dc193090634b2e9a11b6a
Parents: 913b0ef
Author: sboikov <se...@inria.fr>
Authored: Mon May 25 21:38:58 2015 +0300
Committer: sboikov <se...@inria.fr>
Committed: Mon May 25 21:38:58 2015 +0300
----------------------------------------------------------------------
.../affinity/GridAffinityAssignmentCache.java | 59 ++++++-----
.../cache/GridCacheAffinityManager.java | 9 +-
.../GridDhtPartitionsExchangeFuture.java | 17 ++-
...teCacheClientNodePartitionsExchangeTest.java | 105 +++++++++++++++----
4 files changed, 137 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a837fe1e/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
index 2992a6c..0969a57 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
@@ -32,6 +32,8 @@ import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
+import static org.apache.ignite.internal.events.DiscoveryCustomEvent.*;
+
/**
* Affinity cached function.
*/
@@ -103,34 +105,6 @@ public class GridAffinityAssignmentCache {
}
/**
- * Copies previous affinity assignment when client node joins on leaves.
- *
- * @param node Node.
- * @param topVer Topology version.
- */
- public void clientNodeTopologyChange(ClusterNode node, AffinityTopologyVersion topVer) {
- GridAffinityAssignment assignment = head.get();
-
- assert assignment.primaryPartitions(node.id()).isEmpty() : node;
- assert assignment.backupPartitions(node.id()).isEmpty() : node;
-
- GridAffinityAssignment assignmentCpy = new GridAffinityAssignment(topVer, assignment.assignment());
-
- affCache.put(topVer, assignmentCpy);
- head.set(assignmentCpy);
-
- for (Map.Entry<AffinityTopologyVersion, AffinityReadyFuture> entry : readyFuts.entrySet()) {
- if (entry.getKey().compareTo(topVer) <= 0) {
- if (log.isDebugEnabled())
- log.debug("Completing topology ready future (use previous affinity) " +
- "[locNodeId=" + ctx.localNodeId() + ", futVer=" + entry.getKey() + ", topVer=" + topVer + ']');
-
- entry.getValue().onDone(topVer);
- }
- }
- }
-
- /**
* Initializes affinity with given topology version and assignment. The assignment is calculated on remote nodes
* and brought to local node on partition map exchange.
*
@@ -249,6 +223,35 @@ public class GridAffinityAssignmentCache {
}
/**
+ * Copies previous affinity assignment when discovery event does not cause affinity assignment changes
+ * (e.g. client node joins on leaves).
+ *
+ * @param evt Event.
+ * @param topVer Topology version.
+ */
+ public void clientEventTopologyChange(DiscoveryEvent evt, AffinityTopologyVersion topVer) {
+ GridAffinityAssignment aff = head.get();
+
+ assert evt.type() == EVT_DISCOVERY_CUSTOM_EVT || aff.primaryPartitions(evt.eventNode().id()).isEmpty() : evt;
+ assert evt.type() == EVT_DISCOVERY_CUSTOM_EVT || aff.backupPartitions(evt.eventNode().id()).isEmpty() : evt;
+
+ GridAffinityAssignment assignmentCpy = new GridAffinityAssignment(topVer, aff.assignment());
+
+ affCache.put(topVer, assignmentCpy);
+ head.set(assignmentCpy);
+
+ for (Map.Entry<AffinityTopologyVersion, AffinityReadyFuture> entry : readyFuts.entrySet()) {
+ if (entry.getKey().compareTo(topVer) <= 0) {
+ if (log.isDebugEnabled())
+ log.debug("Completing topology ready future (use previous affinity) " +
+ "[locNodeId=" + ctx.localNodeId() + ", futVer=" + entry.getKey() + ", topVer=" + topVer + ']');
+
+ entry.getValue().onDone(topVer);
+ }
+ }
+ }
+
+ /**
* @return Last calculated affinity version.
*/
public AffinityTopologyVersion lastVersion() {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a837fe1e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
index 20fca7e..ea17df1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
@@ -149,15 +149,16 @@ public class GridCacheAffinityManager extends GridCacheManagerAdapter {
}
/**
- * Copies previous affinity assignment when client node joins on leaves.
+ * Copies previous affinity assignment when discovery event does not cause affinity assignment changes
+ * (e.g. client node joins on leaves).
*
- * @param node Node.
+ * @param evt Event.
* @param topVer Topology version.
*/
- public void clientNodeTopologyChange(ClusterNode node, AffinityTopologyVersion topVer) {
+ public void clientEventTopologyChange(DiscoveryEvent evt, AffinityTopologyVersion topVer) {
assert !cctx.isLocal();
- aff.clientNodeTopologyChange(node, topVer);
+ aff.clientEventTopologyChange(evt, topVer);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a837fe1e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 2ff445f..7963c56 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -482,18 +482,29 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
else {
assert discoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT : discoEvt;
- clientNodeEvt = false;
+ boolean clientOnlyStart = true;
+
+ for (DynamicCacheChangeRequest req : reqs) {
+ if (!req.clientStartOnly()) {
+ clientOnlyStart = false;
+
+ break;
+ }
+ }
+
+ clientNodeEvt = clientOnlyStart;
}
if (clientNodeEvt) {
ClusterNode node = discoEvt.eventNode();
- if (!node.isLocal()) { // Client need to initialize affinity for local join event.
+ // Client need to initialize affinity for local join event or for stated client caches.
+ if (!node.isLocal()) {
for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
if (cacheCtx.isLocal())
continue;
- cacheCtx.affinity().clientNodeTopologyChange(node, exchId.topologyVersion());
+ cacheCtx.affinity().clientEventTopologyChange(discoEvt, exchId.topologyVersion());
GridDhtPartitionTopology top = cacheCtx.topology();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a837fe1e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodePartitionsExchangeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodePartitionsExchangeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodePartitionsExchangeTest.java
index 726ff22..162aa81 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodePartitionsExchangeTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodePartitionsExchangeTest.java
@@ -373,23 +373,32 @@ public class IgniteCacheClientNodePartitionsExchangeTest extends GridCommonAbstr
* @throws Exception If failed.
*/
private void waitForTopologyUpdate(int expNodes, int topVer) throws Exception {
+ final AffinityTopologyVersion ver = new AffinityTopologyVersion(topVer, 0);
+
+ waitForTopologyUpdate(expNodes, ver);
+ }
+
+ /**
+ * @param expNodes Expected number of nodes.
+ * @param topVer Expected topology version.
+ * @throws Exception If failed.
+ */
+ private void waitForTopologyUpdate(int expNodes, final AffinityTopologyVersion topVer) throws Exception {
List<Ignite> nodes = G.allGrids();
assertEquals(expNodes, nodes.size());
- final AffinityTopologyVersion ver = new AffinityTopologyVersion(topVer, 0);
-
for (Ignite ignite : nodes) {
final IgniteKernal kernal = (IgniteKernal)ignite;
GridTestUtils.waitForCondition(new GridAbsPredicate() {
@Override public boolean apply() {
- return ver.equals(kernal.context().cache().context().exchange().readyAffinityVersion());
+ return topVer.equals(kernal.context().cache().context().exchange().readyAffinityVersion());
}
}, 10_000);
assertEquals("Unexpected affinity version for " + ignite.name(),
- ver,
+ topVer,
kernal.context().cache().context().exchange().readyAffinityVersion());
}
@@ -417,7 +426,7 @@ public class IgniteCacheClientNodePartitionsExchangeTest extends GridCommonAbstr
GridDhtPartitionTopology top = cache.context().topology();
assertEquals("Unexpected topology version [node=" + ignite.name() + ", cache=" + cache.name() + ']',
- ver,
+ topVer,
top.topologyVersion());
}
}
@@ -429,35 +438,52 @@ public class IgniteCacheClientNodePartitionsExchangeTest extends GridCommonAbstr
* @throws Exception If failed.
*/
public void testClientOnlyCacheStart() throws Exception {
- clientOnlyCacheStart(false);
+ clientOnlyCacheStart(false, false);
}
/**
* @throws Exception If failed.
*/
public void testNearOnlyCacheStart() throws Exception {
- clientOnlyCacheStart(true);
+ clientOnlyCacheStart(true, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testClientOnlyCacheStartFromServerNode() throws Exception {
+ clientOnlyCacheStart(false, true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testNearOnlyCacheStartFromServerNode() throws Exception {
+ clientOnlyCacheStart(true, true);
}
/**
* @param nearCache If {@code true} creates near cache on client.
* @throws Exception If failed.
*/
- public void clientOnlyCacheStart(boolean nearCache) throws Exception {
+ private void clientOnlyCacheStart(boolean nearCache, boolean srvNode) throws Exception {
Ignite ignite0 = startGrid(0);
Ignite ignite1 = startGrid(1);
waitForTopologyUpdate(2, 2);
- final String CACHE_NAME = "cache1";
+ final String CACHE_NAME1 = "cache1";
CacheConfiguration ccfg = new CacheConfiguration();
- ccfg.setName(CACHE_NAME);
+ ccfg.setName(CACHE_NAME1);
+
+ if (srvNode)
+ ccfg.setNodeFilter(new TestFilter(getTestGridName(2)));
ignite0.createCache(ccfg);
- client = true;
+ client = !srvNode;
Ignite ignite2 = startGrid(2);
@@ -474,9 +500,11 @@ public class IgniteCacheClientNodePartitionsExchangeTest extends GridCommonAbstr
assertNull(((IgniteKernal)ignite2).context().cache().context().cache().internalCache("cache1"));
if (nearCache)
- ignite2.getOrCreateNearCache(CACHE_NAME, new NearCacheConfiguration<>());
+ ignite2.getOrCreateNearCache(CACHE_NAME1, new NearCacheConfiguration<>());
else
- ignite2.cache(CACHE_NAME);
+ ignite2.cache(CACHE_NAME1);
+
+ waitForTopologyUpdate(3, new AffinityTopologyVersion(3, 1));
GridCacheAdapter cache = ((IgniteKernal)ignite2).context().cache().context().cache().internalCache("cache1");
@@ -484,10 +512,10 @@ public class IgniteCacheClientNodePartitionsExchangeTest extends GridCommonAbstr
assertEquals(nearCache, cache.context().isNear());
assertEquals(0, spi0.partitionsSingleMessages());
- assertEquals(0, spi0.partitionsFullMessages());
+ assertEquals(1, spi0.partitionsFullMessages());
assertEquals(0, spi1.partitionsSingleMessages());
assertEquals(0, spi1.partitionsFullMessages());
- assertEquals(0, spi2.partitionsSingleMessages());
+ assertEquals(1, spi2.partitionsSingleMessages());
assertEquals(0, spi2.partitionsFullMessages());
ClusterNode clientNode = ((IgniteKernal)ignite2).localNode();
@@ -495,9 +523,50 @@ public class IgniteCacheClientNodePartitionsExchangeTest extends GridCommonAbstr
for (Ignite ignite : Ignition.allGrids()) {
GridDiscoveryManager disco = ((IgniteKernal)ignite).context().discovery();
- assertTrue(disco.cacheNode(clientNode, CACHE_NAME));
- assertFalse(disco.cacheAffinityNode(clientNode, CACHE_NAME));
- assertEquals(nearCache, disco.cacheNearNode(clientNode, CACHE_NAME));
+ assertTrue(disco.cacheNode(clientNode, CACHE_NAME1));
+ assertFalse(disco.cacheAffinityNode(clientNode, CACHE_NAME1));
+ assertEquals(nearCache, disco.cacheNearNode(clientNode, CACHE_NAME1));
+ }
+
+ spi0.reset();
+ spi1.reset();
+ spi2.reset();
+
+ final String CACHE_NAME2 = "cache2";
+
+ ccfg = new CacheConfiguration();
+
+ ccfg.setName(CACHE_NAME2);
+
+ ignite2.createCache(ccfg);
+
+ waitForTopologyUpdate(3, new AffinityTopologyVersion(3, 2));
+
+ assertEquals(0, spi0.partitionsSingleMessages());
+ assertEquals(2, spi0.partitionsFullMessages());
+ assertEquals(1, spi1.partitionsSingleMessages());
+ assertEquals(0, spi1.partitionsFullMessages());
+ assertEquals(1, spi2.partitionsSingleMessages());
+ assertEquals(0, spi2.partitionsFullMessages());
+ }
+
+ /**
+ *
+ */
+ private static class TestFilter implements IgnitePredicate<ClusterNode> {
+ /** */
+ private String exclNodeName;
+
+ /**
+ * @param exclNodeName Node name to exclude.
+ */
+ public TestFilter(String exclNodeName) {
+ this.exclNodeName = exclNodeName;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean apply(ClusterNode clusterNode) {
+ return !exclNodeName.equals(clusterNode.attribute(IgniteNodeAttributes.ATTR_GRID_NAME));
}
}