You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ib...@apache.org on 2021/06/11 11:43:47 UTC

[ignite-3] branch main updated: IGNITE-14819 Propagate exceptions from NettyServer and NettyClient (#161)

This is an automated email from the ASF dual-hosted git repository.

ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new f29e27d  IGNITE-14819 Propagate exceptions from NettyServer and NettyClient (#161)
f29e27d is described below

commit f29e27d2bff4f4dd3447cf3a78235732d29c6133
Author: Alexander Polovtcev <al...@gmail.com>
AuthorDate: Fri Jun 11 14:41:18 2021 +0300

    IGNITE-14819 Propagate exceptions from NettyServer and NettyClient (#161)
---
 .../internal/netty/ConnectionManagerTest.java      | 71 +++++++++++++++++++++-
 .../network/MessageSerializationRegistryImpl.java  | 11 ++--
 .../network/internal/netty/ConnectionManager.java  | 21 ++++---
 .../network/internal/netty/HandshakeHandler.java   | 25 ++++++--
 .../network/internal/netty/InboundDecoder.java     | 28 +++------
 .../netty/IoExceptionSuppressingHandler.java       | 40 ++++++++++++
 .../ignite/network/internal/netty/NettyClient.java |  3 +-
 .../ignite/network/internal/netty/NettyServer.java |  8 +--
 .../MessageSerializationRegistryImplTest.java      |  4 +-
 .../netty/IoExceptionSuppressingHandlerTest.java   | 58 ++++++++++++++++++
 10 files changed, 224 insertions(+), 45 deletions(-)

diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/network/internal/netty/ConnectionManagerTest.java b/modules/network/src/integrationTest/java/org/apache/ignite/network/internal/netty/ConnectionManagerTest.java
index 7cbf320..8423dc7 100644
--- a/modules/network/src/integrationTest/java/org/apache/ignite/network/internal/netty/ConnectionManagerTest.java
+++ b/modules/network/src/integrationTest/java/org/apache/ignite/network/internal/netty/ConnectionManagerTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.network.internal.netty;
 
+import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.nio.channels.ClosedChannelException;
 import java.util.ArrayList;
@@ -24,8 +25,10 @@ import java.util.Collection;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Stream;
+import io.netty.handler.codec.DecoderException;
 import org.apache.ignite.network.NetworkMessage;
 import org.apache.ignite.network.NetworkMessagesFactory;
 import org.apache.ignite.network.TestMessage;
@@ -33,14 +36,20 @@ import org.apache.ignite.network.TestMessageSerializationRegistryImpl;
 import org.apache.ignite.network.TestMessagesFactory;
 import org.apache.ignite.network.internal.recovery.RecoveryClientHandshakeManager;
 import org.apache.ignite.network.internal.recovery.RecoveryServerHandshakeManager;
+import org.apache.ignite.network.serialization.MessageSerializationRegistry;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
 
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.isA;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.anyShort;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 /**
  * Tests for {@link ConnectionManager}.
@@ -219,14 +228,70 @@ public class ConnectionManagerTest {
     }
 
     /**
-     * Create and start a {@link ConnectionManager} adding it to the {@link #startedManagers} list.
+     * Tests that a connection to a misconfigured server results in a connection close and an exception on the client
+     * side.
+     */
+    @Test
+    public void testConnectMisconfiguredServer() throws Exception {
+        ConnectionManager client = startManager(4000);
+
+        ConnectionManager server = startManager(4001, mockSerializationRegistry());
+
+        try {
+            client.channel(null, server.getLocalAddress()).get(3, TimeUnit.SECONDS);
+        } catch (ExecutionException e) {
+            assertThat(e.getCause(), isA(IOException.class));
+        }
+    }
+
+    /**
+     * Tests that a connection from a misconfigured client results in an exception.
+     */
+    @Test
+    public void testConnectMisconfiguredClient() throws Exception {
+        ConnectionManager client = startManager(4000, mockSerializationRegistry());
+
+        ConnectionManager server = startManager(4001);
+
+        try {
+            client.channel(null, server.getLocalAddress()).get(3, TimeUnit.SECONDS);
+        } catch (ExecutionException e) {
+            assertThat(e.getCause(), isA(DecoderException.class));
+        }
+    }
+
+    /**
+     * Creates a mock {@link MessageSerializationRegistry} that throws an exception when trying to get a serializer
+     * or a deserializer.
+     */
+    private static MessageSerializationRegistry mockSerializationRegistry() {
+        var mockRegistry = mock(MessageSerializationRegistry.class);
+
+        when(mockRegistry.createDeserializer(anyShort(), anyShort())).thenThrow(RuntimeException.class);
+        when(mockRegistry.createSerializer(anyShort(), anyShort())).thenThrow(RuntimeException.class);
+
+        return mockRegistry;
+    }
+
+    /**
+     * Creates and starts a {@link ConnectionManager} listening on the given port.
      *
-     * @param port Port for the {@link ConnectionManager#server}.
+     * @param port Port for the connection manager to listen on.
      * @return Connection manager.
      */
     private ConnectionManager startManager(int port) {
-        var registry = new TestMessageSerializationRegistryImpl();
+        return startManager(port, new TestMessageSerializationRegistryImpl());
+    }
 
+    /**
+     * Creates and starts a {@link ConnectionManager} listening on the given port, configured with the provided
+     * serialization registry.
+     *
+     * @param port Port for the connection manager to listen on.
+     * @param registry Serialization registry.
+     * @return Connection manager.
+     */
+    private ConnectionManager startManager(int port, MessageSerializationRegistry registry) {
         UUID launchId = UUID.randomUUID();
         String consistentId = UUID.randomUUID().toString();
 
diff --git a/modules/network/src/main/java/org/apache/ignite/network/MessageSerializationRegistryImpl.java b/modules/network/src/main/java/org/apache/ignite/network/MessageSerializationRegistryImpl.java
index 152b8d9..3e1cd6c 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/MessageSerializationRegistryImpl.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/MessageSerializationRegistryImpl.java
@@ -51,11 +51,12 @@ public class MessageSerializationRegistryImpl implements MessageSerializationReg
             groupFactories = new MessageSerializationFactory<?>[Short.MAX_VALUE + 1];
             factories[groupType] = groupFactories;
         }
-        else if (groupFactories[messageType] != null)
+        else if (groupFactories[messageType] != null) {
             throw new NetworkConfigurationException(String.format(
                 "Message serialization factory for message type %d in module %d is already defined",
                 messageType, groupType
             ));
+        }
 
         groupFactories[messageType] = factory;
 
@@ -78,9 +79,11 @@ public class MessageSerializationRegistryImpl implements MessageSerializationReg
 
         MessageSerializationFactory<?> provider = groupFactories == null ? null : groupFactories[messageType];
 
-        assert provider != null :
-            String.format("No serializer provider defined for group type %d and message type %d",
-                groupType, messageType);
+        if (provider == null) {
+            throw new NetworkConfigurationException(String.format(
+                "No serializer provider defined for group type %d and message type %d", groupType, messageType
+            ));
+        }
 
         return (MessageSerializationFactory<T>) provider;
     }
diff --git a/modules/network/src/main/java/org/apache/ignite/network/internal/netty/ConnectionManager.java b/modules/network/src/main/java/org/apache/ignite/network/internal/netty/ConnectionManager.java
index ae5ae16..2a4b23d 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/internal/netty/ConnectionManager.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/internal/netty/ConnectionManager.java
@@ -23,10 +23,11 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.function.BiConsumer;
 import java.util.function.Supplier;
 import java.util.stream.Stream;
@@ -116,11 +117,19 @@ public class ConnectionManager {
      */
     public void start() throws IgniteInternalException {
         try {
-            server.start().join();
+            //TODO: timeout value should be extracted into common configuration
+            // https://issues.apache.org/jira/browse/IGNITE-14538
+            server.start().get(3, TimeUnit.SECONDS);
         }
-        catch (CompletionException e) {
+        catch (ExecutionException e) {
             Throwable cause = e.getCause();
-            throw new IgniteInternalException("Failed to start server: " + cause.getMessage(), cause);
+            throw new IgniteInternalException("Failed to start the connection manager: " + cause.getMessage(), cause);
+        }
+        catch (TimeoutException e) {
+            throw new IgniteInternalException("Timeout while waiting for the connection manager to start", e);
+        }
+        catch (InterruptedException e) {
+            throw new IgniteInternalException("Interrupted while starting the connection manager", e);
         }
     }
 
@@ -190,9 +199,7 @@ public class ConnectionManager {
      * @param address Target address.
      * @return New netty client.
      */
-    private NettyClient connect(
-        SocketAddress address
-    ) {
+    private NettyClient connect(SocketAddress address) {
         var client = new NettyClient(
             address,
             serializationRegistry,
diff --git a/modules/network/src/main/java/org/apache/ignite/network/internal/netty/HandshakeHandler.java b/modules/network/src/main/java/org/apache/ignite/network/internal/netty/HandshakeHandler.java
index fdbfa86..fc857ce 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/internal/netty/HandshakeHandler.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/internal/netty/HandshakeHandler.java
@@ -19,6 +19,7 @@ package org.apache.ignite.network.internal.netty;
 
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
+import org.apache.ignite.lang.IgniteLogger;
 import org.apache.ignite.network.NetworkMessage;
 import org.apache.ignite.network.internal.handshake.HandshakeAction;
 import org.apache.ignite.network.internal.handshake.HandshakeException;
@@ -28,6 +29,9 @@ import org.apache.ignite.network.internal.handshake.HandshakeManager;
  * Netty handler of the handshake operation.
  */
 public class HandshakeHandler extends ChannelInboundHandlerAdapter {
+    /** Logger. */
+    private static final IgniteLogger LOG = IgniteLogger.forClass(HandshakeHandler.class);
+
     /** Handshake manager. */
     private final HandshakeManager manager;
 
@@ -41,23 +45,31 @@ public class HandshakeHandler extends ChannelInboundHandlerAdapter {
     }
 
     /** {@inheritDoc} */
-    @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
+    @Override public void handlerAdded(ChannelHandlerContext ctx) {
         HandshakeAction handshakeAction = manager.init(ctx.channel());
 
         handleHandshakeAction(handshakeAction, ctx);
     }
 
     /** {@inheritDoc} */
-    @Override public void channelActive(ChannelHandlerContext ctx) throws Exception {
+    @Override public void channelActive(ChannelHandlerContext ctx) {
         HandshakeAction handshakeAction = manager.onConnectionOpen(ctx.channel());
 
+        manager.handshakeFuture().whenComplete((unused, throwable) -> {
+            if (throwable != null) {
+                LOG.debug("Error when performing handshake", throwable);
+
+                ctx.close();
+            }
+        });
+
         handleHandshakeAction(handshakeAction, ctx);
 
         ctx.fireChannelActive();
     }
 
     /** {@inheritDoc} */
-    @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+    @Override public void channelRead(ChannelHandlerContext ctx, Object msg) {
         HandshakeAction handshakeAction = manager.onMessage(ctx.channel(), (NetworkMessage) msg);
 
         handleHandshakeAction(handshakeAction, ctx);
@@ -65,7 +77,7 @@ public class HandshakeHandler extends ChannelInboundHandlerAdapter {
     }
 
     /** {@inheritDoc} */
-    @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+    @Override public void channelInactive(ChannelHandlerContext ctx) {
         // If this method is called that means channel has been closed before handshake has finished or handshake
         // has failed.
         manager.handshakeFuture().completeExceptionally(
@@ -75,6 +87,11 @@ public class HandshakeHandler extends ChannelInboundHandlerAdapter {
         ctx.fireChannelInactive();
     }
 
+    /** {@inheritDoc} */
+    @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+        manager.handshakeFuture().completeExceptionally(cause);
+    }
+
     /**
      * Handle {@link HandshakeAction}.
      *
diff --git a/modules/network/src/main/java/org/apache/ignite/network/internal/netty/InboundDecoder.java b/modules/network/src/main/java/org/apache/ignite/network/internal/netty/InboundDecoder.java
index 4337ee4..b3c7052 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/internal/netty/InboundDecoder.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/internal/netty/InboundDecoder.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.network.internal.netty;
 
-import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.List;
 import io.netty.buffer.ByteBuf;
@@ -112,27 +111,18 @@ public class InboundDecoder extends ByteToMessageDecoder {
                     messageAttr.set(msg);
             }
             catch (Throwable e) {
-                LOG.error(
-                    String.format(
-                        "Failed to read message [msg=%s, buf=%s, reader=%s]: %s",
-                        msg, buffer, reader, e.getMessage()
-                    ),
-                    e
-                );
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug(
+                        String.format(
+                            "Failed to read message [msg=%s, buf=%s, reader=%s]: %s",
+                            msg, buffer, reader, e.getMessage()
+                        ),
+                        e
+                    );
+                }
 
                 throw e;
             }
         }
     }
-
-    /** {@inheritDoc} */
-    @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
-        if (LOG.isDebugEnabled())
-            LOG.debug("Exception caught: " + cause.getMessage(), cause);
-
-        // Ignore IOExceptions that are thrown from the Netty's insides. IOExceptions that occured during reads
-        // or writes should be handled elsewhere.
-        if (cause instanceof Exception && !(cause instanceof IOException) )
-            throw (Exception) cause;
-    }
 }
diff --git a/modules/network/src/main/java/org/apache/ignite/network/internal/netty/IoExceptionSuppressingHandler.java b/modules/network/src/main/java/org/apache/ignite/network/internal/netty/IoExceptionSuppressingHandler.java
new file mode 100644
index 0000000..d1e0c1c
--- /dev/null
+++ b/modules/network/src/main/java/org/apache/ignite/network/internal/netty/IoExceptionSuppressingHandler.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.internal.netty;
+
+import java.io.IOException;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import org.apache.ignite.lang.IgniteLogger;
+
+/**
+ * Netty handler for suppressing IO exceptions that can happen if a remote peer abruptly closes the connection.
+ */
+class IoExceptionSuppressingHandler extends ChannelInboundHandlerAdapter {
+    /** Logger. */
+    private static final IgniteLogger LOG = IgniteLogger.forClass(IoExceptionSuppressingHandler.class);
+
+    /** {@inheritDoc} */
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+        if (cause instanceof IOException)
+            LOG.debug(cause.getMessage(), cause);
+        else
+            ctx.fireExceptionCaught(cause);
+    }
+}
diff --git a/modules/network/src/main/java/org/apache/ignite/network/internal/netty/NettyClient.java b/modules/network/src/main/java/org/apache/ignite/network/internal/netty/NettyClient.java
index 240e5c2..e828ab1 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/internal/netty/NettyClient.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/internal/netty/NettyClient.java
@@ -110,7 +110,8 @@ public class NettyClient {
                         new HandshakeHandler(handshakeManager),
                         new MessageHandler(messageListener),
                         new ChunkedWriteHandler(),
-                        new OutboundEncoder(serializationRegistry)
+                        new OutboundEncoder(serializationRegistry),
+                        new IoExceptionSuppressingHandler()
                     );
                 }
             });
diff --git a/modules/network/src/main/java/org/apache/ignite/network/internal/netty/NettyServer.java b/modules/network/src/main/java/org/apache/ignite/network/internal/netty/NettyServer.java
index 9993b74..fcc42ae 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/internal/netty/NettyServer.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/internal/netty/NettyServer.java
@@ -165,13 +165,11 @@ public class NettyServer {
                              */
                             new ChunkedWriteHandler(),
                             // Converts NetworkMessage to a ChunkedNetworkMessageInput
-                            new OutboundEncoder(serializationRegistry)
+                            new OutboundEncoder(serializationRegistry),
+                            new IoExceptionSuppressingHandler()
                         );
 
-                        handshakeManager.handshakeFuture().whenComplete((sender, throwable) -> {
-                            if (sender != null)
-                                newConnectionListener.accept(sender);
-                        });
+                        handshakeManager.handshakeFuture().thenAccept(newConnectionListener);
                     }
                 })
                 /*
diff --git a/modules/network/src/test/java/org/apache/ignite/network/MessageSerializationRegistryImplTest.java b/modules/network/src/test/java/org/apache/ignite/network/MessageSerializationRegistryImplTest.java
index 28a7b37..b7e40fa 100644
--- a/modules/network/src/test/java/org/apache/ignite/network/MessageSerializationRegistryImplTest.java
+++ b/modules/network/src/test/java/org/apache/ignite/network/MessageSerializationRegistryImplTest.java
@@ -96,8 +96,8 @@ public class MessageSerializationRegistryImplTest {
     public void testCreateSerializersIfNotRegistered() {
         var registry = new MessageSerializationRegistryImpl();
 
-        assertThrows(AssertionError.class, () -> registry.createSerializer(Msg.GROUP_TYPE, Msg.TYPE));
-        assertThrows(AssertionError.class, () -> registry.createDeserializer(Msg.GROUP_TYPE, Msg.TYPE));
+        assertThrows(NetworkConfigurationException.class, () -> registry.createSerializer(Msg.GROUP_TYPE, Msg.TYPE));
+        assertThrows(NetworkConfigurationException.class, () -> registry.createDeserializer(Msg.GROUP_TYPE, Msg.TYPE));
     }
 
     /** */
diff --git a/modules/network/src/test/java/org/apache/ignite/network/internal/netty/IoExceptionSuppressingHandlerTest.java b/modules/network/src/test/java/org/apache/ignite/network/internal/netty/IoExceptionSuppressingHandlerTest.java
new file mode 100644
index 0000000..0993d95
--- /dev/null
+++ b/modules/network/src/test/java/org/apache/ignite/network/internal/netty/IoExceptionSuppressingHandlerTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.internal.netty;
+
+import java.io.IOException;
+import io.netty.channel.ChannelHandlerContext;
+import org.junit.jupiter.api.Test;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Test class for the {@link IoExceptionSuppressingHandler}.
+ */
+public class IoExceptionSuppressingHandlerTest {
+    /** */
+    private final ChannelHandlerContext context = mock(ChannelHandlerContext.class);
+
+    /** */
+    private final IoExceptionSuppressingHandler handler = new IoExceptionSuppressingHandler();
+
+    /**
+     * Tests that a "Broken pipe" exception is muted by the handler.
+     */
+    @Test
+    public void testBrokenPipeIoExceptionIsMuted() {
+        handler.exceptionCaught(context, new IOException("Broken pipe"));
+
+        verify(context, never()).fireExceptionCaught(any());
+    }
+
+    /**
+     * Tests that other exception types are propagated.
+     */
+    @Test
+    public void testOtherExceptionIsPropagated() {
+        handler.exceptionCaught(context, new NullPointerException());
+
+        verify(context).fireExceptionCaught(any());
+    }
+}