You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ib...@apache.org on 2021/06/11 11:43:47 UTC
[ignite-3] branch main updated: IGNITE-14819 Propagate exceptions from NettyServer and NettyClient (#161)
This is an automated email from the ASF dual-hosted git repository.
ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new f29e27d IGNITE-14819 Propagate exceptions from NettyServer and NettyClient (#161)
f29e27d is described below
commit f29e27d2bff4f4dd3447cf3a78235732d29c6133
Author: Alexander Polovtcev <al...@gmail.com>
AuthorDate: Fri Jun 11 14:41:18 2021 +0300
IGNITE-14819 Propagate exceptions from NettyServer and NettyClient (#161)
---
.../internal/netty/ConnectionManagerTest.java | 71 +++++++++++++++++++++-
.../network/MessageSerializationRegistryImpl.java | 11 ++--
.../network/internal/netty/ConnectionManager.java | 21 ++++---
.../network/internal/netty/HandshakeHandler.java | 25 ++++++--
.../network/internal/netty/InboundDecoder.java | 28 +++------
.../netty/IoExceptionSuppressingHandler.java | 40 ++++++++++++
.../ignite/network/internal/netty/NettyClient.java | 3 +-
.../ignite/network/internal/netty/NettyServer.java | 8 +--
.../MessageSerializationRegistryImplTest.java | 4 +-
.../netty/IoExceptionSuppressingHandlerTest.java | 58 ++++++++++++++++++
10 files changed, 224 insertions(+), 45 deletions(-)
diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/network/internal/netty/ConnectionManagerTest.java b/modules/network/src/integrationTest/java/org/apache/ignite/network/internal/netty/ConnectionManagerTest.java
index 7cbf320..8423dc7 100644
--- a/modules/network/src/integrationTest/java/org/apache/ignite/network/internal/netty/ConnectionManagerTest.java
+++ b/modules/network/src/integrationTest/java/org/apache/ignite/network/internal/netty/ConnectionManagerTest.java
@@ -17,6 +17,7 @@
package org.apache.ignite.network.internal.netty;
+import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
@@ -24,8 +25,10 @@ import java.util.Collection;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
+import io.netty.handler.codec.DecoderException;
import org.apache.ignite.network.NetworkMessage;
import org.apache.ignite.network.NetworkMessagesFactory;
import org.apache.ignite.network.TestMessage;
@@ -33,14 +36,20 @@ import org.apache.ignite.network.TestMessageSerializationRegistryImpl;
import org.apache.ignite.network.TestMessagesFactory;
import org.apache.ignite.network.internal.recovery.RecoveryClientHandshakeManager;
import org.apache.ignite.network.internal.recovery.RecoveryServerHandshakeManager;
+import org.apache.ignite.network.serialization.MessageSerializationRegistry;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.isA;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.anyShort;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
/**
* Tests for {@link ConnectionManager}.
@@ -219,14 +228,70 @@ public class ConnectionManagerTest {
}
/**
- * Create and start a {@link ConnectionManager} adding it to the {@link #startedManagers} list.
+ * Tests that a connection to a misconfigured server results in a connection close and an exception on the client
+ * side.
+ */
+ @Test
+ public void testConnectMisconfiguredServer() throws Exception {
+ ConnectionManager client = startManager(4000);
+
+ ConnectionManager server = startManager(4001, mockSerializationRegistry());
+
+ try {
+ client.channel(null, server.getLocalAddress()).get(3, TimeUnit.SECONDS);
+ } catch (ExecutionException e) {
+ assertThat(e.getCause(), isA(IOException.class));
+ }
+ }
+
+ /**
+ * Tests that a connection from a misconfigured client results in an exception.
+ */
+ @Test
+ public void testConnectMisconfiguredClient() throws Exception {
+ ConnectionManager client = startManager(4000, mockSerializationRegistry());
+
+ ConnectionManager server = startManager(4001);
+
+ try {
+ client.channel(null, server.getLocalAddress()).get(3, TimeUnit.SECONDS);
+ } catch (ExecutionException e) {
+ assertThat(e.getCause(), isA(DecoderException.class));
+ }
+ }
+
+ /**
+ * Creates a mock {@link MessageSerializationRegistry} that throws an exception when trying to get a serializer
+ * or a deserializer.
+ */
+ private static MessageSerializationRegistry mockSerializationRegistry() {
+ var mockRegistry = mock(MessageSerializationRegistry.class);
+
+ when(mockRegistry.createDeserializer(anyShort(), anyShort())).thenThrow(RuntimeException.class);
+ when(mockRegistry.createSerializer(anyShort(), anyShort())).thenThrow(RuntimeException.class);
+
+ return mockRegistry;
+ }
+
+ /**
+ * Creates and starts a {@link ConnectionManager} listening on the given port.
*
- * @param port Port for the {@link ConnectionManager#server}.
+ * @param port Port for the connection manager to listen on.
* @return Connection manager.
*/
private ConnectionManager startManager(int port) {
- var registry = new TestMessageSerializationRegistryImpl();
+ return startManager(port, new TestMessageSerializationRegistryImpl());
+ }
+ /**
+ * Creates and starts a {@link ConnectionManager} listening on the given port, configured with the provided
+ * serialization registry.
+ *
+ * @param port Port for the connection manager to listen on.
+ * @param registry Serialization registry.
+ * @return Connection manager.
+ */
+ private ConnectionManager startManager(int port, MessageSerializationRegistry registry) {
UUID launchId = UUID.randomUUID();
String consistentId = UUID.randomUUID().toString();
diff --git a/modules/network/src/main/java/org/apache/ignite/network/MessageSerializationRegistryImpl.java b/modules/network/src/main/java/org/apache/ignite/network/MessageSerializationRegistryImpl.java
index 152b8d9..3e1cd6c 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/MessageSerializationRegistryImpl.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/MessageSerializationRegistryImpl.java
@@ -51,11 +51,12 @@ public class MessageSerializationRegistryImpl implements MessageSerializationReg
groupFactories = new MessageSerializationFactory<?>[Short.MAX_VALUE + 1];
factories[groupType] = groupFactories;
}
- else if (groupFactories[messageType] != null)
+ else if (groupFactories[messageType] != null) {
throw new NetworkConfigurationException(String.format(
"Message serialization factory for message type %d in module %d is already defined",
messageType, groupType
));
+ }
groupFactories[messageType] = factory;
@@ -78,9 +79,11 @@ public class MessageSerializationRegistryImpl implements MessageSerializationReg
MessageSerializationFactory<?> provider = groupFactories == null ? null : groupFactories[messageType];
- assert provider != null :
- String.format("No serializer provider defined for group type %d and message type %d",
- groupType, messageType);
+ if (provider == null) {
+ throw new NetworkConfigurationException(String.format(
+ "No serializer provider defined for group type %d and message type %d", groupType, messageType
+ ));
+ }
return (MessageSerializationFactory<T>) provider;
}
diff --git a/modules/network/src/main/java/org/apache/ignite/network/internal/netty/ConnectionManager.java b/modules/network/src/main/java/org/apache/ignite/network/internal/netty/ConnectionManager.java
index ae5ae16..2a4b23d 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/internal/netty/ConnectionManager.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/internal/netty/ConnectionManager.java
@@ -23,10 +23,11 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
import java.util.stream.Stream;
@@ -116,11 +117,19 @@ public class ConnectionManager {
*/
public void start() throws IgniteInternalException {
try {
- server.start().join();
+ //TODO: timeout value should be extracted into common configuration
+ // https://issues.apache.org/jira/browse/IGNITE-14538
+ server.start().get(3, TimeUnit.SECONDS);
}
- catch (CompletionException e) {
+ catch (ExecutionException e) {
Throwable cause = e.getCause();
- throw new IgniteInternalException("Failed to start server: " + cause.getMessage(), cause);
+ throw new IgniteInternalException("Failed to start the connection manager: " + cause.getMessage(), cause);
+ }
+ catch (TimeoutException e) {
+ throw new IgniteInternalException("Timeout while waiting for the connection manager to start", e);
+ }
+ catch (InterruptedException e) {
+ throw new IgniteInternalException("Interrupted while starting the connection manager", e);
}
}
@@ -190,9 +199,7 @@ public class ConnectionManager {
* @param address Target address.
* @return New netty client.
*/
- private NettyClient connect(
- SocketAddress address
- ) {
+ private NettyClient connect(SocketAddress address) {
var client = new NettyClient(
address,
serializationRegistry,
diff --git a/modules/network/src/main/java/org/apache/ignite/network/internal/netty/HandshakeHandler.java b/modules/network/src/main/java/org/apache/ignite/network/internal/netty/HandshakeHandler.java
index fdbfa86..fc857ce 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/internal/netty/HandshakeHandler.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/internal/netty/HandshakeHandler.java
@@ -19,6 +19,7 @@ package org.apache.ignite.network.internal.netty;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
+import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.network.NetworkMessage;
import org.apache.ignite.network.internal.handshake.HandshakeAction;
import org.apache.ignite.network.internal.handshake.HandshakeException;
@@ -28,6 +29,9 @@ import org.apache.ignite.network.internal.handshake.HandshakeManager;
* Netty handler of the handshake operation.
*/
public class HandshakeHandler extends ChannelInboundHandlerAdapter {
+ /** Logger. */
+ private static final IgniteLogger LOG = IgniteLogger.forClass(HandshakeHandler.class);
+
/** Handshake manager. */
private final HandshakeManager manager;
@@ -41,23 +45,31 @@ public class HandshakeHandler extends ChannelInboundHandlerAdapter {
}
/** {@inheritDoc} */
- @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
+ @Override public void handlerAdded(ChannelHandlerContext ctx) {
HandshakeAction handshakeAction = manager.init(ctx.channel());
handleHandshakeAction(handshakeAction, ctx);
}
/** {@inheritDoc} */
- @Override public void channelActive(ChannelHandlerContext ctx) throws Exception {
+ @Override public void channelActive(ChannelHandlerContext ctx) {
HandshakeAction handshakeAction = manager.onConnectionOpen(ctx.channel());
+ manager.handshakeFuture().whenComplete((unused, throwable) -> {
+ if (throwable != null) {
+ LOG.debug("Error when performing handshake", throwable);
+
+ ctx.close();
+ }
+ });
+
handleHandshakeAction(handshakeAction, ctx);
ctx.fireChannelActive();
}
/** {@inheritDoc} */
- @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) {
HandshakeAction handshakeAction = manager.onMessage(ctx.channel(), (NetworkMessage) msg);
handleHandshakeAction(handshakeAction, ctx);
@@ -65,7 +77,7 @@ public class HandshakeHandler extends ChannelInboundHandlerAdapter {
}
/** {@inheritDoc} */
- @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+ @Override public void channelInactive(ChannelHandlerContext ctx) {
// If this method is called that means channel has been closed before handshake has finished or handshake
// has failed.
manager.handshakeFuture().completeExceptionally(
@@ -75,6 +87,11 @@ public class HandshakeHandler extends ChannelInboundHandlerAdapter {
ctx.fireChannelInactive();
}
+ /** {@inheritDoc} */
+ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+ manager.handshakeFuture().completeExceptionally(cause);
+ }
+
/**
* Handle {@link HandshakeAction}.
*
diff --git a/modules/network/src/main/java/org/apache/ignite/network/internal/netty/InboundDecoder.java b/modules/network/src/main/java/org/apache/ignite/network/internal/netty/InboundDecoder.java
index 4337ee4..b3c7052 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/internal/netty/InboundDecoder.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/internal/netty/InboundDecoder.java
@@ -17,7 +17,6 @@
package org.apache.ignite.network.internal.netty;
-import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import io.netty.buffer.ByteBuf;
@@ -112,27 +111,18 @@ public class InboundDecoder extends ByteToMessageDecoder {
messageAttr.set(msg);
}
catch (Throwable e) {
- LOG.error(
- String.format(
- "Failed to read message [msg=%s, buf=%s, reader=%s]: %s",
- msg, buffer, reader, e.getMessage()
- ),
- e
- );
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ String.format(
+ "Failed to read message [msg=%s, buf=%s, reader=%s]: %s",
+ msg, buffer, reader, e.getMessage()
+ ),
+ e
+ );
+ }
throw e;
}
}
}
-
- /** {@inheritDoc} */
- @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
- if (LOG.isDebugEnabled())
- LOG.debug("Exception caught: " + cause.getMessage(), cause);
-
- // Ignore IOExceptions that are thrown from the Netty's insides. IOExceptions that occured during reads
- // or writes should be handled elsewhere.
- if (cause instanceof Exception && !(cause instanceof IOException) )
- throw (Exception) cause;
- }
}
diff --git a/modules/network/src/main/java/org/apache/ignite/network/internal/netty/IoExceptionSuppressingHandler.java b/modules/network/src/main/java/org/apache/ignite/network/internal/netty/IoExceptionSuppressingHandler.java
new file mode 100644
index 0000000..d1e0c1c
--- /dev/null
+++ b/modules/network/src/main/java/org/apache/ignite/network/internal/netty/IoExceptionSuppressingHandler.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.internal.netty;
+
+import java.io.IOException;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import org.apache.ignite.lang.IgniteLogger;
+
+/**
+ * Netty handler for suppressing IO exceptions that can happen if a remote peer abruptly closes the connection.
+ */
+class IoExceptionSuppressingHandler extends ChannelInboundHandlerAdapter {
+ /** Logger. */
+ private static final IgniteLogger LOG = IgniteLogger.forClass(IoExceptionSuppressingHandler.class);
+
+ /** {@inheritDoc} */
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+ if (cause instanceof IOException)
+ LOG.debug(cause.getMessage(), cause);
+ else
+ ctx.fireExceptionCaught(cause);
+ }
+}
diff --git a/modules/network/src/main/java/org/apache/ignite/network/internal/netty/NettyClient.java b/modules/network/src/main/java/org/apache/ignite/network/internal/netty/NettyClient.java
index 240e5c2..e828ab1 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/internal/netty/NettyClient.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/internal/netty/NettyClient.java
@@ -110,7 +110,8 @@ public class NettyClient {
new HandshakeHandler(handshakeManager),
new MessageHandler(messageListener),
new ChunkedWriteHandler(),
- new OutboundEncoder(serializationRegistry)
+ new OutboundEncoder(serializationRegistry),
+ new IoExceptionSuppressingHandler()
);
}
});
diff --git a/modules/network/src/main/java/org/apache/ignite/network/internal/netty/NettyServer.java b/modules/network/src/main/java/org/apache/ignite/network/internal/netty/NettyServer.java
index 9993b74..fcc42ae 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/internal/netty/NettyServer.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/internal/netty/NettyServer.java
@@ -165,13 +165,11 @@ public class NettyServer {
*/
new ChunkedWriteHandler(),
// Converts NetworkMessage to a ChunkedNetworkMessageInput
- new OutboundEncoder(serializationRegistry)
+ new OutboundEncoder(serializationRegistry),
+ new IoExceptionSuppressingHandler()
);
- handshakeManager.handshakeFuture().whenComplete((sender, throwable) -> {
- if (sender != null)
- newConnectionListener.accept(sender);
- });
+ handshakeManager.handshakeFuture().thenAccept(newConnectionListener);
}
})
/*
diff --git a/modules/network/src/test/java/org/apache/ignite/network/MessageSerializationRegistryImplTest.java b/modules/network/src/test/java/org/apache/ignite/network/MessageSerializationRegistryImplTest.java
index 28a7b37..b7e40fa 100644
--- a/modules/network/src/test/java/org/apache/ignite/network/MessageSerializationRegistryImplTest.java
+++ b/modules/network/src/test/java/org/apache/ignite/network/MessageSerializationRegistryImplTest.java
@@ -96,8 +96,8 @@ public class MessageSerializationRegistryImplTest {
public void testCreateSerializersIfNotRegistered() {
var registry = new MessageSerializationRegistryImpl();
- assertThrows(AssertionError.class, () -> registry.createSerializer(Msg.GROUP_TYPE, Msg.TYPE));
- assertThrows(AssertionError.class, () -> registry.createDeserializer(Msg.GROUP_TYPE, Msg.TYPE));
+ assertThrows(NetworkConfigurationException.class, () -> registry.createSerializer(Msg.GROUP_TYPE, Msg.TYPE));
+ assertThrows(NetworkConfigurationException.class, () -> registry.createDeserializer(Msg.GROUP_TYPE, Msg.TYPE));
}
/** */
diff --git a/modules/network/src/test/java/org/apache/ignite/network/internal/netty/IoExceptionSuppressingHandlerTest.java b/modules/network/src/test/java/org/apache/ignite/network/internal/netty/IoExceptionSuppressingHandlerTest.java
new file mode 100644
index 0000000..0993d95
--- /dev/null
+++ b/modules/network/src/test/java/org/apache/ignite/network/internal/netty/IoExceptionSuppressingHandlerTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.internal.netty;
+
+import java.io.IOException;
+import io.netty.channel.ChannelHandlerContext;
+import org.junit.jupiter.api.Test;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Test class for the {@link IoExceptionSuppressingHandler}.
+ */
+public class IoExceptionSuppressingHandlerTest {
+ /** */
+ private final ChannelHandlerContext context = mock(ChannelHandlerContext.class);
+
+ /** */
+ private final IoExceptionSuppressingHandler handler = new IoExceptionSuppressingHandler();
+
+ /**
+ * Tests that a "Broken pipe" exception is muted by the handler.
+ */
+ @Test
+ public void testBrokenPipeIoExceptionIsMuted() {
+ handler.exceptionCaught(context, new IOException("Broken pipe"));
+
+ verify(context, never()).fireExceptionCaught(any());
+ }
+
+ /**
+ * Tests that other exception types are propagated.
+ */
+ @Test
+ public void testOtherExceptionIsPropagated() {
+ handler.exceptionCaught(context, new NullPointerException());
+
+ verify(context).fireExceptionCaught(any());
+ }
+}