You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/08/01 10:37:48 UTC
[48/49] ignite git commit: 5578
5578
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7b22719e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7b22719e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7b22719e
Branch: refs/heads/ignite-5578
Commit: 7b22719efa96541e2a85449560bff9896712be00
Parents: 8f6be3b
Author: sboikov <sb...@gridgain.com>
Authored: Tue Aug 1 12:29:06 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Aug 1 13:25:37 2017 +0300
----------------------------------------------------------------------
.../GridDhtPartitionsExchangeFuture.java | 2 +
.../cluster/GridClusterStateProcessor.java | 2 +-
.../CacheLateAffinityAssignmentTest.java | 326 +++++++++++++++++--
...ePrimaryNodeFailureRecoveryAbstractTest.java | 4 +-
4 files changed, 302 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/7b22719e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 1601f2b..0dcfffc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -214,9 +214,11 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
private ConcurrentMap<UUID, GridDhtPartitionsSingleMessage> msgs = new ConcurrentHashMap8<>();
/** */
+ @GridToStringExclude
private Map<UUID, GridDhtPartitionsSingleMessage> mergedJoinExchMsgs;
/** */
+ @GridToStringExclude
private int awaitMergedMsgs;
/** */
http://git-wip-us.apache.org/repos/asf/ignite/blob/7b22719e/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
index 6e94669..009f166 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
@@ -498,7 +498,7 @@ public class GridClusterStateProcessor extends GridProcessorAdapter {
log.info("Sending " + prettyStr(activate) + " request from node [id=" + ctx.localNodeId() +
", topVer=" + topVer +
", client=" + ctx.clientNode() +
- ", daemon" + ctx.isDaemon() + "]");
+ ", daemon=" + ctx.isDaemon() + "]");
}
IgniteFuture<Void> fut = comp.runAsync(new ClientChangeGlobalStateComputeRequest(activate));
http://git-wip-us.apache.org/repos/asf/ignite/blob/7b22719e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
index 4bb7554..16cb625 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
@@ -55,6 +55,7 @@ import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.GridNodeOrderComparator;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.IgniteNodeAttributes;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
@@ -70,6 +71,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.G;
@@ -587,20 +589,41 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest {
*
* @throws Exception If failed.
*/
- public void testAffinitySimpleNodeLeave() throws Exception {
- startServer(0, 1);
+ public void testAffinitySimpleNodeLeave1() throws Exception {
+ affinitySimpleNodeLeave(2);
+ }
- startServer(1, 2);
+ /**
+ * Simple test, node leaves a cluster of four server nodes.
+ *
+ * @throws Exception If failed.
+ */
+ public void testAffinitySimpleNodeLeave2() throws Exception {
+ affinitySimpleNodeLeave(4);
+ }
- checkAffinity(2, topVer(2, 0), false);
+ /**
+ * @param cnt Count of server nodes.
+ * @throws Exception If failed.
+ */
+ private void affinitySimpleNodeLeave(int cnt) throws Exception {
+ int topVer = 1;
- checkAffinity(2, topVer(2, 1), true);
+ startServer(topVer - 1, topVer++);
- stopNode(1, 3);
+ for (int i = 0; i < cnt - 1; i++, topVer++) {
+ startServer(topVer - 1, topVer);
- checkAffinity(1, topVer(3, 0), true);
+ checkAffinity(topVer, topVer(topVer, 0), false);
- checkNoExchange(1, topVer(3, 1));
+ checkAffinity(topVer, topVer(topVer, 1), true);
+ }
+
+ stopNode(1, topVer);
+
+ checkAffinity(cnt - 1, topVer(topVer, 0), true);
+
+ checkNoExchange(cnt - 1, topVer(topVer, 1));
awaitPartitionMapExchange();
}
@@ -1037,6 +1060,222 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest {
}
/**
+ * @throws Exception If failed.
+ */
+ public void testBlockedFinishMsg1() throws Exception {
+ doTestCoordLeaveBlockedFinishExchangeMessage(4, 3, false, 2);
+ }
+
+ /**
+ *
+ * @throws Exception If failed.
+ */
+ public void testBlockedFinishMsg2() throws Exception {
+ doTestCoordLeaveBlockedFinishExchangeMessage(4, 3, false);
+ }
+
+ /**
+ *
+ * @throws Exception If failed.
+ */
+ public void testBlockedFinishMsg3() throws Exception {
+ doTestCoordLeaveBlockedFinishExchangeMessage(4, 3, false, 1);
+ }
+
+ /**
+ *
+ * @throws Exception If failed.
+ */
+ public void testBlockedFinishMsg4() throws Exception {
+ doTestCoordLeaveBlockedFinishExchangeMessage(5, 3, false);
+ }
+
+ /**
+ *
+ * @throws Exception If failed.
+ */
+ public void testBlockedFinishMsg5() throws Exception {
+ doTestCoordLeaveBlockedFinishExchangeMessage(5, 3, false, 1);
+ }
+
+ /**
+ *
+ * @throws Exception If failed.
+ */
+ public void testBlockedFinishMsg6() throws Exception {
+ doTestCoordLeaveBlockedFinishExchangeMessage(5, 3, false, 2);
+ }
+
+ /**
+ *
+ * @throws Exception If failed.
+ */
+ public void testBlockedFinishMsg7() throws Exception {
+ doTestCoordLeaveBlockedFinishExchangeMessage(5, 3, false, 2, 4);
+ }
+
+ /**
+ *
+ * @throws Exception If failed.
+ */
+ public void testBlockedFinishMsg8() throws Exception {
+ doTestCoordLeaveBlockedFinishExchangeMessage(6, 3, false, 2, 4);
+ }
+
+ /**
+ *
+ * @throws Exception If failed.
+ */
+ public void testBlockedFinishMsg9() throws Exception {
+ doTestCoordLeaveBlockedFinishExchangeMessage(5, 1, false, 4);
+ }
+
+ /**
+ *
+ * @throws Exception If failed.
+ */
+ public void testBlockedFinishMsgForClient() throws Exception {
+ doTestCoordLeaveBlockedFinishExchangeMessage(5, 1, true, 4);
+ }
+
+ /**
+ * Coordinator leaves while its {@link GridDhtPartitionsFullMessage} messages to some nodes are blocked;
+ * the exchange must still be completed on all remaining nodes.
+ *
+ * @param cnt Number of nodes.
+ * @param stopId Node to stop.
+ * @param lastClient {@code True} if last started node is client.
+ * @param blockedIds Nodes not receiving exchange finish message.
+ * @throws Exception If failed.
+ */
+ private void doTestCoordLeaveBlockedFinishExchangeMessage(int cnt,
+ int stopId,
+ boolean lastClient,
+ int... blockedIds) throws Exception
+ {
+ int ord = 1;
+
+ for (int i = 0; i < cnt; i++) {
+ if (i == cnt - 1 && lastClient)
+ startClient(ord - 1, ord++);
+ else
+ startServer(ord - 1, ord++);
+ }
+
+ awaitPartitionMapExchange();
+
+ TestRecordingCommunicationSpi spi0 = TestRecordingCommunicationSpi.spi(grid(0));
+
+ final Set<String> blocked = new HashSet<>();
+
+ for (int id : blockedIds) {
+ String name = grid(id).name();
+
+ blocked.add(name);
+ }
+
+ spi0.blockMessages(new IgniteBiPredicate<ClusterNode, Message>() {
+ @Override public boolean apply(ClusterNode node, Message msg) {
+ return blocked.contains(node.attribute(IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME))
+ && (msg instanceof GridDhtPartitionsFullMessage)
+ && (((GridDhtPartitionsFullMessage)msg).exchangeId() != null);
+ }
+ });
+
+ checkAffinity(cnt, topVer(ord - 1, 1), true);
+
+ stopNode(stopId, ord);
+
+ AffinityTopologyVersion topVer = topVer(ord, 0);
+
+ List<IgniteInternalFuture<?>> futs = new ArrayList<>(cnt);
+
+ List<Ignite> grids = G.allGrids();
+
+ for (Ignite ignite : grids)
+ futs.add(affinityReadyFuture(topVer, ignite));
+
+ assertEquals(futs.size(), grids.size());
+
+ for (int i = 0; i < futs.size(); i++) {
+ final IgniteInternalFuture<?> fut = futs.get(i);
+
+ Ignite ignite = grids.get(i);
+
+ if (!blocked.contains(ignite.name())) {
+ GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ return fut.isDone();
+ }
+ }, 5000);
+
+ assertTrue(ignite.name(), fut.isDone());
+ }
+ else
+ assertFalse(ignite.name(), fut.isDone());
+ }
+
+ ord++;
+
+ stopNode(0, ord); // Triggers exchange completion from new coordinator.
+
+ checkAffinity(cnt - 2, topVer(ord - 1, 0), true, false);
+
+ checkAffinity(cnt - 2, topVer(ord, 0), true);
+
+ awaitPartitionMapExchange();
+ }
+
+ /**
+ * Assignment is delayed, coordinator leaves, nodes must complete exchange with same assignments.
+ *
+ * @throws Exception If failed.
+ */
+ public void testCoordinatorLeaveAfterNodeLeavesDelayAssignment() throws Exception {
+ Ignite ignite0 = startServer(0, 1);
+
+ startServer(1, 2);
+
+ Ignite ignite2 = startServer(2, 3);
+
+ Ignite ignite3 = startServer(3, 4);
+
+ TestRecordingCommunicationSpi spi0 =
+ (TestRecordingCommunicationSpi) ignite0.configuration().getCommunicationSpi(), spi2, spi3;
+
+ // Prevent exchange completion.
+ spi0.blockMessages(GridDhtPartitionsFullMessage.class, ignite2.name());
+
+ // Block rebalance.
+ blockSupplySend(spi0, CACHE_NAME1);
+ blockSupplySend((spi2 = TestRecordingCommunicationSpi.spi(ignite2)), CACHE_NAME1);
+ blockSupplySend((spi3 = TestRecordingCommunicationSpi.spi(ignite3)), CACHE_NAME1);
+
+ stopNode(1, 5);
+
+ AffinityTopologyVersion topVer = topVer(5, 0);
+
+ IgniteInternalFuture<?> fut0 = affinityReadyFuture(topVer, ignite0);
+ IgniteInternalFuture<?> fut2 = affinityReadyFuture(topVer, ignite2);
+ IgniteInternalFuture<?> fut3 = affinityReadyFuture(topVer, ignite3);
+
+ U.sleep(1_000);
+
+ assertTrue(fut0.isDone());
+ assertFalse(fut2.isDone());
+ assertTrue(fut3.isDone());
+
+ // Finish rebalance on ignite3.
+ spi2.stopBlock(true);
+
+ stopNode(0, 6);
+
+ spi3.stopBlock(true);
+
+ checkAffinity(2, topVer, false);
+ }
+
+ /**
* Coordinator leaves during node leave exchange.
*
* @throws Exception If failed.
@@ -2181,6 +2420,18 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest {
}
/**
+ * @param topVer Topology version.
+ * @param node Node.
+ * @return Affinity ready future for the given topology version, or an already finished future if none is returned.
+ */
+ private IgniteInternalFuture<?> affinityReadyFuture(AffinityTopologyVersion topVer, Ignite node) {
+ IgniteInternalFuture<?> fut = ((IgniteKernal)node).context().cache().context().exchange().
+ affinityReadyFuture(topVer);
+
+ return fut != null ? fut : new GridFinishedFuture<>();
+ }
+
+ /**
* @param major Major version.
* @param minor Minor version.
* @return Topology version.
@@ -2296,10 +2547,25 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest {
* @throws Exception If failed.
* @return Affinity assignments.
*/
- @SuppressWarnings("unchecked")
private Map<String, List<List<ClusterNode>>> checkAffinity(int expNodes,
AffinityTopologyVersion topVer,
boolean expIdeal) throws Exception {
+ return checkAffinity(expNodes, topVer, expIdeal, true);
+ }
+
+ /**
+ * @param expNodes Expected nodes number.
+ * @param topVer Topology version.
+ * @param expIdeal If {@code true} expect ideal affinity assignment.
+ * @param checkPublicApi {@code True} to check {@link Affinity} API.
+ * @throws Exception If failed.
+ * @return Affinity assignments.
+ */
+ @SuppressWarnings("unchecked")
+ private Map<String, List<List<ClusterNode>>> checkAffinity(int expNodes,
+ AffinityTopologyVersion topVer,
+ boolean expIdeal,
+ boolean checkPublicApi) throws Exception {
List<Ignite> nodes = G.allGrids();
Map<String, List<List<ClusterNode>>> aff = new HashMap<>();
@@ -2331,35 +2597,37 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest {
assertAffinity(ideal, aff2, node, cctx.name(), topVer);
- Affinity<Object> cacheAff = node.affinity(cctx.name());
+ if (checkPublicApi) {
+ Affinity<Object> cacheAff = node.affinity(cctx.name());
- for (int i = 0; i < 10; i++) {
- int part = cacheAff.partition(i);
+ for (int i = 0; i < 10; i++) {
+ int part = cacheAff.partition(i);
- List<ClusterNode> partNodes = ideal.get(part);
+ List<ClusterNode> partNodes = ideal.get(part);
- if (partNodes.isEmpty()) {
- try {
- cacheAff.mapKeyToNode(i);
+ if (partNodes.isEmpty()) {
+ try {
+ cacheAff.mapKeyToNode(i);
- fail();
+ fail();
+ }
+ catch (IgniteException ignore) {
+ // No-op.
+ }
}
- catch (IgniteException ignore) {
- // No-op.
+ else {
+ ClusterNode primary = cacheAff.mapKeyToNode(i);
+
+ assertEquals(primary, partNodes.get(0));
}
}
- else {
- ClusterNode primary = cacheAff.mapKeyToNode(i);
- assertEquals(primary, partNodes.get(0));
- }
- }
+ for (int p = 0; p < ideal.size(); p++) {
+ List<ClusterNode> exp = ideal.get(p);
+ Collection<ClusterNode> partNodes = cacheAff.mapPartitionToPrimaryAndBackups(p);
- for (int p = 0; p < ideal.size(); p++) {
- List<ClusterNode> exp = ideal.get(p);
- Collection<ClusterNode> partNodes = cacheAff.mapPartitionToPrimaryAndBackups(p);
-
- assertEqualsCollections(exp, partNodes);
+ assertEqualsCollections(exp, partNodes);
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/7b22719e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java
index cf898c5..0d0cda4 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java
@@ -362,11 +362,11 @@ public abstract class IgniteCachePrimaryNodeFailureRecoveryAbstractTest extends
Transaction tx = txs.txStart(optimistic ? OPTIMISTIC : PESSIMISTIC, REPEATABLE_READ);
- log.info("Put key1: " + key1);
+ log.info("Put key1 [key1=" + key1 + ", nodes=" + U.nodeIds(aff.mapKeyToPrimaryAndBackups(key1)) + ']');
cache0.put(key1, key1);
- log.info("Put key2: " + key2);
+ log.info("Put key2 [key2=" + key2 + ", nodes=" + U.nodeIds(aff.mapKeyToPrimaryAndBackups(key2)) + ']');
cache0.put(key2, key2);