You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2019/02/06 11:56:41 UTC
[ignite] branch master updated: IGNITE-11204 Fixed exchange merge
hang when merged node fails before sending single message - Fixes #6028.
This is an automated email from the ASF dual-hosted git repository.
agoncharuk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 65b18b7 IGNITE-11204 Fixed exchange merge hang when merged node fails before sending single message - Fixes #6028.
65b18b7 is described below
commit 65b18b7ba920b069251da2ec2002a530ceee1404
Author: Alexey Goncharuk <al...@gmail.com>
AuthorDate: Wed Feb 6 14:51:12 2019 +0300
IGNITE-11204 Fixed exchange merge hang when merged node fails before sending single message - Fixes #6028.
Signed-off-by: Alexey Goncharuk <al...@gmail.com>
---
.../cache/GridCachePartitionExchangeManager.java | 9 +
.../preloader/GridDhtPartitionsExchangeFuture.java | 15 +-
.../ExchangeMergeStaleServerNodesTest.java | 188 +++++++++++++++++++++
.../testsuites/IgniteCacheMvccTestSuite6.java | 2 +
.../ignite/testsuites/IgniteCacheTestSuite6.java | 2 +
5 files changed, 213 insertions(+), 3 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 71a704c..08762a9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -2192,6 +2192,15 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
/**
+ * For testing only: reads back the {@code exchMergeTestWaitVer} field without modifying it.
+ *
+ * @return Current version to wait for (the test added in this commit polls it until it is {@code null}).
+ */
+ public AffinityTopologyVersion mergeExchangesTestWaitVersion() {
+ return exchMergeTestWaitVer;
+ }
+
+ /**
* @param curFut Current exchange future.
* @param msg Message.
* @return {@code True} if node is stopping.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index f4043ab..170d7e5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -4390,6 +4390,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
try {
boolean crdChanged = false;
boolean allReceived = false;
+ boolean wasMerged = false;
ClusterNode crd0;
@@ -4405,8 +4406,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
newCrdFut0.onNodeLeft(node.id());
synchronized (mux) {
- if (!srvNodes.remove(node))
- return;
+ srvNodes.remove(node);
boolean rmvd = remaining.remove(node.id());
@@ -4415,6 +4415,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
if (mergedJoinExchMsgs.get(node.id()) == null) {
mergedJoinExchMsgs.remove(node.id());
+ wasMerged = true;
rmvd = true;
}
}
@@ -4513,11 +4514,16 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
}
if (allReceived) {
+ boolean wasMerged0 = wasMerged;
+
cctx.kernalContext().getSystemExecutorService().submit(new Runnable() {
@Override public void run() {
awaitSingleMapUpdates();
- onAllReceived(null);
+ if (wasMerged0)
+ finishExchangeOnCoordinator(null);
+ else
+ onAllReceived(null);
}
});
}
@@ -4866,14 +4872,17 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
/** {@inheritDoc} */
@Override public String toString() {
Set<UUID> remaining;
+ Set<UUID> mergedJoinExch;
synchronized (mux) {
remaining = new HashSet<>(this.remaining);
+ mergedJoinExch = mergedJoinExchMsgs == null ? null : new HashSet<>(mergedJoinExchMsgs.keySet());
}
return S.toString(GridDhtPartitionsExchangeFuture.class, this,
"evtLatch", evtLatch == null ? "null" : evtLatch.getCount(),
"remaining", remaining,
+ "mergedJoinExchMsgs", mergedJoinExch,
"super", super.toString());
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/ExchangeMergeStaleServerNodesTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/ExchangeMergeStaleServerNodesTest.java
new file mode 100644
index 0000000..0a59f2e
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/ExchangeMergeStaleServerNodesTest.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.communication.CommunicationSpi;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+/**
+ * Verifies that new nodes can still join after a server whose join exchange was merged is stopped while its single message is held back (IGNITE-11204).
+ */
+public class ExchangeMergeStaleServerNodesTest extends GridCommonAbstractTest {
+ /** Communication SPIs to install, keyed by Ignite instance name; {@code null} until the test assigns the map. */
+ private Map<String, DelayableCommunicationSpi> commSpis;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ CommunicationSpi commSpi = commSpis == null ? null : commSpis.get(igniteInstanceName);
+
+ if (commSpi != null)
+ cfg.setCommunicationSpi(commSpi);
+
+ return cfg;
+ }
+
+ /**
+ * @throws Exception if failed.
+ */
+ @Test
+ public void testServersFailAfterMerge() throws Exception {
+ DelayableCommunicationSpi delaySpi1 = new DelayableCommunicationSpi((msg) -> {
+ if (msg instanceof GridDhtPartitionsSingleMessage) {
+ GridDhtPartitionsSingleMessage singleMsg = (GridDhtPartitionsSingleMessage)msg;
+
+ return singleMsg.exchangeId() != null && singleMsg.exchangeId().topologyVersion().equals(new AffinityTopologyVersion(2, 0));
+ }
+
+ return false;
+ });
+
+ commSpis = F.asMap(
+ getTestIgniteInstanceName(0), new DelayableCommunicationSpi((msg) -> false),
+ getTestIgniteInstanceName(1), delaySpi1,
+ getTestIgniteInstanceName(2), new DelayableCommunicationSpi((msg) -> msg instanceof GridDhtPartitionsSingleMessage),
+ getTestIgniteInstanceName(3), new DelayableCommunicationSpi((msg) -> false)
+ );
+
+ try {
+ IgniteEx crd = startGrid(0);
+
+ GridCachePartitionExchangeManager<Object, Object> exchMgr = crd.context().cache().context().exchange();
+
+ exchMgr.mergeExchangesTestWaitVersion(new AffinityTopologyVersion(3, 0), null);
+
+ // Node 1's single message for topology version (2, 0) is held by delaySpi1 until replay() is called below.
+ IgniteInternalFuture<IgniteEx> fut = GridTestUtils.runAsync(() -> startGrid(1), "starter1");
+
+ GridTestUtils.waitForCondition(() -> exchMgr.lastTopologyFuture().exchangeId().topologyVersion()
+ .equals(new AffinityTopologyVersion(2, 0)), getTestTimeout());
+
+ IgniteInternalFuture<IgniteEx> futFail = GridTestUtils.runAsync(() -> startGrid(2), "starter2");
+
+ GridTestUtils.waitForCondition(exchMgr::hasPendingExchange, getTestTimeout());
+
+ // Release node 1's held single message to the coordinator so the merge can proceed.
+ delaySpi1.replay(crd.cluster().localNode().id());
+
+ // Wait for merged exchange. NOTE(review): waitForCondition results are discarded here and above; a timeout presumably goes unnoticed - confirm.
+ GridTestUtils.waitForCondition(
+ () -> exchMgr.mergeExchangesTestWaitVersion() == null, getTestTimeout());
+
+ futFail.cancel();
+ stopGrid(getTestIgniteInstanceName(2), true);
+
+ fut.get();
+
+ try {
+ futFail.get();
+ }
+ catch (IgniteCheckedException ignore) {
+ // No-op: starter2 was cancelled and node 2 stopped above, so a failure of this future is tolerated.
+ }
+
+ // Check that next nodes can successfully join topology.
+ startGrid(3);
+ }
+ finally {
+ stopAllGrids();
+ }
+ }
+
+ /**
+ * TCP communication SPI that queues sends matching a predicate per destination node until {@link #replay(UUID)} runs them.
+ */
+ private static class DelayableCommunicationSpi extends TcpCommunicationSpi {
+ /** Queued send tasks, keyed by destination node ID. */
+ private ConcurrentMap<UUID, Collection<Runnable>> delayed = new ConcurrentHashMap<>();
+
+ /** Selects the messages to hold back; applied to the payload unwrapped from {@link GridIoMessage}. */
+ private IgnitePredicate<Message> delayPred;
+
+ /**
+ * @param delayPred Predicate returning {@code true} for messages whose send should be deferred.
+ */
+ private DelayableCommunicationSpi(IgnitePredicate<Message> delayPred) {
+ this.delayPred = delayPred;
+ }
+
+ /**
+ * @param nodeId Destination node ID whose queued sends are executed now, on the calling thread.
+ */
+ private void replay(UUID nodeId) {
+ Collection<Runnable> old = delayed.replace(nodeId, new ConcurrentLinkedDeque<>());
+
+ if (old != null) {
+ for (Runnable task : old)
+ task.run();
+ }
+ }
+
+ /** {@inheritDoc} Messages matching the predicate are queued instead; exceptions from immediate sends are passed to U.log and not rethrown. */
+ @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC) throws IgniteSpiException {
+ final Message msg0 = ((GridIoMessage)msg).message();
+
+ if (delayPred.apply(msg0)) {
+ delayed.computeIfAbsent(
+ node.id(),
+ (nodeId) -> new ConcurrentLinkedDeque<>()
+ ).add(new Runnable() {
+ @Override public void run() {
+ DelayableCommunicationSpi.super.sendMessage(node, msg, ackC);
+ }
+ });
+
+ log.info("Delayed message: " + msg0);
+ }
+ else {
+ try {
+ super.sendMessage(node, msg, ackC);
+ }
+ catch (Exception e) {
+ U.log(null, e);
+ }
+ }
+ }
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite6.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite6.java
index 707244d..471b437 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite6.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite6.java
@@ -34,6 +34,7 @@ import org.apache.ignite.internal.processors.cache.ReplicatedTransactionalPessim
import org.apache.ignite.internal.processors.cache.datastructures.IgniteExchangeLatchManagerCoordinatorFailTest;
import org.apache.ignite.internal.processors.cache.distributed.CacheExchangeMergeTest;
import org.apache.ignite.internal.processors.cache.distributed.CacheParallelStartTest;
+import org.apache.ignite.internal.processors.cache.distributed.ExchangeMergeStaleServerNodesTest;
import org.apache.ignite.internal.processors.cache.distributed.IgniteCache150ClientsTest;
import org.apache.ignite.internal.processors.cache.distributed.IgniteOptimisticTxSuspendResumeTest;
import org.apache.ignite.internal.processors.cache.transactions.TxOptimisticOnPartitionExchangeTest;
@@ -74,6 +75,7 @@ public class IgniteCacheMvccTestSuite6 {
// Other non-tx tests.
ignoredTests.add(CacheExchangeMergeTest.class);
+ ignoredTests.add(ExchangeMergeStaleServerNodesTest.class);
ignoredTests.add(IgniteExchangeLatchManagerCoordinatorFailTest.class);
ignoredTests.add(PartitionsExchangeCoordinatorFailoverTest.class);
ignoredTests.add(CacheParallelStartTest.class);
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
index 2388cfb..444152d 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
@@ -32,6 +32,7 @@ import org.apache.ignite.internal.processors.cache.datastructures.IgniteExchange
import org.apache.ignite.internal.processors.cache.distributed.CacheExchangeMergeTest;
import org.apache.ignite.internal.processors.cache.distributed.CacheParallelStartTest;
import org.apache.ignite.internal.processors.cache.distributed.CacheTryLockMultithreadedTest;
+import org.apache.ignite.internal.processors.cache.distributed.ExchangeMergeStaleServerNodesTest;
import org.apache.ignite.internal.processors.cache.distributed.GridCachePartitionEvictionDuringReadThroughSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.IgniteCache150ClientsTest;
import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheThreadLocalTxTest;
@@ -79,6 +80,7 @@ public class IgniteCacheTestSuite6 {
GridTestUtils.addTestIfNeeded(suite, IgnitePessimisticTxSuspendResumeTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, CacheExchangeMergeTest.class, ignoredTests);
+ GridTestUtils.addTestIfNeeded(suite, ExchangeMergeStaleServerNodesTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, TxRollbackOnTimeoutTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, TxRollbackOnTimeoutNoDeadlockDetectionTest.class, ignoredTests);