You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2021/03/16 09:20:28 UTC

[ignite] branch master updated: IGNITE-14224 Extended logging on closing connection to failed client - Fixes #8824.

This is an automated email from the ASF dual-hosted git repository.

sergeychugunov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 8e154f0  IGNITE-14224 Extended logging on closing connection to failed client - Fixes #8824.
8e154f0 is described below

commit 8e154f0d7eee5825c82e6158f3dac725d7bfe847
Author: zstan <st...@gmail.com>
AuthorDate: Tue Mar 16 12:12:26 2021 +0300

    IGNITE-14224 Extended logging on closing connection to failed client - Fixes #8824.
    
    Signed-off-by: Sergey Chugunov <se...@gmail.com>
---
 .../apache/ignite/internal/util/IgniteUtils.java   | 28 ++++++-
 .../ignite/spi/discovery/tcp/ServerImpl.java       | 10 +--
 .../ignite/internal/IgniteClientFailuresTest.java  | 97 ++++++++++++++++++----
 ...cpClientDiscoverySpiFailureTimeoutSelfTest.java |  1 +
 .../processors/cache/index/IndexMetricsTest.java   |  6 +-
 5 files changed, 120 insertions(+), 22 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index 5d5c360..ff71dff 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -4218,6 +4218,32 @@ public abstract class IgniteUtils {
     }
 
     /**
+     * Closes the given socket, logging (rather than propagating) any exception raised during shutdown or close.
+     *
+     * @param sock Socket to close. If it's {@code null} - it's no-op.
+     * @param log Logger to log possible checked exception with (optional).
+     */
+    public static void close(@Nullable Socket sock, @Nullable IgniteLogger log) {
+        if (sock != null) {
+            try {
+                // Avoid TLS 1.3 incompatibility, see https://bugs.openjdk.java.net/browse/JDK-8208526
+                sock.shutdownOutput();
+                sock.shutdownInput();
+            }
+            catch (Exception e) {
+                warn(log, "Failed to shutdown socket: " + e.getMessage(), e);
+            }
+
+            try {
+                sock.close();
+            }
+            catch (Exception e) {
+                warn(log, "Failed to close socket: " + e.getMessage(), e);
+            }
+        }
+    }
+
+    /**
      * Closes given resource suppressing possible checked exception.
      *
      * @param rsrc Resource to close. If it's {@code null} - it's no-op.
@@ -4372,7 +4398,7 @@ public abstract class IgniteUtils {
             return;
 
         try {
-            // Avoid java 12 bug see https://bugs.openjdk.java.net/browse/JDK-8219658
+            // Avoid TLS 1.3 incompatibility, see https://bugs.openjdk.java.net/browse/JDK-8208526
             sock.shutdownOutput();
             sock.shutdownInput();
         }
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index a2ebbda..31fe6a4 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -6803,9 +6803,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                             log.debug("Unknown connection detected (is some other software connecting to " +
                                 "this Ignite port?" +
                                 (!spi.isSslEnabled() ? " missed SSL configuration?" : "" ) +
-                                ") " +
-                                "[rmtAddr=" + rmtAddr +
-                                ", locAddr=" + sock.getLocalSocketAddress() + ']');
+                                ") [rmtAddr=" + rmtAddr + ", locAddr=" + sock.getLocalSocketAddress() + ']');
 
                         LT.warn(log, "Unknown connection detected (is some other software connecting to " +
                             "this Ignite port?" +
@@ -7364,11 +7362,11 @@ class ServerImpl extends TcpDiscoveryImpl {
                     U.interrupt(clientMsgWrk.runner());
                 }
 
-                U.closeQuiet(sock);
+                U.close(sock, log);
 
                 if (log.isInfoEnabled())
                     log.info("Finished serving remote node connection [rmtAddr=" + rmtAddr +
-                        ", rmtPort=" + sock.getPort());
+                        ", rmtPort=" + sock.getPort() + ", rmtNodeId=" + nodeId + ']');
 
                 if (isLocalNodeCoordinator() && !ring.hasRemoteServerNodes())
                     U.enhanceThreadName(msgWorkerThread, "crd");
@@ -7862,7 +7860,7 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                     U.interrupt(runner());
 
-                    U.closeQuiet(sock);
+                    U.close(sock, log);
                 }
             }
         }
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientFailuresTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientFailuresTest.java
index 753f09c..1ebfa55 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientFailuresTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientFailuresTest.java
@@ -14,13 +14,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.ignite.internal;
 
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.failure.FailureType;
 import org.apache.ignite.internal.cluster.IgniteClusterEx;
 import org.apache.ignite.internal.managers.GridManagerAdapter;
+import org.apache.ignite.mxbean.ClusterMetricsMXBean;
 import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.testframework.GridStringLogger;
@@ -36,17 +39,24 @@ import org.junit.Test;
  */
 public class IgniteClientFailuresTest extends GridCommonAbstractTest {
     /** */
+    private static final String EXCHANGE_WORKER_BLOCKED_MSG = "threadName=exchange-worker, blockedFor=";
+
+    /** */
     private GridStringLogger inMemoryLog;
 
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
 
-        if (!igniteInstanceName.startsWith("client")) {
+        if (igniteInstanceName.contains("client"))
+            cfg.setClientMode(true);
+        else {
             cfg.setClientFailureDetectionTimeout(10_000);
 
             cfg.setSystemWorkerBlockedTimeout(5_000);
 
+            cfg.setNetworkTimeout(5_000);
+
             cfg.setGridLogger(inMemoryLog);
         }
 
@@ -73,13 +83,17 @@ public class IgniteClientFailuresTest extends GridCommonAbstractTest {
      */
     @Test
     public void testNoMessagesFromFailureProcessor() throws Exception {
-        inMemoryLog = new GridStringLogger(false, new GridTestLog4jLogger());
+        GridStringLogger strLog = new GridStringLogger(false, new GridTestLog4jLogger());
+
+        strLog.logLength(1024 * 1024);
 
-        inMemoryLog.logLength(1024 * 1024);
+        inMemoryLog = strLog;
 
         IgniteEx srv = startGrid(0);
 
-        IgniteEx client00 = startClientGrid("client00");
+        inMemoryLog = null;
+
+        IgniteEx client00 = startGrid("client00");
 
         client00.getOrCreateCache(new CacheConfiguration<>("cache0"));
 
@@ -93,7 +107,7 @@ public class IgniteClientFailuresTest extends GridCommonAbstractTest {
 
         assertTrue(waitRes);
 
-        assertFalse(inMemoryLog.toString().contains("name=tcp-comm-worker"));
+        assertFalse(strLog.toString().contains("name=tcp-comm-worker"));
     }
 
     /**
@@ -104,30 +118,85 @@ public class IgniteClientFailuresTest extends GridCommonAbstractTest {
      */
     @Test
     public void testFailedClientLeavesTopologyAfterTimeout() throws Exception {
+        IgniteEx srv0 = (IgniteEx)startGridsMultiThreaded(3);
+
+        IgniteEx client00 = startGrid("client00");
+        IgniteEx client01 = startGrid("client01");
+
+        client00.getOrCreateCache(new CacheConfiguration<>("cache0"));
+        client01.getOrCreateCache(new CacheConfiguration<>("cache1"));
+
+        IgniteInternalFuture f1 = GridTestUtils.runAsync(() -> breakClient(client00));
+        IgniteInternalFuture f2 = GridTestUtils.runAsync(() -> breakClient(client01));
+
+        f1.get(); f2.get();
+
+        final IgniteClusterEx cl = srv0.cluster();
+
+        assertEquals(5, cl.topology(cl.topologyVersion()).size());
+
+        IgniteEx client02 = startGrid("client02");
+
+        assertEquals(6, cl.topology(cl.topologyVersion()).size());
+
+        boolean waitRes = GridTestUtils.waitForCondition(() -> (cl.topology(cl.topologyVersion()).size() == 4),
+            20_000);
+
+        assertTrue(waitRes);
+
+        checkCacheOperations(client02.cache("cache0"));
+
+        assertEquals(4, srv0.context().discovery().allNodes().size());
+
+        // Cluster metrics.
+        ClusterMetricsMXBean mxBeanCluster = GridCommonAbstractTest.getMxBean(srv0.name(), "Kernal",
+            ClusterMetricsMXBeanImpl.class.getSimpleName(), ClusterMetricsMXBean.class);
+
+        assertEquals(1, mxBeanCluster.getTotalClientNodes());
+    }
+
+    /**
+     * Verifies that when a system thread on a server node tries to re-establish a connection to a failed client
+     * and the exchange-worker gets blocked waiting for it (e.g. to send the full partition map),
+     * this is not treated as {@link FailureType#SYSTEM_WORKER_BLOCKED},
+     * because the wait is finite and part of normal operation.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testExchangeWorkerIsNotTreatedAsBlockedWhenClientNodeFails() throws Exception {
+        GridStringLogger strLog = new GridStringLogger(false, new GridTestLog4jLogger());
+
+        strLog.logLength(1024 * 1024);
+
+        inMemoryLog = strLog;
+
         IgniteEx srv0 = startGrid(0);
 
-        IgniteEx client00 = startClientGrid("client00");
+        inMemoryLog = null;
 
-        Thread.sleep(5_000);
+        IgniteEx client00 = startGrid("client00");
 
         client00.getOrCreateCache(new CacheConfiguration<>("cache0"));
 
+        startGrid(1);
+
         breakClient(client00);
 
         final IgniteClusterEx cl = srv0.cluster();
 
-        assertEquals(2, cl.topology(cl.topologyVersion()).size());
-
-        IgniteEx client01 = startClientGrid("client01");
-
         assertEquals(3, cl.topology(cl.topologyVersion()).size());
 
-        boolean waitRes = GridTestUtils.waitForCondition(() -> (cl.topology(cl.topologyVersion()).size() == 2),
-            20_000);
+        startGrid("client01");
 
-        checkCacheOperations(client01.cache("cache0"));
+        boolean waitRes = GridTestUtils.waitForCondition(() -> (cl.topology(cl.topologyVersion()).size() == 3),
+            20_000);
 
         assertTrue(waitRes);
+
+        String logRes = strLog.toString();
+
+        assertFalse(logRes.contains(EXCHANGE_WORKER_BLOCKED_MSG));
     }
 
     /** */
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java
index bd0b9b7..4fc6b33 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java
@@ -443,6 +443,7 @@ public class TcpClientDiscoverySpiFailureTimeoutSelfTest extends TcpClientDiscov
         /** */
         private volatile long readDelay;
 
+        /** */
         private volatile long writeToSocketDelay;
 
         /** */
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/IndexMetricsTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/IndexMetricsTest.java
index 62da827..2fd14ef 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/IndexMetricsTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/IndexMetricsTest.java
@@ -265,7 +265,11 @@ public class IndexMetricsTest extends AbstractIndexingCommonTest {
      * @param cls Cache metrics MXBean implementation.
      * @return Cache metrics MXBean.
      */
-    private <T extends CacheMetricsMXBean> T cacheMetricsMXBean(IgniteEx n, String cacheName, Class<? super T> cls) {
+    private <T extends CacheMetricsMXBean> T cacheMetricsMXBean(
+        IgniteEx n,
+        String cacheName,
+        Class<? super T> cls
+    ) {
         requireNonNull(n);
         requireNonNull(cacheName);
         requireNonNull(cls);