You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by nk...@apache.org on 2017/11/15 12:24:53 UTC
avro git commit: AVRO-1407: Java: Fix infinite loop on slow connect
in NettyTransceiver. Contributed by Gareth Davis.
Repository: avro
Updated Branches:
refs/heads/branch-1.7 011a85653 -> 40733f9ab
AVRO-1407: Java: Fix infinite loop on slow connect in NettyTransceiver. Contributed by Gareth Davis.
git-svn-id: https://svn.apache.org/repos/asf/avro/trunk@1641894 13f79535-47bb-0310-9956-ffa450edef68
(cherry picked from commit fbaf3c399e2f34e57008d8625c76f0543a3cadf4)
Project: http://git-wip-us.apache.org/repos/asf/avro/repo
Commit: http://git-wip-us.apache.org/repos/asf/avro/commit/40733f9a
Tree: http://git-wip-us.apache.org/repos/asf/avro/tree/40733f9a
Diff: http://git-wip-us.apache.org/repos/asf/avro/diff/40733f9a
Branch: refs/heads/branch-1.7
Commit: 40733f9ab971c7f3fa26bb6c1a2f6dbf88b2a612
Parents: 011a856
Author: Doug Cutting <cu...@apache.org>
Authored: Wed Nov 26 19:30:18 2014 +0000
Committer: Nandor Kollar <nk...@apache.org>
Committed: Wed Nov 15 11:53:46 2017 +0100
----------------------------------------------------------------------
CHANGES.txt | 3 +
.../org/apache/avro/ipc/NettyTransceiver.java | 12 +++
.../ipc/NettyTransceiverWhenFailsToConnect.java | 82 ++++++++++++++++++++
3 files changed, 97 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/avro/blob/40733f9a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 9f24bbd..5402b0e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -75,6 +75,9 @@ Trunk (not yet released)
AVRO-1883: Java: Fix incompatible schema detection nested in unions.
(Yibing Shi via blue)
+ AVRO-1407: Java: Fix infinite loop on slow connect in NettyTransceiver.
+ (Gareth Davis via cutting)
+
Avro 1.7.7 (23 July 2014)
NEW FEATURES
http://git-wip-us.apache.org/repos/asf/avro/blob/40733f9a/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java
----------------------------------------------------------------------
diff --git a/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java b/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java
index f53a5bc..a8a2e3d 100644
--- a/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java
+++ b/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java
@@ -204,6 +204,18 @@ public class NettyTransceiver extends Transceiver {
stateLock.readLock().lock();
try {
getChannel();
+ } catch (Throwable e) {
+ // must attempt to clean up any allocated channel future
+ if (channelFuture != null) {
+ channelFuture.getChannel().close();
+ }
+
+ if (e instanceof IOException)
+ throw (IOException)e;
+ if (e instanceof RuntimeException)
+ throw (RuntimeException)e;
+ // all that's left is Error
+ throw (Error)e;
} finally {
stateLock.readLock().unlock();
}
http://git-wip-us.apache.org/repos/asf/avro/blob/40733f9a/lang/java/ipc/src/test/java/org/apache/avro/ipc/NettyTransceiverWhenFailsToConnect.java
----------------------------------------------------------------------
diff --git a/lang/java/ipc/src/test/java/org/apache/avro/ipc/NettyTransceiverWhenFailsToConnect.java b/lang/java/ipc/src/test/java/org/apache/avro/ipc/NettyTransceiverWhenFailsToConnect.java
new file mode 100644
index 0000000..6eefc2d
--- /dev/null
+++ b/lang/java/ipc/src/test/java/org/apache/avro/ipc/NettyTransceiverWhenFailsToConnect.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.ipc;
+
+import junit.framework.Assert;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.socket.SocketChannel;
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.util.concurrent.Executors;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * This is a very specific test that verifies that if the NettyTransceiver fails
+ * to connect it cleans up the netty channel that it has created.
+ */
+public class NettyTransceiverWhenFailsToConnect {
+ // Passes only if an IOException escapes the method and the assertion in the inner finally holds.
+ @Test(expected = IOException.class)
+ public void testNettyTransceiverReleasesNettyChannelOnFailingToConnect() throws Exception {
+ ServerSocket serverSocket = null;
+ LastChannelRememberingChannelFactory socketChannelFactory = null;
+
+ try {
+ serverSocket = new ServerSocket(0); // port 0: the system picks a free port; accept() is never called here
+ socketChannelFactory = new LastChannelRememberingChannelFactory();
+
+ try {
+ new NettyTransceiver( // expected to throw; the instance is deliberately discarded
+ new InetSocketAddress(serverSocket.getLocalPort()),
+ socketChannelFactory,
+ 1L // NOTE(review): presumably a connect timeout in milliseconds, kept tiny to force a failure -- confirm
+ );
+ } finally { // evaluated whether or not the constructor threw
+ assertEquals("expected that the channel opened by the transceiver is closed",
+ false, socketChannelFactory.lastChannel.isOpen()); // lastChannel is still null if newChannel was never called
+ }
+ } finally {
+
+ if (serverSocket != null) {
+ // closing the server socket will actually free up the open channel in the
+ // transceiver, which would have hung otherwise (pre AVRO-1407)
+ serverSocket.close();
+ }
+
+ if (socketChannelFactory != null) {
+ socketChannelFactory.releaseExternalResources(); // NOTE(review): presumably frees the factory's threads -- confirm in Netty docs
+ }
+ }
+ }
+ /** Channel factory that records the channel most recently returned by newChannel so the test can inspect it. */
+ class LastChannelRememberingChannelFactory extends NioClientSocketChannelFactory implements ChannelFactory {
+
+ volatile SocketChannel lastChannel; // null until newChannel has been called
+
+ @Override
+ public SocketChannel newChannel(ChannelPipeline pipeline) {
+ return lastChannel= super.newChannel(pipeline); // remember the new channel, then hand it back to the caller
+ }
+ }
+}