You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2021/03/16 09:20:28 UTC
[ignite] branch master updated: IGNITE-14224 Extended logging on
closing connection to failed client - Fixes #8824.
This is an automated email from the ASF dual-hosted git repository.
sergeychugunov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 8e154f0 IGNITE-14224 Extended logging on closing connection to failed client - Fixes #8824.
8e154f0 is described below
commit 8e154f0d7eee5825c82e6158f3dac725d7bfe847
Author: zstan <st...@gmail.com>
AuthorDate: Tue Mar 16 12:12:26 2021 +0300
IGNITE-14224 Extended logging on closing connection to failed client - Fixes #8824.
Signed-off-by: Sergey Chugunov <se...@gmail.com>
---
.../apache/ignite/internal/util/IgniteUtils.java | 28 ++++++-
.../ignite/spi/discovery/tcp/ServerImpl.java | 10 +--
.../ignite/internal/IgniteClientFailuresTest.java | 97 ++++++++++++++++++----
...cpClientDiscoverySpiFailureTimeoutSelfTest.java | 1 +
.../processors/cache/index/IndexMetricsTest.java | 6 +-
5 files changed, 120 insertions(+), 22 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index 5d5c360..ff71dff 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -4218,6 +4218,32 @@ public abstract class IgniteUtils {
}
/**
+ * Closes the given socket, logging any exception thrown while shutting it down or closing it.
+ *
+ * @param sock Socket to close. If it's {@code null} - it's no-op.
+ * @param log Logger to report a possible exception with (optional).
+ */
+ public static void close(@Nullable Socket sock, @Nullable IgniteLogger log) {
+ if (sock != null) {
+ try {
+ // Avoid TLS 1.3 incompatibility, see https://bugs.openjdk.java.net/browse/JDK-8208526
+ sock.shutdownOutput();
+ sock.shutdownInput();
+ }
+ catch (Exception e) {
+ warn(log, "Failed to shutdown socket: " + e.getMessage(), e);
+ }
+
+ try {
+ sock.close();
+ }
+ catch (Exception e) {
+ warn(log, "Failed to close socket: " + e.getMessage(), e);
+ }
+ }
+ }
+
+ /**
* Closes given resource suppressing possible checked exception.
*
* @param rsrc Resource to close. If it's {@code null} - it's no-op.
@@ -4372,7 +4398,7 @@ public abstract class IgniteUtils {
return;
try {
- // Avoid java 12 bug see https://bugs.openjdk.java.net/browse/JDK-8219658
+ // Avoid TLS 1.3 incompatibility, see https://bugs.openjdk.java.net/browse/JDK-8208526
sock.shutdownOutput();
sock.shutdownInput();
}
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index a2ebbda..31fe6a4 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -6803,9 +6803,7 @@ class ServerImpl extends TcpDiscoveryImpl {
log.debug("Unknown connection detected (is some other software connecting to " +
"this Ignite port?" +
(!spi.isSslEnabled() ? " missed SSL configuration?" : "" ) +
- ") " +
- "[rmtAddr=" + rmtAddr +
- ", locAddr=" + sock.getLocalSocketAddress() + ']');
+ ") [rmtAddr=" + rmtAddr + ", locAddr=" + sock.getLocalSocketAddress() + ']');
LT.warn(log, "Unknown connection detected (is some other software connecting to " +
"this Ignite port?" +
@@ -7364,11 +7362,11 @@ class ServerImpl extends TcpDiscoveryImpl {
U.interrupt(clientMsgWrk.runner());
}
- U.closeQuiet(sock);
+ U.close(sock, log);
if (log.isInfoEnabled())
log.info("Finished serving remote node connection [rmtAddr=" + rmtAddr +
- ", rmtPort=" + sock.getPort());
+ ", rmtPort=" + sock.getPort() + ", rmtNodeId=" + nodeId + ']');
if (isLocalNodeCoordinator() && !ring.hasRemoteServerNodes())
U.enhanceThreadName(msgWorkerThread, "crd");
@@ -7862,7 +7860,7 @@ class ServerImpl extends TcpDiscoveryImpl {
U.interrupt(runner());
- U.closeQuiet(sock);
+ U.close(sock, log);
}
}
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientFailuresTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientFailuresTest.java
index 753f09c..1ebfa55 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientFailuresTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientFailuresTest.java
@@ -14,13 +14,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.ignite.internal;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.failure.FailureType;
import org.apache.ignite.internal.cluster.IgniteClusterEx;
import org.apache.ignite.internal.managers.GridManagerAdapter;
+import org.apache.ignite.mxbean.ClusterMetricsMXBean;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.testframework.GridStringLogger;
@@ -36,17 +39,24 @@ import org.junit.Test;
*/
public class IgniteClientFailuresTest extends GridCommonAbstractTest {
/** */
+ private static final String EXCHANGE_WORKER_BLOCKED_MSG = "threadName=exchange-worker, blockedFor=";
+
+ /** */
private GridStringLogger inMemoryLog;
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
- if (!igniteInstanceName.startsWith("client")) {
+ if (igniteInstanceName.contains("client"))
+ cfg.setClientMode(true);
+ else {
cfg.setClientFailureDetectionTimeout(10_000);
cfg.setSystemWorkerBlockedTimeout(5_000);
+ cfg.setNetworkTimeout(5_000);
+
cfg.setGridLogger(inMemoryLog);
}
@@ -73,13 +83,17 @@ public class IgniteClientFailuresTest extends GridCommonAbstractTest {
*/
@Test
public void testNoMessagesFromFailureProcessor() throws Exception {
- inMemoryLog = new GridStringLogger(false, new GridTestLog4jLogger());
+ GridStringLogger strLog = new GridStringLogger(false, new GridTestLog4jLogger());
+
+ strLog.logLength(1024 * 1024);
- inMemoryLog.logLength(1024 * 1024);
+ inMemoryLog = strLog;
IgniteEx srv = startGrid(0);
- IgniteEx client00 = startClientGrid("client00");
+ inMemoryLog = null;
+
+ IgniteEx client00 = startGrid("client00");
client00.getOrCreateCache(new CacheConfiguration<>("cache0"));
@@ -93,7 +107,7 @@ public class IgniteClientFailuresTest extends GridCommonAbstractTest {
assertTrue(waitRes);
- assertFalse(inMemoryLog.toString().contains("name=tcp-comm-worker"));
+ assertFalse(strLog.toString().contains("name=tcp-comm-worker"));
}
/**
@@ -104,30 +118,85 @@ public class IgniteClientFailuresTest extends GridCommonAbstractTest {
*/
@Test
public void testFailedClientLeavesTopologyAfterTimeout() throws Exception {
+ IgniteEx srv0 = (IgniteEx)startGridsMultiThreaded(3);
+
+ IgniteEx client00 = startGrid("client00");
+ IgniteEx client01 = startGrid("client01");
+
+ client00.getOrCreateCache(new CacheConfiguration<>("cache0"));
+ client01.getOrCreateCache(new CacheConfiguration<>("cache1"));
+
+ IgniteInternalFuture f1 = GridTestUtils.runAsync(() -> breakClient(client00));
+ IgniteInternalFuture f2 = GridTestUtils.runAsync(() -> breakClient(client01));
+
+ f1.get(); f2.get();
+
+ final IgniteClusterEx cl = srv0.cluster();
+
+ assertEquals(5, cl.topology(cl.topologyVersion()).size());
+
+ IgniteEx client02 = startGrid("client02");
+
+ assertEquals(6, cl.topology(cl.topologyVersion()).size());
+
+ boolean waitRes = GridTestUtils.waitForCondition(() -> (cl.topology(cl.topologyVersion()).size() == 4),
+ 20_000);
+
+ assertTrue(waitRes);
+
+ checkCacheOperations(client02.cache("cache0"));
+
+ assertEquals(4, srv0.context().discovery().allNodes().size());
+
+ // Cluster metrics.
+ ClusterMetricsMXBean mxBeanCluster = GridCommonAbstractTest.getMxBean(srv0.name(), "Kernal",
+ ClusterMetricsMXBeanImpl.class.getSimpleName(), ClusterMetricsMXBean.class);
+
+ assertEquals(1, mxBeanCluster.getTotalClientNodes());
+ }
+
+ /**
+ * Verifies that when a system thread on a server node tries to re-establish a connection to a failed client
+ * and the exchange-worker gets blocked waiting for it (e.g. to send the partitions full map),
+ * this is not treated as {@link FailureType#SYSTEM_WORKER_BLOCKED}
+ * because the wait is finite and part of normal operations.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testExchangeWorkerIsNotTreatedAsBlockedWhenClientNodeFails() throws Exception {
+ GridStringLogger strLog = new GridStringLogger(false, new GridTestLog4jLogger());
+
+ strLog.logLength(1024 * 1024);
+
+ inMemoryLog = strLog;
+
IgniteEx srv0 = startGrid(0);
- IgniteEx client00 = startClientGrid("client00");
+ inMemoryLog = null;
- Thread.sleep(5_000);
+ IgniteEx client00 = startGrid("client00");
client00.getOrCreateCache(new CacheConfiguration<>("cache0"));
+ startGrid(1);
+
breakClient(client00);
final IgniteClusterEx cl = srv0.cluster();
- assertEquals(2, cl.topology(cl.topologyVersion()).size());
-
- IgniteEx client01 = startClientGrid("client01");
-
assertEquals(3, cl.topology(cl.topologyVersion()).size());
- boolean waitRes = GridTestUtils.waitForCondition(() -> (cl.topology(cl.topologyVersion()).size() == 2),
- 20_000);
+ startGrid("client01");
- checkCacheOperations(client01.cache("cache0"));
+ boolean waitRes = GridTestUtils.waitForCondition(() -> (cl.topology(cl.topologyVersion()).size() == 3),
+ 20_000);
assertTrue(waitRes);
+
+ String logRes = strLog.toString();
+
+ assertFalse(logRes.contains(EXCHANGE_WORKER_BLOCKED_MSG));
}
/** */
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java
index bd0b9b7..4fc6b33 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java
@@ -443,6 +443,7 @@ public class TcpClientDiscoverySpiFailureTimeoutSelfTest extends TcpClientDiscov
/** */
private volatile long readDelay;
+ /** */
private volatile long writeToSocketDelay;
/** */
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/IndexMetricsTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/IndexMetricsTest.java
index 62da827..2fd14ef 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/IndexMetricsTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/IndexMetricsTest.java
@@ -265,7 +265,11 @@ public class IndexMetricsTest extends AbstractIndexingCommonTest {
* @param cls Cache metrics MXBean implementation.
* @return Cache metrics MXBean.
*/
- private <T extends CacheMetricsMXBean> T cacheMetricsMXBean(IgniteEx n, String cacheName, Class<? super T> cls) {
+ private <T extends CacheMetricsMXBean> T cacheMetricsMXBean(
+ IgniteEx n,
+ String cacheName,
+ Class<? super T> cls
+ ) {
requireNonNull(n);
requireNonNull(cacheName);
requireNonNull(cls);