You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by dg...@apache.org on 2019/03/27 08:06:48 UTC

[ignite] branch master updated: IGNITE-11347 Fix pending message in discovery spi on empty ring - Fixes #6123.

This is an automated email from the ASF dual-hosted git repository.

dgovorukhin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 6701a76  IGNITE-11347 Fix pending message in discovery spi on empty ring - Fixes #6123.
6701a76 is described below

commit 6701a76822d985f5e7f300c4d213632a4c5fc4be
Author: ibessonov <be...@gmail.com>
AuthorDate: Wed Mar 27 11:04:49 2019 +0300

    IGNITE-11347 Fix pending message in discovery spi on empty ring - Fixes #6123.
    
    Signed-off-by: Dmitriy Govorukhin <dm...@gmail.com>
---
 .../ignite/spi/discovery/tcp/ServerImpl.java       | 57 ++++++++++++++--------
 .../DistributedMetaStoragePersistentTest.java      |  4 +-
 2 files changed, 39 insertions(+), 22 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 6016e42..eefecbc 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -3650,23 +3650,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                     if (debugMode)
                         debugLog(msg, "Pending messages will be resent to local node");
 
-                    for (TcpDiscoveryAbstractMessage pendingMsg : pendingMsgs) {
-                        prepareNodeAddedMessage(pendingMsg, locNodeId, pendingMsgs.msgs,
-                            pendingMsgs.customDiscardId);
-
-                        pendingMsg.senderNodeId(locNodeId);
-
-                        msgWorker.addMessage(pendingMsg);
-
-                        if (log.isDebugEnabled())
-                            log.debug("Pending message has been sent to local node [msg=" + msg.id() +
-                                ", pendingMsgId=" + pendingMsg + ']');
-
-                        if (debugMode) {
-                            debugLog(msg, "Pending message has been sent to local node [msg=" + msg.id() +
-                                ", pendingMsgId=" + pendingMsg + ']');
-                        }
-                    }
+                    processPendingMessagesLocally(msg);
                 }
 
                 LT.warn(log, "Local node has detected failed nodes and started cluster-wide procedure. " +
@@ -3676,6 +3660,37 @@ class ServerImpl extends TcpDiscoveryImpl {
         }
 
         /**
+         * Called when local node became the only alive node in topology.
+         *
+         * @param curMsg Currently processed messages. There are three options possible:<ul>
+         *     <li>{@link TcpDiscoveryNodeLeftMessage} from the last node.</li>
+         *     <li>{@link TcpDiscoveryNodeFailedMessage} from the last node.</li>
+         *     <li>Any other message, but all other nodes failed while sending.</li>
+         * </ul>
+         */
+        private void processPendingMessagesLocally(TcpDiscoveryAbstractMessage curMsg) {
+            UUID locNodeId = getLocalNodeId();
+
+            for (TcpDiscoveryAbstractMessage pendingMsg : pendingMsgs) {
+                prepareNodeAddedMessage(pendingMsg, locNodeId, pendingMsgs.msgs,
+                    pendingMsgs.customDiscardId);
+
+                pendingMsg.senderNodeId(locNodeId);
+
+                msgWorker.addMessage(pendingMsg);
+
+                if (log.isDebugEnabled())
+                    log.debug("Pending message has been sent to local node [msg=" + curMsg.id() +
+                        ", pendingMsg=" + pendingMsg + ']');
+
+                if (debugMode) {
+                    debugLog(curMsg, "Pending message has been sent to local node [msg=" + curMsg.id() +
+                        ", pendingMsg=" + pendingMsg + ']');
+                }
+            }
+        }
+
+        /**
          * Segment local node on failed message send.
          */
         private void segmentLocalNodeOnSendFail(List<TcpDiscoveryNode> failedNodes) {
@@ -5152,6 +5167,8 @@ class ServerImpl extends TcpDiscoveryImpl {
                     log.debug("Unable to send message across the ring (topology has no remote nodes): " + msg);
 
                 U.closeQuiet(sock);
+
+                processPendingMessagesLocally(msg);
             }
 
             checkPendingCustomMessages();
@@ -5330,6 +5347,8 @@ class ServerImpl extends TcpDiscoveryImpl {
                     log.debug("Unable to send message across the ring (topology has no remote nodes): " + msg);
 
                 U.closeQuiet(sock);
+
+                processPendingMessagesLocally(msg);
             }
 
             checkPendingCustomMessages();
@@ -5896,7 +5915,7 @@ class ServerImpl extends TcpDiscoveryImpl {
             if (joiningEmpty && isLocalNodeCoordinator()) {
                 TcpDiscoveryCustomEventMessage msg;
 
-                while ((msg = pollPendingCustomeMessage()) != null)
+                while ((msg = pollPendingCustomMessage()) != null)
                     processCustomMessage(msg, true);
             }
         }
@@ -5904,7 +5923,7 @@ class ServerImpl extends TcpDiscoveryImpl {
         /**
          * @return Pending custom message.
          */
-        @Nullable private TcpDiscoveryCustomEventMessage pollPendingCustomeMessage() {
+        @Nullable private TcpDiscoveryCustomEventMessage pollPendingCustomMessage() {
             synchronized (mux) {
                 return pendingCustomMsgs.poll();
             }
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStoragePersistentTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStoragePersistentTest.java
index ebf25b1..39ff7f9 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStoragePersistentTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStoragePersistentTest.java
@@ -294,7 +294,7 @@ public class DistributedMetaStoragePersistentTest extends DistributedMetaStorage
 
         long start = System.currentTimeMillis();
 
-        long duration = GridTestUtils.SF.applyLB(30_000, 5_000);
+        long duration = GridTestUtils.SF.applyLB(15_000, 5_000);
 
         try {
             for (int i = 0; System.currentTimeMillis() < start + duration; i++) {
@@ -311,8 +311,6 @@ public class DistributedMetaStoragePersistentTest extends DistributedMetaStorage
 
         awaitPartitionMapExchange();
 
-        Thread.sleep(3_000L); // Remove later.
-
         for (int i = 0; i < cnt; i++) {
             DistributedMetaStorage distributedMetastorage = metastorage(i);