You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2019/02/06 11:56:41 UTC

[ignite] branch master updated: IGNITE-11204 Fixed exchange merge hang when merged node fails before sending single message - Fixes #6028.

This is an automated email from the ASF dual-hosted git repository.

agoncharuk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 65b18b7  IGNITE-11204 Fixed exchange merge hang when merged node fails before sending single message - Fixes #6028.
65b18b7 is described below

commit 65b18b7ba920b069251da2ec2002a530ceee1404
Author: Alexey Goncharuk <al...@gmail.com>
AuthorDate: Wed Feb 6 14:51:12 2019 +0300

    IGNITE-11204 Fixed exchange merge hang when merged node fails before sending single message - Fixes #6028.
    
    Signed-off-by: Alexey Goncharuk <al...@gmail.com>
---
 .../cache/GridCachePartitionExchangeManager.java   |   9 +
 .../preloader/GridDhtPartitionsExchangeFuture.java |  15 +-
 .../ExchangeMergeStaleServerNodesTest.java         | 188 +++++++++++++++++++++
 .../testsuites/IgniteCacheMvccTestSuite6.java      |   2 +
 .../ignite/testsuites/IgniteCacheTestSuite6.java   |   2 +
 5 files changed, 213 insertions(+), 3 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 71a704c..08762a9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -2192,6 +2192,15 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     }
 
     /**
+     * For testing only: exposes the current value of {@code exchMergeTestWaitVer}.
+     *
+     * @return Current version to wait for; the test added in this commit polls until this reads {@code null}.
+     */
+    public AffinityTopologyVersion mergeExchangesTestWaitVersion() {
+        return exchMergeTestWaitVer;
+    }
+
+    /**
      * @param curFut Current exchange future.
      * @param msg Message.
      * @return {@code True} if node is stopping.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index f4043ab..170d7e5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -4390,6 +4390,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                     try {
                         boolean crdChanged = false;
                         boolean allReceived = false;
+                        boolean wasMerged = false;
 
                         ClusterNode crd0;
 
@@ -4405,8 +4406,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                             newCrdFut0.onNodeLeft(node.id());
 
                         synchronized (mux) {
-                            if (!srvNodes.remove(node))
-                                return;
+                            srvNodes.remove(node);
 
                             boolean rmvd = remaining.remove(node.id());
 
@@ -4415,6 +4415,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                                     if (mergedJoinExchMsgs.get(node.id()) == null) {
                                         mergedJoinExchMsgs.remove(node.id());
 
+                                        wasMerged = true;
                                         rmvd = true;
                                     }
                                 }
@@ -4513,11 +4514,16 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                             }
 
                             if (allReceived) {
+                                boolean wasMerged0 = wasMerged;
+
                                 cctx.kernalContext().getSystemExecutorService().submit(new Runnable() {
                                     @Override public void run() {
                                         awaitSingleMapUpdates();
 
-                                        onAllReceived(null);
+                                        if (wasMerged0)
+                                            finishExchangeOnCoordinator(null);
+                                        else
+                                            onAllReceived(null);
                                     }
                                 });
                             }
@@ -4866,14 +4872,17 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     /** {@inheritDoc} */
     @Override public String toString() {
         Set<UUID> remaining;
+        Set<UUID> mergedJoinExch; // Copy of the mergedJoinExchMsgs key set taken under mux, or null if that map is null.
 
         synchronized (mux) {
             remaining = new HashSet<>(this.remaining);
+            mergedJoinExch = mergedJoinExchMsgs == null ? null : new HashSet<>(mergedJoinExchMsgs.keySet());
         }
 
         return S.toString(GridDhtPartitionsExchangeFuture.class, this,
             "evtLatch", evtLatch == null ? "null" : evtLatch.getCount(),
             "remaining", remaining,
+            "mergedJoinExchMsgs", mergedJoinExch,
             "super", super.toString());
     }
 
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/ExchangeMergeStaleServerNodesTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/ExchangeMergeStaleServerNodesTest.java
new file mode 100644
index 0000000..0a59f2e
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/ExchangeMergeStaleServerNodesTest.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.communication.CommunicationSpi;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+/**
+ * Checks that a merged exchange does not hang when a merged server node fails before sending its single message.
+ */
+public class ExchangeMergeStaleServerNodesTest extends GridCommonAbstractTest {
+    /** Communication SPIs keyed by Ignite instance name; {@code null} until the test assigns it. */
+    private Map<String, DelayableCommunicationSpi> commSpis;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        CommunicationSpi commSpi = commSpis == null ? null : commSpis.get(igniteInstanceName);
+
+        if (commSpi != null)
+            cfg.setCommunicationSpi(commSpi);
+
+        return cfg;
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    @Test
+    public void testServersFailAfterMerge() throws Exception {
+        DelayableCommunicationSpi delaySpi1 = new DelayableCommunicationSpi((msg) -> {
+            if (msg instanceof GridDhtPartitionsSingleMessage) {
+                GridDhtPartitionsSingleMessage singleMsg = (GridDhtPartitionsSingleMessage)msg;
+
+                return singleMsg.exchangeId() != null && singleMsg.exchangeId().topologyVersion().equals(new AffinityTopologyVersion(2, 0));
+            }
+
+            return false;
+        });
+
+        commSpis = F.asMap(
+            getTestIgniteInstanceName(0), new DelayableCommunicationSpi((msg) -> false),
+            getTestIgniteInstanceName(1), delaySpi1,
+            getTestIgniteInstanceName(2), new DelayableCommunicationSpi((msg) -> msg instanceof GridDhtPartitionsSingleMessage),
+            getTestIgniteInstanceName(3), new DelayableCommunicationSpi((msg) -> false)
+        );
+
+        try {
+            IgniteEx crd = startGrid(0);
+
+            GridCachePartitionExchangeManager<Object, Object> exchMgr = crd.context().cache().context().exchange();
+
+            exchMgr.mergeExchangesTestWaitVersion(new AffinityTopologyVersion(3, 0), null);
+
+            // Node 1's single message for topology version (2, 0) is held by delaySpi1 until replay() is called below.
+            IgniteInternalFuture<IgniteEx> fut = GridTestUtils.runAsync(() -> startGrid(1), "starter1");
+
+            GridTestUtils.waitForCondition(() -> exchMgr.lastTopologyFuture().exchangeId().topologyVersion()
+                .equals(new AffinityTopologyVersion(2, 0)), getTestTimeout());
+
+            IgniteInternalFuture<IgniteEx> futFail = GridTestUtils.runAsync(() -> startGrid(2), "starter2");
+
+            GridTestUtils.waitForCondition(exchMgr::hasPendingExchange, getTestTimeout());
+
+            // Send node 1's delayed messages addressed to the coordinator so that merging can proceed.
+            delaySpi1.replay(crd.cluster().localNode().id());
+
+            // Wait for merged exchange: poll until the test wait version reads as null.
+            GridTestUtils.waitForCondition(
+                () -> exchMgr.mergeExchangesTestWaitVersion() == null, getTestTimeout());
+
+            futFail.cancel();
+            stopGrid(getTestIgniteInstanceName(2), true);
+
+            fut.get();
+
+            try {
+                futFail.get();
+            }
+            catch (IgniteCheckedException ignore) {
+                // No-op: the start of node 2 was cancelled and the node was stopped above, so its future may fail.
+            }
+
+            // Check that next nodes can successfully join topology.
+            startGrid(3);
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * {@link TcpCommunicationSpi} that queues messages matching a predicate until {@link #replay(UUID)} is called.
+     */
+    private static class DelayableCommunicationSpi extends TcpCommunicationSpi {
+        /** Delayed send tasks keyed by destination node ID. */
+        private ConcurrentMap<UUID, Collection<Runnable>> delayed = new ConcurrentHashMap<>();
+
+        /** Predicate selecting messages to delay; applied to the payload unwrapped from {@link GridIoMessage}. */
+        private IgnitePredicate<Message> delayPred;
+
+        /**
+         * @param delayPred Delay predicate; messages for which it returns {@code true} are queued instead of sent.
+         */
+        private DelayableCommunicationSpi(IgnitePredicate<Message> delayPred) {
+            this.delayPred = delayPred;
+        }
+
+        /**
+         * @param nodeId Destination node ID whose queued sends are run now; nothing happens if none were queued.
+         */
+        private void replay(UUID nodeId) {
+            Collection<Runnable> old = delayed.replace(nodeId, new ConcurrentLinkedDeque<>());
+
+            if (old != null) {
+                for (Runnable task : old)
+                    task.run();
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC) throws IgniteSpiException {
+            final Message msg0 = ((GridIoMessage)msg).message();
+
+            if (delayPred.apply(msg0)) {
+                delayed.computeIfAbsent(
+                    node.id(),
+                    (nodeId) -> new ConcurrentLinkedDeque<>()
+                ).add(new Runnable() {
+                    @Override public void run() {
+                        DelayableCommunicationSpi.super.sendMessage(node, msg, ackC);
+                    }
+                });
+
+                log.info("Delayed message: " + msg0);
+            }
+            else {
+                try {
+                    super.sendMessage(node, msg, ackC);
+                }
+                catch (Exception e) {
+                    U.log(null, e); // Send failures are logged here, not rethrown to the caller.
+                }
+            }
+        }
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite6.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite6.java
index 707244d..471b437 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite6.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite6.java
@@ -34,6 +34,7 @@ import org.apache.ignite.internal.processors.cache.ReplicatedTransactionalPessim
 import org.apache.ignite.internal.processors.cache.datastructures.IgniteExchangeLatchManagerCoordinatorFailTest;
 import org.apache.ignite.internal.processors.cache.distributed.CacheExchangeMergeTest;
 import org.apache.ignite.internal.processors.cache.distributed.CacheParallelStartTest;
+import org.apache.ignite.internal.processors.cache.distributed.ExchangeMergeStaleServerNodesTest;
 import org.apache.ignite.internal.processors.cache.distributed.IgniteCache150ClientsTest;
 import org.apache.ignite.internal.processors.cache.distributed.IgniteOptimisticTxSuspendResumeTest;
 import org.apache.ignite.internal.processors.cache.transactions.TxOptimisticOnPartitionExchangeTest;
@@ -74,6 +75,7 @@ public class IgniteCacheMvccTestSuite6 {
 
         // Other non-tx tests.
         ignoredTests.add(CacheExchangeMergeTest.class);
+        ignoredTests.add(ExchangeMergeStaleServerNodesTest.class);
         ignoredTests.add(IgniteExchangeLatchManagerCoordinatorFailTest.class);
         ignoredTests.add(PartitionsExchangeCoordinatorFailoverTest.class);
         ignoredTests.add(CacheParallelStartTest.class);
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
index 2388cfb..444152d 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
@@ -32,6 +32,7 @@ import org.apache.ignite.internal.processors.cache.datastructures.IgniteExchange
 import org.apache.ignite.internal.processors.cache.distributed.CacheExchangeMergeTest;
 import org.apache.ignite.internal.processors.cache.distributed.CacheParallelStartTest;
 import org.apache.ignite.internal.processors.cache.distributed.CacheTryLockMultithreadedTest;
+import org.apache.ignite.internal.processors.cache.distributed.ExchangeMergeStaleServerNodesTest;
 import org.apache.ignite.internal.processors.cache.distributed.GridCachePartitionEvictionDuringReadThroughSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.IgniteCache150ClientsTest;
 import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheThreadLocalTxTest;
@@ -79,6 +80,7 @@ public class IgniteCacheTestSuite6 {
         GridTestUtils.addTestIfNeeded(suite, IgnitePessimisticTxSuspendResumeTest.class, ignoredTests);
 
         GridTestUtils.addTestIfNeeded(suite, CacheExchangeMergeTest.class, ignoredTests);
+        GridTestUtils.addTestIfNeeded(suite, ExchangeMergeStaleServerNodesTest.class, ignoredTests);
 
         GridTestUtils.addTestIfNeeded(suite, TxRollbackOnTimeoutTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, TxRollbackOnTimeoutNoDeadlockDetectionTest.class, ignoredTests);