You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by dg...@apache.org on 2019/08/23 10:23:31 UTC

[ignite] branch master updated: IGNITE-10808 Fixed Discovery message queue may build up with TcpDiscoveryMetricsUpdateMessage - Fixes #5771

This is an automated email from the ASF dual-hosted git repository.

dgovorukhin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 7e73098  IGNITE-10808 Fixed Discovery message queue may build up with TcpDiscoveryMetricsUpdateMessage - Fixes #5771
7e73098 is described below

commit 7e73098d4d6e3d5f78326cb11dac7e083a2312dd
Author: Denis Mekhanikov <dm...@gmail.com>
AuthorDate: Fri Aug 23 13:23:11 2019 +0300

    IGNITE-10808 Fixed Discovery message queue may build up with TcpDiscoveryMetricsUpdateMessage - Fixes #5771
---
 .../ignite/spi/discovery/tcp/ServerImpl.java       | 160 ++++++++++++++++---
 .../TcpDiscoveryClientMetricsUpdateMessage.java    |   5 -
 .../messages/TcpDiscoveryMetricsUpdateMessage.java |   5 -
 .../discovery/tcp/IgniteMetricsOverflowTest.java   | 171 +++++++++++++++++++++
 .../IgniteSpiDiscoverySelfTestSuite.java           |   5 +-
 5 files changed, 313 insertions(+), 33 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 2fe87c2..490eb9c 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -2823,6 +2823,9 @@ class ServerImpl extends TcpDiscoveryImpl {
         /** */
         private DiscoveryDataPacket gridDiscoveryData;
 
+        /** Filter for {@link TcpDiscoveryMetricsUpdateMessage}s. */
+        private final MetricsUpdateMessageFilter metricsMsgFilter = new MetricsUpdateMessageFilter();
+
         /**
          * @param log Logger.
          */
@@ -2882,13 +2885,39 @@ class ServerImpl extends TcpDiscoveryImpl {
                 return;
             }
 
-            if (msg.highPriority() && !ignoreHighPriority)
-                queue.addFirst(msg);
+            boolean addFirst = msg.highPriority() && !ignoreHighPriority;
+
+            if (msg instanceof TcpDiscoveryMetricsUpdateMessage) {
+                if (metricsMsgFilter.addMessage((TcpDiscoveryMetricsUpdateMessage)msg))
+                    addToQueue(msg, addFirst);
+                else {
+                    if (log.isDebugEnabled())
+                        log.debug("Metric update message has been replaced in the worker's queue: " + msg);
+                }
+            }
             else
+                addToQueue(msg, addFirst);
+        }
+
+        /**
+         * Adds a message to the worker's queue and logs the insertion at debug level through the logger
+         * returned by {@code messageLogger(msg)}.
+         *
+         * @param msg Message to add.
+         * @param addFirst If {@code true}, then the message will be added to a head of a worker's queue,
+         *      otherwise to its tail.
+         */
+        private void addToQueue(TcpDiscoveryAbstractMessage msg, boolean addFirst) {
+            DebugLogger log = messageLogger(msg);
+
+            if (addFirst) {
+                queue.addFirst(msg);
+
+                if (log.isDebugEnabled())
+                    log.debug("Message has been added to a head of a worker's queue: " + msg);
+            }
+            else {
                 queue.add(msg);
 
-            if (log.isDebugEnabled())
-                log.debug("Message has been added to queue: " + msg);
+                if (log.isDebugEnabled())
+                    log.debug("Message has been added to a worker's queue: " + msg);
+            }
         }
 
         /** */
@@ -5634,7 +5663,8 @@ class ServerImpl extends TcpDiscoveryImpl {
         }
 
         /**
-         * Processes regular metrics update message.
+         * Processes regular metrics update message. If a more recent message of the same kind has been received,
+         * then it will be processed instead of the one taken from the queue.
          *
          * @param msg Metrics update message.
          */
