You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by nk...@apache.org on 2017/11/15 12:24:53 UTC

avro git commit: AVRO-1407: Java: Fix infinite loop on slow connect in NettyTransceiver. Contributed by Gareth Davis.

Repository: avro
Updated Branches:
  refs/heads/branch-1.7 011a85653 -> 40733f9ab


AVRO-1407: Java: Fix infinite loop on slow connect in NettyTransceiver.  Contributed by Gareth Davis.

git-svn-id: https://svn.apache.org/repos/asf/avro/trunk@1641894 13f79535-47bb-0310-9956-ffa450edef68
(cherry picked from commit fbaf3c399e2f34e57008d8625c76f0543a3cadf4)


Project: http://git-wip-us.apache.org/repos/asf/avro/repo
Commit: http://git-wip-us.apache.org/repos/asf/avro/commit/40733f9a
Tree: http://git-wip-us.apache.org/repos/asf/avro/tree/40733f9a
Diff: http://git-wip-us.apache.org/repos/asf/avro/diff/40733f9a

Branch: refs/heads/branch-1.7
Commit: 40733f9ab971c7f3fa26bb6c1a2f6dbf88b2a612
Parents: 011a856
Author: Doug Cutting <cu...@apache.org>
Authored: Wed Nov 26 19:30:18 2014 +0000
Committer: Nandor Kollar <nk...@apache.org>
Committed: Wed Nov 15 11:53:46 2017 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  3 +
 .../org/apache/avro/ipc/NettyTransceiver.java   | 12 +++
 .../ipc/NettyTransceiverWhenFailsToConnect.java | 82 ++++++++++++++++++++
 3 files changed, 97 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/avro/blob/40733f9a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 9f24bbd..5402b0e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -75,6 +75,9 @@ Trunk (not yet released)
     AVRO-1883: Java: Fix incompatible schema detection nested in unions.
     (Yibing Shi via blue)
 
+    AVRO-1407: Java: Fix infinite loop on slow connect in NettyTransceiver.
+    (Gareth Davis via cutting)
+
 Avro 1.7.7 (23 July 2014)
 
   NEW FEATURES

http://git-wip-us.apache.org/repos/asf/avro/blob/40733f9a/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java
----------------------------------------------------------------------
diff --git a/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java b/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java
index f53a5bc..a8a2e3d 100644
--- a/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java
+++ b/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java
@@ -204,6 +204,18 @@ public class NettyTransceiver extends Transceiver {
     stateLock.readLock().lock();
     try {
       getChannel();
+    } catch (Throwable e) {
+      // must attempt to clean up any allocated channel future
+      if (channelFuture != null) {
+        channelFuture.getChannel().close();
+      }
+
+      if (e instanceof IOException)
+        throw (IOException)e;
+      if (e instanceof RuntimeException)
+        throw (RuntimeException)e;
+      // all that's left is Error
+      throw (Error)e;
     } finally {
       stateLock.readLock().unlock();
     }

http://git-wip-us.apache.org/repos/asf/avro/blob/40733f9a/lang/java/ipc/src/test/java/org/apache/avro/ipc/NettyTransceiverWhenFailsToConnect.java
----------------------------------------------------------------------
diff --git a/lang/java/ipc/src/test/java/org/apache/avro/ipc/NettyTransceiverWhenFailsToConnect.java b/lang/java/ipc/src/test/java/org/apache/avro/ipc/NettyTransceiverWhenFailsToConnect.java
new file mode 100644
index 0000000..6eefc2d
--- /dev/null
+++ b/lang/java/ipc/src/test/java/org/apache/avro/ipc/NettyTransceiverWhenFailsToConnect.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.ipc;
+
+import junit.framework.Assert;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.socket.SocketChannel;
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.util.concurrent.Executors;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * This is a very specific test that verifies that if the NettyTransceiver fails
+ * to connect it cleans up the netty channel that it has created.
+ */
+public class NettyTransceiverWhenFailsToConnect {
+
+    @Test(expected = IOException.class)
+    public void testNettyTransceiverReleasesNettyChannelOnFailingToConnect() throws Exception {
+        ServerSocket serverSocket = null;
+        LastChannelRememberingChannelFactory socketChannelFactory = null;
+
+        try {
+            serverSocket = new ServerSocket(0);
+            socketChannelFactory = new LastChannelRememberingChannelFactory();
+
+            try {
+                new NettyTransceiver(
+                        new InetSocketAddress(serverSocket.getLocalPort()),
+                        socketChannelFactory,
+                        1L
+                );
+            } finally {
+                assertEquals("expected that the channel opened by the transceiver is closed",
+                        false, socketChannelFactory.lastChannel.isOpen());
+            }
+        } finally {
+
+            if (serverSocket != null) {
+                // closing the server socket will actually free up the open channel in the
+                // transceiver, which would have hung otherwise (pre AVRO-1407)
+                serverSocket.close();
+            }
+
+            if (socketChannelFactory != null) {
+                socketChannelFactory.releaseExternalResources();
+            }
+        }
+    }
+
+    class LastChannelRememberingChannelFactory extends NioClientSocketChannelFactory implements ChannelFactory {
+
+        volatile SocketChannel lastChannel;
+
+        @Override
+        public SocketChannel newChannel(ChannelPipeline pipeline) {
+            return lastChannel= super.newChannel(pipeline);
+        }
+    }
+}