You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by cu...@apache.org on 2014/11/26 20:30:18 UTC

svn commit: r1641894 - in /avro/trunk: CHANGES.txt lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java lang/java/ipc/src/test/java/org/apache/avro/ipc/NettyTransceiverWhenFailsToConnect.java

Author: cutting
Date: Wed Nov 26 19:30:18 2014
New Revision: 1641894

URL: http://svn.apache.org/r1641894
Log:
AVRO-1407: Java: Fix infinite loop on slow connect in NettyTransceiver.  Contributed by Gareth Davis.

Added:
    avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/NettyTransceiverWhenFailsToConnect.java   (with props)
Modified:
    avro/trunk/CHANGES.txt
    avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java

Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1641894&r1=1641893&r2=1641894&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Wed Nov 26 19:30:18 2014
@@ -81,6 +81,9 @@ Trunk (not yet released)
     AVRO-1564. Java: Fix handling of optional byte field in Thrift.
     (Michael Pershyn via cutting)
 
+    AVRO-1407: Java: Fix infinite loop on slow connect in NettyTransceiver.
+    (Gareth Davis via cutting)
+
 Avro 1.7.7 (23 July 2014)
 
   NEW FEATURES

Modified: avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java?rev=1641894&r1=1641893&r2=1641894&view=diff
==============================================================================
--- avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java (original)
+++ avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java Wed Nov 26 19:30:18 2014
@@ -204,6 +204,18 @@ public class NettyTransceiver extends Tr
     stateLock.readLock().lock();
     try {
       getChannel();
+    } catch (Throwable e) {
+      // must attempt to clean up any allocated channel future
+      if (channelFuture != null) {
+        channelFuture.getChannel().close();
+      }
+
+      if (e instanceof IOException)
+        throw (IOException)e;
+      if (e instanceof RuntimeException)
+        throw (RuntimeException)e;
+      // all that's left is Error
+      throw (Error)e;
     } finally {
       stateLock.readLock().unlock();
     }

Added: avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/NettyTransceiverWhenFailsToConnect.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/NettyTransceiverWhenFailsToConnect.java?rev=1641894&view=auto
==============================================================================
--- avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/NettyTransceiverWhenFailsToConnect.java (added)
+++ avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/NettyTransceiverWhenFailsToConnect.java Wed Nov 26 19:30:18 2014
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.ipc;
+
+import junit.framework.Assert;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.socket.SocketChannel;
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.util.concurrent.Executors;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Verifies that when the {@link NettyTransceiver} constructor fails to connect,
+ * the Netty channel created during the attempt is closed rather than left open.
+ */
+public class NettyTransceiverWhenFailsToConnect {
+
+    @Test(expected = IOException.class)
+    public void testNettyTransceiverReleasesNettyChannelOnFailingToConnect() throws Exception {
+        ServerSocket serverSocket = null;
+        LastChannelRememberingChannelFactory socketChannelFactory = null;
+
+        try {
+            serverSocket = new ServerSocket(0); // port 0: bind to any free port
+            socketChannelFactory = new LastChannelRememberingChannelFactory();
+
+            try {
+                new NettyTransceiver(
+                        new InetSocketAddress(serverSocket.getLocalPort()),
+                        socketChannelFactory,
+                        1L // NOTE(review): presumably a very short connect timeout -- confirm units
+                );
+            } finally {
+                assertEquals("expected that the channel opened by the transceiver is closed",
+                        false, socketChannelFactory.lastChannel.isOpen());
+            }
+        } finally {
+
+            if (serverSocket != null) {
+                // Closing the server socket also frees any channel the transceiver left
+                // open; before AVRO-1407 that channel would otherwise have hung.
+                serverSocket.close();
+            }
+
+            if (socketChannelFactory != null) {
+                socketChannelFactory.releaseExternalResources();
+            }
+        }
+    }
+
+    class LastChannelRememberingChannelFactory extends NioClientSocketChannelFactory implements ChannelFactory {
+
+        volatile SocketChannel lastChannel; // most recent channel returned by newChannel
+
+        @Override
+        public SocketChannel newChannel(ChannelPipeline pipeline) {
+            return lastChannel= super.newChannel(pipeline);
+        }
+    }
+}

Propchange: avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/NettyTransceiverWhenFailsToConnect.java
------------------------------------------------------------------------------
    svn:eol-style = native