@@ -5643,6 +5673,10 @@ class ServerImpl extends TcpDiscoveryImpl {
 
             assert !msg.client();
 
+            int laps = metricsMsgFilter.passedLaps(msg);
+
+            msg = metricsMsgFilter.pollActualMessage(laps, msg);
+
             UUID locNodeId = getLocalNodeId();
 
             if (ring.node(msg.creatorNodeId()) == null) {
@@ -5668,7 +5702,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                 return;
             }
 
-            if (locNodeId.equals(msg.creatorNodeId()) && !hasMetrics(msg, locNodeId) && msg.senderNodeId() != null) {
+            if (laps == 2) {
                 if (log.isTraceEnabled())
                     log.trace("Discarding metrics update message that has made two passes: " + msg);
 
@@ -5696,8 +5730,7 @@ class ServerImpl extends TcpDiscoveryImpl {
             }
 
             if (sendMessageToRemotes(msg)) {
-                if ((locNodeId.equals(msg.creatorNodeId()) && msg.senderNodeId() == null ||
-                    !hasMetrics(msg, locNodeId)) && spiStateCopy() == CONNECTED) {
+                if (laps == 0 && spiStateCopy() == CONNECTED) {
                     // Message is on its first ring or just created on coordinator.
                     msg.setMetrics(locNodeId, spi.metricsProvider.metrics());
                     msg.setCacheMetrics(locNodeId, spi.metricsProvider.cacheMetrics());
@@ -5792,13 +5825,6 @@ class ServerImpl extends TcpDiscoveryImpl {
         }
 
         /**
-         * @param msg Message.
-         */
-        private boolean hasMetrics(TcpDiscoveryMetricsUpdateMessage msg, UUID nodeId) {
-            return msg.hasMetrics(nodeId) || msg.hasCacheMetrics(nodeId);
-        }
-
-        /**
          * Processes discard message and discards previously registered pending messages.
          *
          * @param msg Discard message.
@@ -6144,16 +6170,12 @@ class ServerImpl extends TcpDiscoveryImpl {
          * than {@link TcpDiscoveryStatusCheckMessage} is sent across the ring.
          */
         private void checkMetricsReceiving() {
-            if (lastTimeStatusMsgSentNanos - locNode.lastUpdateTimeNanos() < 0)
+            if (lastTimeStatusMsgSentNanos < locNode.lastUpdateTimeNanos())
                 lastTimeStatusMsgSentNanos = locNode.lastUpdateTimeNanos();
 
-            long updateTimeNanos = lastTimeStatusMsgSentNanos - lastRingMsgTimeNanos > 0
-                ? lastTimeStatusMsgSentNanos
-                : lastRingMsgTimeNanos;
+            long updateTimeNanos = Math.max(lastTimeStatusMsgSentNanos, lastRingMsgTimeNanos);
 
-            long elapsed = metricsCheckFreq - U.millisSinceNanos(updateTimeNanos);
-
-            if (elapsed > 0)
+            if (U.millisSinceNanos(updateTimeNanos) < metricsCheckFreq)
                 return;
 
             msgWorker.addMessage(createTcpDiscoveryStatusCheckMessage(locNode, locNode.id(), null));
@@ -7908,4 +7930,98 @@ class ServerImpl extends TcpDiscoveryImpl {
             return S.toString(CrossRingMessageSendState.class, this);
         }
     }
+
+    /**
+     * Filter to keep track of the most recent {@link TcpDiscoveryMetricsUpdateMessage}s.
+     * <p>
+     * For every number of passed laps (0 or 1) at most one metrics update message is kept in the worker's queue.
+     * Newer messages of the same lap only replace the message stored in this filter, and the stored one is
+     * processed instead of the queued one (see {@link #pollActualMessage}). Messages that have passed two laps
+     * are not filtered.
+     * <p>
+     * {@link #addMessage} and {@link #pollActualMessage} may be called from different threads, so the check and
+     * the update of a stored message are performed atomically under the filter's monitor. Otherwise the worker
+     * could poll the stored message between the check and the update made by {@link #addMessage}: the new message
+     * would then be stored without a corresponding entry in the queue, it would never be processed, and no
+     * subsequent message of the same lap would be enqueued either.
+     */
+    private class MetricsUpdateMessageFilter {
+        /** The most recent unprocessed metrics update message, which is on its first discovery round trip. */
+        private TcpDiscoveryMetricsUpdateMessage actualFirstLapMetricsUpdate;
+
+        /** The most recent unprocessed metrics update message, which is on its second discovery round trip. */
+        private TcpDiscoveryMetricsUpdateMessage actualSecondLapMetricsUpdate;
+
+        /**
+         * Registers the provided metrics update message as the most recent one among the messages, that have passed
+         * the same number of discovery ring laps. If such a message is already waiting in the worker's queue, then
+         * it's replaced with the provided one, and the provided message doesn't need to be queued.
+         *
+         * @param msg Metrics update message that needs to be added to the worker's queue.
+         * @return {@code True} if the message should be added to the worker's queue.
+         * {@code False} if another message of the same kind is already in the queue.
+         */
+        private boolean addMessage(TcpDiscoveryMetricsUpdateMessage msg) {
+            // Computed outside of the lock: it doesn't touch the filter's state.
+            int laps = passedLaps(msg);
+
+            // Messages that have made two passes are not deduplicated.
+            if (laps == 2)
+                return true;
+
+            synchronized (this) {
+                // The message should be added to the queue only if a similar message is not there already.
+                // Otherwise one of actualFirstLapMetricsUpdate or actualSecondLapMetricsUpdate will be updated only.
+                boolean addToQueue;
+
+                if (laps == 0) {
+                    addToQueue = actualFirstLapMetricsUpdate == null;
+
+                    actualFirstLapMetricsUpdate = msg;
+                }
+                else {
+                    assert laps == 1 : "Unexpected number of laps passed by a metric update message: " + laps;
+
+                    addToQueue = actualSecondLapMetricsUpdate == null;
+
+                    actualSecondLapMetricsUpdate = msg;
+                }
+
+                return addToQueue;
+            }
+        }
+
+        /**
+         * Takes the most recent stored message, that has passed the given number of laps, out of the filter.
+         *
+         * @param laps Number of discovery ring laps passed by the message.
+         * @param msg Message taken from the queue.
+         * @return The most recent message of the same kind received by the local node, or {@code msg} itself
+         * if no such message is stored (which is always the case for messages that have passed two laps).
+         */
+        private TcpDiscoveryMetricsUpdateMessage pollActualMessage(int laps, TcpDiscoveryMetricsUpdateMessage msg) {
+            TcpDiscoveryMetricsUpdateMessage actual = null;
+
+            synchronized (this) {
+                if (laps == 0) {
+                    actual = actualFirstLapMetricsUpdate;
+
+                    actualFirstLapMetricsUpdate = null;
+                }
+                else if (laps == 1) {
+                    actual = actualSecondLapMetricsUpdate;
+
+                    actualSecondLapMetricsUpdate = null;
+                }
+            }
+
+            // Never hand null to the worker: fall back to the message taken from the queue.
+            return actual != null ? actual : msg;
+        }
+
+        /**
+         * @param msg Metrics update message.
+         * @return Number of laps, that the provided message passed: {@code 0}, {@code 1} or {@code 2}.
+         */
+        private int passedLaps(TcpDiscoveryMetricsUpdateMessage msg) {
+            UUID locNodeId = getLocalNodeId();
+
+            boolean hasLocMetrics = hasMetrics(msg, locNodeId);
+
+            if (locNodeId.equals(msg.creatorNodeId()) && !hasLocMetrics && msg.senderNodeId() != null)
+                return 2;
+            else if (msg.senderNodeId() == null || !hasLocMetrics)
+                return 0;
+            else
+                return 1;
+        }
+
+        /**
+         * @param msg Metrics update message to check.
+         * @param nodeId Node ID for which the check should be performed.
+         * @return {@code True} if the message contains metrics or cache metrics of the node with the provided ID.
+         * {@code False} otherwise.
+         */
+        private boolean hasMetrics(TcpDiscoveryMetricsUpdateMessage msg, UUID nodeId) {
+            return msg.hasMetrics(nodeId) || msg.hasCacheMetrics(nodeId);
+        }
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientMetricsUpdateMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientMetricsUpdateMessage.java
index 4cf5355..8092ef3 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientMetricsUpdateMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientMetricsUpdateMessage.java
@@ -56,11 +56,6 @@ public class TcpDiscoveryClientMetricsUpdateMessage extends TcpDiscoveryAbstract
     }
 
     /** {@inheritDoc} */
-    @Override public boolean highPriority() {
-        return true;
-    }
-
-    /** {@inheritDoc} */
     @Override public boolean traceLogLevel() {
         return true;
     }
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryMetricsUpdateMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryMetricsUpdateMessage.java
index d84fdc4..899b0af 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryMetricsUpdateMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryMetricsUpdateMessage.java
@@ -219,11 +219,6 @@ public class TcpDiscoveryMetricsUpdateMessage extends TcpDiscoveryAbstractMessag
     }
 
     /** {@inheritDoc} */
-    @Override public boolean highPriority() {
-        return true;
-    }
-
-    /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(TcpDiscoveryMetricsUpdateMessage.class, this, "super", super.toString());
     }
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/IgniteMetricsOverflowTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/IgniteMetricsOverflowTest.java
new file mode 100644
index 0000000..b6c7d58
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/IgniteMetricsOverflowTest.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.spi.discovery.DiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryMetricsUpdateMessage;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Checks that {@link TcpDiscoveryMetricsUpdateMessage}s don't block processing of other discovery messages, when
+ * one of the nodes in the ring processes discovery messages slower than metrics updates are generated.
+ */
+public class IgniteMetricsOverflowTest extends GridCommonAbstractTest {
+    /** Number of server nodes started by the test. */
+    private static final int NODES_NUM = 6;
+
+    /** IP finder shared by all nodes started by the test. */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** Released when first discovery message gets delayed. */
+    private CountDownLatch slowDownLatch;
+
+    /**
+     * Period of time in milliseconds, for which {@link TcpDiscoverySpi#readReceipt(Socket, long)} execution is
+     * delayed on the node with a slow {@link DiscoverySpi}. No delay is applied if it's not positive.
+     */
+    private volatile int readReceiptDelay;
+
+    /** If {@code true}, then {@code TestTcpDiscoverySpi} will be used. */
+    private boolean slowDiscovery;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        cfg.setFailureDetectionTimeout(60_000);
+        cfg.setMetricsUpdateFrequency(500);
+
+        TcpDiscoverySpi discoverySpi = slowDiscovery ? new TestTcpDiscoverySpi() : new TcpDiscoverySpi();
+
+        discoverySpi.setIpFinder(IP_FINDER);
+
+        cfg.setDiscoverySpi(discoverySpi);
+
+        return cfg;
+    }
+
+    /** Resets the discovery slowdown settings before each test. */
+    @Before
+    public void before() {
+        readReceiptDelay = 0;
+        slowDiscovery = false;
+    }
+
+    /**
+     * Stops all nodes. The {@code readReceipt} delay is disabled first, so that the slow node doesn't keep sleeping
+     * in discovery threads while the grids are being stopped.
+     */
+    @After
+    public void after() {
+        readReceiptDelay = 0;
+
+        stopAllGrids();
+    }
+
+    /**
+     * Checks a case when one discovery link processes messages slower than {@link TcpDiscoveryMetricsUpdateMessage}s
+     * are generated. In such situation metrics updates shouldn't block processing of other discovery messages.
+     *
+     * If discovery SPI is blocked, then the cache creation doesn't complete in time and the test fails.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testMetricOverflow() throws Exception {
+        slowDownLatch = new CountDownLatch(1);
+
+        startGrids(NODES_NUM - 2);
+
+        slowDiscovery = true;
+        startGrid(NODES_NUM - 2);
+
+        slowDiscovery = false;
+        startGrid(NODES_NUM - 1);
+
+        awaitPartitionMapExchange();
+
+        IgniteInternalFuture<?> statsFut = GridTestUtils.runAsync(() -> {
+            while (!Thread.currentThread().isInterrupted()) {
+                StringBuilder sb = new StringBuilder();
+                sb.append(">>>>>> Queue sizes:");
+
+                for (int i = 0; i < NODES_NUM; i++) {
+                    Ignite ignite = grid(i);
+
+                    TcpDiscoverySpi spi = (TcpDiscoverySpi)ignite.configuration().getDiscoverySpi();
+
+                    sb.append(" ").append(spi.getMessageWorkerQueueSize());
+                }
+
+                log.info(sb.toString());
+
+                try {
+                    Thread.sleep(2_000);
+                }
+                catch (InterruptedException e) {
+                    // The statistics thread is being stopped.
+                    return;
+                }
+            }
+        });
+
+        try {
+            readReceiptDelay = 1_000;
+
+            assertTrue("Discovery messages haven't been delayed on the slow node.",
+                slowDownLatch.await(30, TimeUnit.SECONDS));
+
+            IgniteInternalFuture<?> cacheCreateFut =
+                GridTestUtils.runAsync(() -> grid(0).createCache("foo"));
+
+            cacheCreateFut.get(30, TimeUnit.SECONDS);
+        }
+        finally {
+            statsFut.cancel();
+            statsFut.get();
+        }
+    }
+
+    /**
+     * Discovery SPI, that sleeps for {@code readReceiptDelay} milliseconds before reading every message receipt,
+     * and releases {@code slowDownLatch} when it does so.
+     */
+    private class TestTcpDiscoverySpi extends TcpDiscoverySpi {
+        /** {@inheritDoc} */
+        @Override protected int readReceipt(Socket sock, long timeout) throws IOException {
+            // Read the volatile field once, so that the check and the sleep use the same value.
+            int delay = readReceiptDelay;
+
+            if (delay > 0) {
+                slowDownLatch.countDown();
+
+                try {
+                    Thread.sleep(delay);
+                }
+                catch (InterruptedException e) {
+                    // Restore the interrupt status, so that the calling discovery thread can still observe it.
+                    Thread.currentThread().interrupt();
+
+                    throw new IgniteException(e);
+                }
+            }
+
+            return super.readReceipt(sock, timeout);
+        }
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
index 864290f..21542d3 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
@@ -29,6 +29,7 @@ import org.apache.ignite.spi.discovery.tcp.IgniteClientConnectSslTest;
 import org.apache.ignite.spi.discovery.tcp.IgniteClientConnectTest;
 import org.apache.ignite.spi.discovery.tcp.IgniteClientReconnectMassiveShutdownSslTest;
 import org.apache.ignite.spi.discovery.tcp.IgniteClientReconnectMassiveShutdownTest;
+import org.apache.ignite.spi.discovery.tcp.IgniteMetricsOverflowTest;
 import org.apache.ignite.spi.discovery.tcp.TcpClientDiscoveryMarshallerCheckSelfTest;
 import org.apache.ignite.spi.discovery.tcp.TcpClientDiscoverySpiCoordinatorChangeTest;
 import org.apache.ignite.spi.discovery.tcp.TcpClientDiscoverySpiFailureTimeoutSelfTest;
@@ -157,7 +158,9 @@ import static org.apache.ignite.IgniteSystemProperties.IGNITE_OVERRIDE_MCAST_GRP
 
     TcpDiscoveryPendingMessageDeliveryTest.class,
 
-    TcpDiscoveryReconnectUnstableTopologyTest.class
+    TcpDiscoveryReconnectUnstableTopologyTest.class,
+
+    IgniteMetricsOverflowTest.class
 })
 public class IgniteSpiDiscoverySelfTestSuite {
     /** */