You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by dg...@apache.org on 2019/08/23 10:23:31 UTC
[ignite] branch master updated: IGNITE-10808 Fixed Discovery
message queue may build up with TcpDiscoveryMetricsUpdateMessage - Fixes
#5771
This is an automated email from the ASF dual-hosted git repository.
dgovorukhin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 7e73098 IGNITE-10808 Fixed Discovery message queue may build up with TcpDiscoveryMetricsUpdateMessage - Fixes #5771
7e73098 is described below
commit 7e73098d4d6e3d5f78326cb11dac7e083a2312dd
Author: Denis Mekhanikov <dm...@gmail.com>
AuthorDate: Fri Aug 23 13:23:11 2019 +0300
IGNITE-10808 Fixed Discovery message queue may build up with TcpDiscoveryMetricsUpdateMessage - Fixes #5771
---
.../ignite/spi/discovery/tcp/ServerImpl.java | 160 ++++++++++++++++---
.../TcpDiscoveryClientMetricsUpdateMessage.java | 5 -
.../messages/TcpDiscoveryMetricsUpdateMessage.java | 5 -
.../discovery/tcp/IgniteMetricsOverflowTest.java | 171 +++++++++++++++++++++
.../IgniteSpiDiscoverySelfTestSuite.java | 5 +-
5 files changed, 313 insertions(+), 33 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 2fe87c2..490eb9c 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -2823,6 +2823,9 @@ class ServerImpl extends TcpDiscoveryImpl {
/** */
private DiscoveryDataPacket gridDiscoveryData;
+ /** Filter for {@link TcpDiscoveryMetricsUpdateMessage}s. */
+ private final MetricsUpdateMessageFilter metricsMsgFilter = new MetricsUpdateMessageFilter();
+
/**
* @param log Logger.
*/
@@ -2882,13 +2885,39 @@ class ServerImpl extends TcpDiscoveryImpl {
return;
}
- if (msg.highPriority() && !ignoreHighPriority)
- queue.addFirst(msg);
+ boolean addFirst = msg.highPriority() && !ignoreHighPriority;
+
+ if (msg instanceof TcpDiscoveryMetricsUpdateMessage) {
+ if (metricsMsgFilter.addMessage((TcpDiscoveryMetricsUpdateMessage)msg))
+ addToQueue(msg, addFirst);
+ else {
+ if (log.isDebugEnabled())
+ log.debug("Metric update message has been replaced in the worker's queue: " + msg);
+ }
+ }
else
+ addToQueue(msg, addFirst);
+ }
+
+ /**
+ * @param msg Message to add.
+ * @param addFirst If {@code true}, then the message will be added to the head of the worker's queue.
+ */
+ private void addToQueue(TcpDiscoveryAbstractMessage msg, boolean addFirst) {
+ DebugLogger log = messageLogger(msg);
+
+ if (addFirst) {
+ queue.addFirst(msg);
+
+ if (log.isDebugEnabled())
+ log.debug("Message has been added to a head of a worker's queue: " + msg);
+ }
+ else {
queue.add(msg);
- if (log.isDebugEnabled())
- log.debug("Message has been added to queue: " + msg);
+ if (log.isDebugEnabled())
+ log.debug("Message has been added to a worker's queue: " + msg);
+ }
}
/** */
@@ -5634,7 +5663,8 @@ class ServerImpl extends TcpDiscoveryImpl {
}
/**
- * Processes regular metrics update message.
+ * Processes regular metrics update message. If a more recent message of the same kind has been received,
+ * then it will be processed instead of the one taken from the queue.
*
* @param msg Metrics update message.
*/
@@ -5643,6 +5673,10 @@ class ServerImpl extends TcpDiscoveryImpl {
assert !msg.client();
+ int laps = metricsMsgFilter.passedLaps(msg);
+
+ msg = metricsMsgFilter.pollActualMessage(laps, msg);
+
UUID locNodeId = getLocalNodeId();
if (ring.node(msg.creatorNodeId()) == null) {
@@ -5668,7 +5702,7 @@ class ServerImpl extends TcpDiscoveryImpl {
return;
}
- if (locNodeId.equals(msg.creatorNodeId()) && !hasMetrics(msg, locNodeId) && msg.senderNodeId() != null) {
+ if (laps == 2) {
if (log.isTraceEnabled())
log.trace("Discarding metrics update message that has made two passes: " + msg);
@@ -5696,8 +5730,7 @@ class ServerImpl extends TcpDiscoveryImpl {
}
if (sendMessageToRemotes(msg)) {
- if ((locNodeId.equals(msg.creatorNodeId()) && msg.senderNodeId() == null ||
- !hasMetrics(msg, locNodeId)) && spiStateCopy() == CONNECTED) {
+ if (laps == 0 && spiStateCopy() == CONNECTED) {
// Message is on its first ring or just created on coordinator.
msg.setMetrics(locNodeId, spi.metricsProvider.metrics());
msg.setCacheMetrics(locNodeId, spi.metricsProvider.cacheMetrics());
@@ -5792,13 +5825,6 @@ class ServerImpl extends TcpDiscoveryImpl {
}
/**
- * @param msg Message.
- */
- private boolean hasMetrics(TcpDiscoveryMetricsUpdateMessage msg, UUID nodeId) {
- return msg.hasMetrics(nodeId) || msg.hasCacheMetrics(nodeId);
- }
-
- /**
* Processes discard message and discards previously registered pending messages.
*
* @param msg Discard message.
@@ -6144,16 +6170,12 @@ class ServerImpl extends TcpDiscoveryImpl {
* than {@link TcpDiscoveryStatusCheckMessage} is sent across the ring.
*/
private void checkMetricsReceiving() {
- if (lastTimeStatusMsgSentNanos - locNode.lastUpdateTimeNanos() < 0)
+ if (lastTimeStatusMsgSentNanos < locNode.lastUpdateTimeNanos())
lastTimeStatusMsgSentNanos = locNode.lastUpdateTimeNanos();
- long updateTimeNanos = lastTimeStatusMsgSentNanos - lastRingMsgTimeNanos > 0
- ? lastTimeStatusMsgSentNanos
- : lastRingMsgTimeNanos;
+ long updateTimeNanos = Math.max(lastTimeStatusMsgSentNanos, lastRingMsgTimeNanos);
- long elapsed = metricsCheckFreq - U.millisSinceNanos(updateTimeNanos);
-
- if (elapsed > 0)
+ if (U.millisSinceNanos(updateTimeNanos) < metricsCheckFreq)
return;
msgWorker.addMessage(createTcpDiscoveryStatusCheckMessage(locNode, locNode.id(), null));
@@ -7908,4 +7930,98 @@ class ServerImpl extends TcpDiscoveryImpl {
return S.toString(CrossRingMessageSendState.class, this);
}
}
+
+ /**
+ * Filter to keep track of the most recent {@link TcpDiscoveryMetricsUpdateMessage}s.
+ */
+ private class MetricsUpdateMessageFilter {
+ /** The most recent unprocessed metrics update message, which is on its first discovery round trip. */
+ private volatile TcpDiscoveryMetricsUpdateMessage actualFirstLapMetricsUpdate;
+
+ /** The most recent unprocessed metrics update message, which is on its second discovery round trip. */
+ private volatile TcpDiscoveryMetricsUpdateMessage actualSecondLapMetricsUpdate;
+
+ /**
+ * Adds the provided metrics update message to the worker's queue. If there is already a message in the queue
+ * that has passed the same number of discovery ring laps, then it's replaced with the provided one.
+ *
+ * @param msg Metrics update message that needs to be added to the worker's queue.
+ * @return {@code True} if the message should be added to the worker's queue.
+ * {@code False} if another message of the same kind is already in the queue.
+ */
+ private boolean addMessage(TcpDiscoveryMetricsUpdateMessage msg) {
+ int laps = passedLaps(msg);
+
+ if (laps == 2)
+ return true;
+ else {
+ // The message should be added to the queue only if a similar message is not there already.
+ // Otherwise one of actualFirstLapMetricsUpdate or actualSecondLapMetricsUpdate will be updated only.
+ boolean addToQueue;
+
+ if (laps == 0) {
+ addToQueue = actualFirstLapMetricsUpdate == null;
+
+ actualFirstLapMetricsUpdate = msg;
+ }
+ else {
+ assert laps == 1 : "Unexpected number of laps passed by a metric update message: " + laps;
+
+ addToQueue = actualSecondLapMetricsUpdate == null;
+
+ actualSecondLapMetricsUpdate = msg;
+ }
+
+ return addToQueue;
+ }
+ }
+
+
+ /**
+ * @param laps Number of discovery ring laps passed by the message.
+ * @param msg Message taken from the queue.
+ * @return The most recent message of the same kind received by the local node.
+ */
+ private TcpDiscoveryMetricsUpdateMessage pollActualMessage(int laps, TcpDiscoveryMetricsUpdateMessage msg) {
+ if (laps == 0) {
+ msg = actualFirstLapMetricsUpdate;
+
+ actualFirstLapMetricsUpdate = null;
+ }
+ else if (laps == 1) {
+ msg = actualSecondLapMetricsUpdate;
+
+ actualSecondLapMetricsUpdate = null;
+ }
+
+ return msg;
+ }
+
+ /**
+ * @param msg Metrics update message.
+ * @return Number of laps that the provided message has passed.
+ */
+ private int passedLaps(TcpDiscoveryMetricsUpdateMessage msg) {
+ UUID locNodeId = getLocalNodeId();
+
+ boolean hasLocMetrics = hasMetrics(msg, locNodeId);
+
+ if (locNodeId.equals(msg.creatorNodeId()) && !hasLocMetrics && msg.senderNodeId() != null)
+ return 2;
+ else if (msg.senderNodeId() == null || !hasLocMetrics)
+ return 0;
+ else
+ return 1;
+ }
+
+ /**
+ * @param msg Metrics update message to check.
+ * @param nodeId Node ID for which the check should be performed.
+ * @return {@code True} if the message contains metrics of the node with the provided ID.
+ * {@code False} otherwise.
+ */
+ private boolean hasMetrics(TcpDiscoveryMetricsUpdateMessage msg, UUID nodeId) {
+ return msg.hasMetrics(nodeId) || msg.hasCacheMetrics(nodeId);
+ }
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientMetricsUpdateMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientMetricsUpdateMessage.java
index 4cf5355..8092ef3 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientMetricsUpdateMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientMetricsUpdateMessage.java
@@ -56,11 +56,6 @@ public class TcpDiscoveryClientMetricsUpdateMessage extends TcpDiscoveryAbstract
}
/** {@inheritDoc} */
- @Override public boolean highPriority() {
- return true;
- }
-
- /** {@inheritDoc} */
@Override public boolean traceLogLevel() {
return true;
}
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryMetricsUpdateMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryMetricsUpdateMessage.java
index d84fdc4..899b0af 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryMetricsUpdateMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryMetricsUpdateMessage.java
@@ -219,11 +219,6 @@ public class TcpDiscoveryMetricsUpdateMessage extends TcpDiscoveryAbstractMessag
}
/** {@inheritDoc} */
- @Override public boolean highPriority() {
- return true;
- }
-
- /** {@inheritDoc} */
@Override public String toString() {
return S.toString(TcpDiscoveryMetricsUpdateMessage.class, this, "super", super.toString());
}
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/IgniteMetricsOverflowTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/IgniteMetricsOverflowTest.java
new file mode 100644
index 0000000..b6c7d58
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/IgniteMetricsOverflowTest.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.spi.discovery.DiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryMetricsUpdateMessage;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/** */
+public class IgniteMetricsOverflowTest extends GridCommonAbstractTest {
+ /** */
+ private static final int NODES_NUM = 6;
+
+ /** */
+ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** Released when first discovery message gets delayed. */
+ private CountDownLatch slowDownLatch;
+
+ /**
+ * Period of time, for which {@link TcpDiscoverySpi#readReceipt(Socket, long)} execution is delayed on the node
+ * with a slow {@link DiscoverySpi}.
+ */
+ private volatile int readReceiptDelay;
+
+ /** If {@code true}, then {@code TestTcpDiscoverySpi} will be used. */
+ private boolean slowDiscovery;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ IgniteConfiguration configuration = super.getConfiguration(igniteInstanceName);
+
+ configuration.setFailureDetectionTimeout(60_000);
+ configuration.setMetricsUpdateFrequency(500);
+
+ TcpDiscoverySpi discoverySpi;
+
+ discoverySpi = slowDiscovery
+ ? new TestTcpDiscoverySpi()
+ : new TcpDiscoverySpi();
+
+ discoverySpi.setIpFinder(IP_FINDER);
+
+ configuration.setDiscoverySpi(discoverySpi);
+
+ return configuration;
+ }
+
+ /** */
+ @Before
+ public void before() {
+ readReceiptDelay = 0;
+ slowDiscovery = false;
+ }
+
+ /** */
+ @After
+ public void after() {
+ stopAllGrids();
+ }
+
+ /**
+ * Checks a case when one discovery link processes messages slower than {@link TcpDiscoveryMetricsUpdateMessage}s
+ * are generated. In such situation metrics updates shouldn't block processing of other discovery messages.
+ *
+ * This test doesn't have any asserts, since it will time out if discovery SPI is blocked.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testMetricOverflow() throws Exception {
+ slowDownLatch = new CountDownLatch(1);
+
+ startGrids(NODES_NUM - 2);
+
+ slowDiscovery = true;
+ startGrid(NODES_NUM - 2);
+
+ slowDiscovery = false;
+ startGrid(NODES_NUM - 1);
+
+ awaitPartitionMapExchange();
+
+ IgniteInternalFuture<?> statsFut = GridTestUtils.runAsync(() -> {
+ while (!Thread.currentThread().isInterrupted()) {
+ StringBuilder sb = new StringBuilder();
+ sb.append(">>>>>> Queue sizes:");
+
+ for (int i = 0; i < NODES_NUM; i++) {
+ Ignite ignite = grid(i);
+
+ TcpDiscoverySpi spi = (TcpDiscoverySpi)ignite.configuration().getDiscoverySpi();
+
+ sb.append(" ").append(spi.getMessageWorkerQueueSize());
+ }
+
+ log.info(sb.toString());
+
+ try {
+ Thread.sleep(2_000);
+ }
+ catch (InterruptedException e) {
+ return;
+ }
+ }
+ });
+
+ try {
+ readReceiptDelay = 1_000;
+
+ slowDownLatch.await();
+
+ IgniteInternalFuture<?> cacheCreateFut =
+ GridTestUtils.runAsync(() -> grid(0).createCache("foo"));
+
+ cacheCreateFut.get(30, TimeUnit.SECONDS);
+ }
+ finally {
+ statsFut.cancel();
+ statsFut.get();
+ }
+ }
+
+ /** */
+ private class TestTcpDiscoverySpi extends TcpDiscoverySpi {
+ /** {@inheritDoc} */
+ @Override protected int readReceipt(Socket sock, long timeout) throws IOException {
+ if (readReceiptDelay > 0) {
+ slowDownLatch.countDown();
+
+ try {
+ Thread.sleep(readReceiptDelay);
+ }
+ catch (InterruptedException e) {
+ throw new IgniteException(e);
+ }
+ }
+
+ return super.readReceipt(sock, timeout);
+ }
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
index 864290f..21542d3 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
@@ -29,6 +29,7 @@ import org.apache.ignite.spi.discovery.tcp.IgniteClientConnectSslTest;
import org.apache.ignite.spi.discovery.tcp.IgniteClientConnectTest;
import org.apache.ignite.spi.discovery.tcp.IgniteClientReconnectMassiveShutdownSslTest;
import org.apache.ignite.spi.discovery.tcp.IgniteClientReconnectMassiveShutdownTest;
+import org.apache.ignite.spi.discovery.tcp.IgniteMetricsOverflowTest;
import org.apache.ignite.spi.discovery.tcp.TcpClientDiscoveryMarshallerCheckSelfTest;
import org.apache.ignite.spi.discovery.tcp.TcpClientDiscoverySpiCoordinatorChangeTest;
import org.apache.ignite.spi.discovery.tcp.TcpClientDiscoverySpiFailureTimeoutSelfTest;
@@ -157,7 +158,9 @@ import static org.apache.ignite.IgniteSystemProperties.IGNITE_OVERRIDE_MCAST_GRP
TcpDiscoveryPendingMessageDeliveryTest.class,
- TcpDiscoveryReconnectUnstableTopologyTest.class
+ TcpDiscoveryReconnectUnstableTopologyTest.class,
+
+ IgniteMetricsOverflowTest.class
})
public class IgniteSpiDiscoverySelfTestSuite {
/** */