You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/05/18 16:50:24 UTC
incubator-ignite git commit: # ignite-23 avoid exchange for client
node join/leave
Repository: incubator-ignite
Updated Branches:
refs/heads/ignite-23 [created] a2a6f31fe
# ignite-23 avoid exchange for client node join/leave
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/a2a6f31f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/a2a6f31f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/a2a6f31f
Branch: refs/heads/ignite-23
Commit: a2a6f31fe4207dbb3cd2146c3f22a5c99d83fc5b
Parents: 5c8591c
Author: sboikov <sb...@gridgain.com>
Authored: Mon May 18 17:50:06 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon May 18 17:50:06 2015 +0300
----------------------------------------------------------------------
.../affinity/GridAffinityAssignmentCache.java | 27 +++
.../cache/GridCacheAffinityManager.java | 12 ++
.../GridDhtPartitionsExchangeFuture.java | 100 +++++++---
.../IgniteCacheClientNodeExchangeTest.java | 184 +++++++++++++++++++
.../testsuites/IgniteCacheTestSuite2.java | 2 +
5 files changed, 300 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a2a6f31f/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
index eccd9f9..18ac65a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
@@ -103,6 +103,32 @@ public class GridAffinityAssignmentCache {
}
/**
+ * @param node Node that must own no primary or backup partitions in the current assignment (asserted below).
+ * @param topVer Topology version under which a copy of the current assignment is stored and made current.
+ */
+ public void clientNodeTopologyChange(ClusterNode node, AffinityTopologyVersion topVer) {
+ GridAffinityAssignment assignment = head.get(); // Assignment currently referenced by head.
+
+ assert assignment.primaryPartitions(node.id()).isEmpty() : node; // Node owns no primary partitions.
+ assert assignment.backupPartitions(node.id()).isEmpty() : node; // Node owns no backup partitions.
+
+ GridAffinityAssignment assignmentCpy = new GridAffinityAssignment(topVer, assignment.assignment()); // Built from topVer and the existing assignment() value; nothing is recalculated here.
+
+ affCache.put(topVer, assignmentCpy); // Store the copy under the new version...
+ head.set(assignmentCpy); // ...and make it the current head before any ready future is completed.
+
+ for (Map.Entry<AffinityTopologyVersion, AffinityReadyFuture> entry : readyFuts.entrySet()) {
+ if (entry.getKey().compareTo(topVer) <= 0) { // Only futures keyed at or below topVer are completed.
+ if (log.isDebugEnabled())
+ log.debug("Completing topology ready future (use previous affinity) " +
+ "[locNodeId=" + ctx.localNodeId() + ", futVer=" + entry.getKey() + ", topVer=" + topVer + ']');
+
+ entry.getValue().onDone(topVer); // Completed with topVer as the result.
+ }
+ }
+ }
+
+ /**
* Initializes affinity with given topology version and assignment. The assignment is calculated on remote nodes
* and brought to local node on partition map exchange.
*
@@ -422,6 +448,7 @@ public class GridAffinityAssignmentCache {
/**
*
+ * @param reqTopVer Required topology version.
*/
private AffinityReadyFuture(AffinityTopologyVersion reqTopVer) {
this.reqTopVer = reqTopVer;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a2a6f31f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
index fe7efd5..6541e9a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
@@ -140,6 +140,7 @@ public class GridCacheAffinityManager extends GridCacheManagerAdapter {
*
* @param topVer Topology version to calculate affinity for.
* @param discoEvt Discovery event that causes this topology change.
+ * @return Affinity assignments.
*/
public List<List<ClusterNode>> calculateAffinity(AffinityTopologyVersion topVer, DiscoveryEvent discoEvt) {
assert !cctx.isLocal();
@@ -148,6 +149,17 @@ public class GridCacheAffinityManager extends GridCacheManagerAdapter {
}
/**
+ * Forwards the topology change to {@code aff.clientNodeTopologyChange(node, topVer)}; asserts the cache is not local.
+ * @param node Node, passed through to {@code aff} unchanged.
+ * @param topVer Topology version, passed through to {@code aff} unchanged.
+ */
+ public void clientNodeTopologyChange(ClusterNode node, AffinityTopologyVersion topVer) {
+ assert !cctx.isLocal(); // Same precondition as calculateAffinity() in this class.
+
+ aff.clientNodeTopologyChange(node, topVer);
+ }
+
+ /**
* @return Partition count.
*/
public int partitions() {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a2a6f31f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 599e391..f4dcf3b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -44,6 +44,7 @@ import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import java.util.concurrent.locks.*;
+import static org.apache.ignite.events.EventType.*;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
/**
@@ -146,6 +147,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
/** Dynamic cache change requests. */
private Collection<DynamicCacheChangeRequest> reqs;
+ /** Cache validation results. */
private volatile Map<Integer, Boolean> cacheValidRes;
/**
@@ -200,6 +202,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
* @param cctx Cache context.
* @param busyLock Busy lock.
* @param exchId Exchange ID.
+ * @param reqs Cache change requests.
*/
public GridDhtPartitionsExchangeFuture(
GridCacheSharedContext cctx,
@@ -322,7 +325,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
}
/**
- * Rechecks topology.
+ * @param cacheCtx Cache context.
+ * @throws IgniteCheckedException If failed.
*/
private void initTopology(GridCacheContext cacheCtx) throws IgniteCheckedException {
if (stopping(cacheCtx.cacheId()))
@@ -471,6 +475,62 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
// will return corresponding nodes.
U.await(evtLatch);
+ if (!dummy && !forcePreload && F.isEmpty(reqs)) { // If exchange initiated by node join or leave.
+ assert discoEvt != null;
+
+ int type = discoEvt.type();
+
+ assert type == EVT_NODE_JOINED || type == EVT_NODE_LEFT || type == EVT_NODE_FAILED : discoEvt;
+
+ ClusterNode node = discoEvt.eventNode();
+
+ if (!node.isLocal()) {
+ boolean affNode = false;
+
+ for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
+ if (cacheCtx.isLocal())
+ continue;
+
+ if (CU.affinityNode(node, cacheCtx.config().getNodeFilter())) {
+ affNode = true;
+
+ break;
+ }
+ }
+
+ if (!affNode) {
+ for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
+ if (cacheCtx.isLocal())
+ continue;
+
+ cacheCtx.affinity().clientNodeTopologyChange(node, exchId.topologyVersion());
+
+ GridDhtPartitionTopology top = cacheCtx.topology();
+
+ GridDhtPartitionMap parts = top.partitions(node.id());
+
+ assert parts == null || parts.size() == 0 : parts;
+
+ top.updateTopologyVersion(exchId, this, -1, stopping(cacheCtx.cacheId()));
+ }
+
+ if (!exchId.isLeft()) {
+ rmtNodes = new ConcurrentLinkedQueue<>(F.asList(node));
+
+ rmtIds = F.asList(node.id());
+ }
+
+ ready.set(true);
+
+ initFut.onDone(true);
+
+ onDone(exchId.topologyVersion());
+
+ return;
+ }
+ }
+ }
+
startCaches();
for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
@@ -484,8 +544,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
List<String> cachesWithoutNodes = null;
- for (String name : cctx.cache().cacheNames()) {
- if (exchId.isLeft()) {
+ if (exchId.isLeft()) {
+ for (String name : cctx.cache().cacheNames()) {
if (cctx.discovery().cacheAffinityNodes(name, topologyVersion()).isEmpty()) {
if (cachesWithoutNodes == null)
cachesWithoutNodes = new ArrayList<>();
@@ -521,7 +581,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
}
if (cachesWithoutNodes != null) {
- StringBuilder sb = new StringBuilder("All server nodes for the following caches have left the cluster: ");
+ StringBuilder sb =
+ new StringBuilder("All server nodes for the following caches have left the cluster: ");
for (int i = 0; i < cachesWithoutNodes.size(); i++) {
String cache = cachesWithoutNodes.get(i);
@@ -666,36 +727,25 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
if (log.isDebugEnabled())
log.debug("Initialized future: " + this);
- if (canSkipExchange())
- onDone(exchId.topologyVersion());
+ // If this node is not oldest.
+ if (!oldestNode.get().id().equals(cctx.localNodeId()))
+ sendPartitions();
else {
- // If this node is not oldest.
- if (!oldestNode.get().id().equals(cctx.localNodeId()))
- sendPartitions();
- else {
- boolean allReceived = allReceived();
-
- if (allReceived && replied.compareAndSet(false, true)) {
- if (spreadPartitions())
- onDone(exchId.topologyVersion());
- }
- }
+ boolean allReceived = allReceived();
- scheduleRecheck();
+ if (allReceived && replied.compareAndSet(false, true)) {
+ if (spreadPartitions())
+ onDone(exchId.topologyVersion());
+ }
}
+
+ scheduleRecheck();
}
else
assert false : "Skipped init future: " + this;
}
/**
- * @return {@code True} if no distributed exchange is needed.
- */
- private boolean canSkipExchange() {
- return false; // TODO ignite-23;
- }
-
- /**
*
*/
private void dumpPendingObjects() {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a2a6f31f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeExchangeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeExchangeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeExchangeTest.java
new file mode 100644
index 0000000..66db3c6
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeExchangeTest.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.managers.communication.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*;
+import org.apache.ignite.plugin.extensions.communication.*;
+import org.apache.ignite.resources.*;
+import org.apache.ignite.spi.communication.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.junits.common.*;
+import org.eclipse.jetty.util.*;
+
+import java.util.*;
+
+/**
+ * Checks which partition exchange messages nodes send when server and client nodes start and stop.
+ */
+public class IgniteCacheClientNodeExchangeTest extends GridCommonAbstractTest {
+ /** IP finder set on the discovery SPI of every grid configured by this test. */
+ protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+ /** Value passed to {@code setClientMode} for every grid configured after it is set. */
+ private boolean client;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
+ cfg.setClientMode(client); // Reads the field at configuration time, so the test can flip it between starts.
+
+ CacheConfiguration ccfg = new CacheConfiguration(); // No setters are called: all cache settings stay at defaults.
+
+ cfg.setCacheConfiguration(ccfg);
+
+ cfg.setCommunicationSpi(new TestCommunicationSpi()); // Recording SPI; the test reads its counters back.
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ stopAllGrids();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testNoPartitionExchangeForClient() throws Exception {
+ Ignite ignite0 = startGrid(0); // 'client' is still false here, so grids 0 and 1 are not in client mode.
+
+ TestCommunicationSpi spi0 = (TestCommunicationSpi)ignite0.configuration().getCommunicationSpi();
+
+ Ignite ignite1 = startGrid(1);
+
+ TestCommunicationSpi spi1 = (TestCommunicationSpi)ignite1.configuration().getCommunicationSpi();
+
+ assertEquals(0, spi0.partitionsSingleMessages().size()); // After grid 1 starts: grid 0 sent no single message...
+ assertEquals(1, spi0.partitionsFullMessages().size()); // ...and exactly one full message.
+
+ assertEquals(1, spi1.partitionsSingleMessages().size()); // Grid 1 sent exactly one single message...
+ assertEquals(0, spi1.partitionsFullMessages().size()); // ...and no full message.
+
+ spi0.reset(); // Clear recorded messages so later assertions only see what follows.
+ spi1.reset();
+
+ client = true; // Grids configured from here on get setClientMode(true).
+
+ for (int i = 0; i < 3; i++) { // Repeat the client start/stop cycle three times.
+ log.info("Start client node: " + i);
+
+ Ignite ignite2 = startGrid(2);
+
+ TestCommunicationSpi spi2 = (TestCommunicationSpi)ignite2.configuration().getCommunicationSpi();
+
+ assertEquals(0, spi0.partitionsSingleMessages().size()); // Client start: grid 0 sent no single message...
+ assertEquals(1, spi0.partitionsFullMessages().size()); // ...and exactly one full message.
+
+ assertEquals(0, spi1.partitionsSingleMessages().size()); // Grid 1 is expected to send nothing on client start.
+ assertEquals(0, spi1.partitionsFullMessages().size());
+
+ assertEquals(1, spi2.partitionsSingleMessages().size()); // The client itself sent exactly one single message...
+ assertEquals(0, spi2.partitionsFullMessages().size()); // ...and no full message.
+
+ spi0.reset();
+ spi1.reset();
+ spi2.reset();
+
+ log.info("Stop client node.");
+
+ ignite2.close();
+
+ assertEquals(0, spi0.partitionsSingleMessages().size()); // Client stop: neither remaining grid is expected to send
+ assertEquals(0, spi0.partitionsFullMessages().size()); // any recorded partition message.
+
+ assertEquals(0, spi1.partitionsSingleMessages().size());
+ assertEquals(0, spi1.partitionsFullMessages().size());
+ }
+ }
+
+ /**
+ * Communication SPI that records sent partitions single/full messages whose exchange ID is not null.
+ */
+ private static class TestCommunicationSpi extends TcpCommunicationSpi {
+ /** Recorded single messages; a set, so size() counts distinct elements rather than send calls. */
+ private ConcurrentHashSet<GridDhtPartitionsSingleMessage> partSingleMsgs = new ConcurrentHashSet<>();
+
+ /** Recorded full messages; a set, so size() counts distinct elements rather than send calls. */
+ private ConcurrentHashSet<GridDhtPartitionsFullMessage> partFullMsgs = new ConcurrentHashSet<>();
+
+ /** Logger, annotated with {@link LoggerResource}. */
+ @LoggerResource
+ private IgniteLogger log;
+
+ /** {@inheritDoc} */
+ @Override public void sendMessage(ClusterNode node, Message msg) {
+ super.sendMessage(node, msg); // Send first; recording below only runs if this call returns normally.
+
+ Object msg0 = ((GridIoMessage)msg).message(); // Unchecked cast: assumes every sent message is a GridIoMessage.
+
+ if (msg0 instanceof GridDhtPartitionsSingleMessage) {
+ if (((GridDhtPartitionsSingleMessage)msg0).exchangeId() != null) { // Messages with a null exchange ID are not recorded.
+ log.info("Partitions message: " + msg0.getClass().getSimpleName());
+
+ partSingleMsgs.add((GridDhtPartitionsSingleMessage) msg0);
+ }
+ }
+ else if (msg0 instanceof GridDhtPartitionsFullMessage) {
+ if (((GridDhtPartitionsFullMessage)msg0).exchangeId() != null) { // Same null-exchange-ID filter as for single messages.
+ log.info("Partitions message: " + msg0.getClass().getSimpleName());
+
+ partFullMsgs.add((GridDhtPartitionsFullMessage) msg0);
+ }
+ }
+ }
+
+ /**
+ * Clears both recorded message sets.
+ */
+ void reset() {
+ partSingleMsgs.clear();
+ partFullMsgs.clear();
+ }
+
+ /**
+ * @return Sent partitions single messages (the backing set itself, not a copy).
+ */
+ Collection<GridDhtPartitionsSingleMessage> partitionsSingleMessages() {
+ return partSingleMsgs;
+ }
+
+ /**
+ * @return Sent partitions full messages (the backing set itself, not a copy).
+ */
+ Collection<GridDhtPartitionsFullMessage> partitionsFullMessages() {
+ return partFullMsgs;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a2a6f31f/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
index 5738778..6031dcb 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
@@ -138,6 +138,8 @@ public class IgniteCacheTestSuite2 extends TestSuite {
suite.addTest(new TestSuite(GridCacheNearPrimarySyncSelfTest.class));
suite.addTest(new TestSuite(GridCacheColocatedPrimarySyncSelfTest.class));
+ suite.addTest(new TestSuite(IgniteCacheClientNodeExchangeTest.class));
+
return suite;
}
}