You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by dg...@apache.org on 2019/03/27 08:06:48 UTC
[ignite] branch master updated: IGNITE-11347 Fix pending message in
discovery spi on empty ring - Fixes #6123.
This is an automated email from the ASF dual-hosted git repository.
dgovorukhin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 6701a76 IGNITE-11347 Fix pending message in discovery spi on empty ring - Fixes #6123.
6701a76 is described below
commit 6701a76822d985f5e7f300c4d213632a4c5fc4be
Author: ibessonov <be...@gmail.com>
AuthorDate: Wed Mar 27 11:04:49 2019 +0300
IGNITE-11347 Fix pending message in discovery spi on empty ring - Fixes #6123.
Signed-off-by: Dmitriy Govorukhin <dm...@gmail.com>
---
.../ignite/spi/discovery/tcp/ServerImpl.java | 57 ++++++++++++++--------
.../DistributedMetaStoragePersistentTest.java | 4 +-
2 files changed, 39 insertions(+), 22 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 6016e42..eefecbc 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -3650,23 +3650,7 @@ class ServerImpl extends TcpDiscoveryImpl {
if (debugMode)
debugLog(msg, "Pending messages will be resent to local node");
- for (TcpDiscoveryAbstractMessage pendingMsg : pendingMsgs) {
- prepareNodeAddedMessage(pendingMsg, locNodeId, pendingMsgs.msgs,
- pendingMsgs.customDiscardId);
-
- pendingMsg.senderNodeId(locNodeId);
-
- msgWorker.addMessage(pendingMsg);
-
- if (log.isDebugEnabled())
- log.debug("Pending message has been sent to local node [msg=" + msg.id() +
- ", pendingMsgId=" + pendingMsg + ']');
-
- if (debugMode) {
- debugLog(msg, "Pending message has been sent to local node [msg=" + msg.id() +
- ", pendingMsgId=" + pendingMsg + ']');
- }
- }
+ processPendingMessagesLocally(msg);
}
LT.warn(log, "Local node has detected failed nodes and started cluster-wide procedure. " +
@@ -3676,6 +3660,37 @@ class ServerImpl extends TcpDiscoveryImpl {
}
/**
+ * Called when local node became the only alive node in topology.
+ *
+ * @param curMsg Currently processed messages. There are three options possible:<ul>
+ * <li>{@link TcpDiscoveryNodeLeftMessage} from the last node.</li>
+ * <li>{@link TcpDiscoveryNodeFailedMessage} from the last node.</li>
+ * <li>Any other message, but all other nodes failed while sending.</li>
+ * </ul>
+ */
+ private void processPendingMessagesLocally(TcpDiscoveryAbstractMessage curMsg) {
+ UUID locNodeId = getLocalNodeId();
+
+ for (TcpDiscoveryAbstractMessage pendingMsg : pendingMsgs) {
+ prepareNodeAddedMessage(pendingMsg, locNodeId, pendingMsgs.msgs,
+ pendingMsgs.customDiscardId);
+
+ pendingMsg.senderNodeId(locNodeId);
+
+ msgWorker.addMessage(pendingMsg);
+
+ if (log.isDebugEnabled())
+ log.debug("Pending message has been sent to local node [msg=" + curMsg.id() +
+ ", pendingMsg=" + pendingMsg + ']');
+
+ if (debugMode) {
+ debugLog(curMsg, "Pending message has been sent to local node [msg=" + curMsg.id() +
+ ", pendingMsg=" + pendingMsg + ']');
+ }
+ }
+ }
+
+ /**
* Segment local node on failed message send.
*/
private void segmentLocalNodeOnSendFail(List<TcpDiscoveryNode> failedNodes) {
@@ -5152,6 +5167,8 @@ class ServerImpl extends TcpDiscoveryImpl {
log.debug("Unable to send message across the ring (topology has no remote nodes): " + msg);
U.closeQuiet(sock);
+
+ processPendingMessagesLocally(msg);
}
checkPendingCustomMessages();
@@ -5330,6 +5347,8 @@ class ServerImpl extends TcpDiscoveryImpl {
log.debug("Unable to send message across the ring (topology has no remote nodes): " + msg);
U.closeQuiet(sock);
+
+ processPendingMessagesLocally(msg);
}
checkPendingCustomMessages();
@@ -5896,7 +5915,7 @@ class ServerImpl extends TcpDiscoveryImpl {
if (joiningEmpty && isLocalNodeCoordinator()) {
TcpDiscoveryCustomEventMessage msg;
- while ((msg = pollPendingCustomeMessage()) != null)
+ while ((msg = pollPendingCustomMessage()) != null)
processCustomMessage(msg, true);
}
}
@@ -5904,7 +5923,7 @@ class ServerImpl extends TcpDiscoveryImpl {
/**
* @return Pending custom message.
*/
- @Nullable private TcpDiscoveryCustomEventMessage pollPendingCustomeMessage() {
+ @Nullable private TcpDiscoveryCustomEventMessage pollPendingCustomMessage() {
synchronized (mux) {
return pendingCustomMsgs.poll();
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStoragePersistentTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStoragePersistentTest.java
index ebf25b1..39ff7f9 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStoragePersistentTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStoragePersistentTest.java
@@ -294,7 +294,7 @@ public class DistributedMetaStoragePersistentTest extends DistributedMetaStorage
long start = System.currentTimeMillis();
- long duration = GridTestUtils.SF.applyLB(30_000, 5_000);
+ long duration = GridTestUtils.SF.applyLB(15_000, 5_000);
try {
for (int i = 0; System.currentTimeMillis() < start + duration; i++) {
@@ -311,8 +311,6 @@ public class DistributedMetaStoragePersistentTest extends DistributedMetaStorage
awaitPartitionMapExchange();
- Thread.sleep(3_000L); // Remove later.
-
for (int i = 0; i < cnt; i++) {
DistributedMetaStorage distributedMetastorage = metastorage(i);