You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by cu...@apache.org on 2011/12/16 20:35:36 UTC
svn commit: r1215248 - in /avro/trunk: CHANGES.txt
lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java
lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerWithCallbacks.java
Author: cutting
Date: Fri Dec 16 19:35:35 2011
New Revision: 1215248
URL: http://svn.apache.org/viewvc?rev=1215248&view=rev
Log:
AVRO-982. Java: Fix NettyTransceiver to not hang when server stops. Contributed by Bruno Dumon.
Modified:
avro/trunk/CHANGES.txt
avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java
avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerWithCallbacks.java
Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1215248&r1=1215247&r2=1215248&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Fri Dec 16 19:35:35 2011
@@ -49,6 +49,9 @@ Avro 1.6.2 (unreleased)
AVRO-968. C: Fixed avro_value_cmp and avro_value_cmp_fast for string
values. (Vivek Nadkarni via dcreager)
+ AVRO-982. Java: Fix NettyTransceiver to not hang when server stops.
+ (Bruno Dumon via cutting)
+
Avro 1.6.1 (8 November 2011)
INCOMPATIBLE CHANGES
Modified: avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java?rev=1215248&r1=1215247&r2=1215248&view=diff
==============================================================================
--- avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java (original)
+++ avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java Fri Dec 16 19:35:35 2011
@@ -462,7 +462,7 @@ public class NettyTransceiver extends Tr
if ((cse.getState() == ChannelState.OPEN) && (Boolean.FALSE.equals(cse.getValue()))) {
// Server closed connection; disconnect client side
LOG.debug("Remote peer " + remoteAddr + " closed connection.");
- disconnect();
+ disconnect(false, true, null);
}
}
super.handleUpstream(ctx, e);
Modified: avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerWithCallbacks.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerWithCallbacks.java?rev=1215248&r1=1215247&r2=1215248&view=diff
==============================================================================
--- avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerWithCallbacks.java (original)
+++ avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerWithCallbacks.java Fri Dec 16 19:35:35 2011
@@ -346,7 +346,7 @@ public class TestNettyServerWithCallback
Assert.assertEquals(3, simpleClient2.add(1, 2));
// Now acquire the semaphore so that the server will block:
- blockingSimpleImpl.acquirePermit();
+ blockingSimpleImpl.acquireRunPermit();
simpleClient2.add(1, 2, addFuture);
} finally {
// When the transceiver is closed, the CallFuture should get
@@ -365,11 +365,83 @@ public class TestNettyServerWithCallback
}
Assert.assertTrue("Expected IOException to be thrown", ioeThrown);
} finally {
- blockingSimpleImpl.releasePermit();
+ blockingSimpleImpl.releaseRunPermit();
server2.close();
}
}
+ @Test
+ public void cancelPendingRequestsAfterChannelCloseByServerShutdown() throws Exception {
+ // The purpose of this test is to verify that a client doesn't stay
+ // blocked when a server is unexpectedly killed (or when for some
+ // other reason the channel is suddenly closed) while the server
+ // was in the process of handling a request (i.e. after it received
+ // the request but before it returned the response).
+
+ // Start up a second server so that closing the server doesn't
+ // interfere with the other unit tests:
+ BlockingSimpleImpl blockingSimpleImpl = new BlockingSimpleImpl();
+ Server server2 = new NettyServer(new SpecificResponder(Simple.class,
+ blockingSimpleImpl), new InetSocketAddress(0));
+ server2.start();
+
+ Transceiver transceiver2 = null;
+
+ try {
+ int serverPort = server2.getPort();
+ System.out.println("server2 port : " + serverPort);
+
+ transceiver2 = new NettyTransceiver(new InetSocketAddress(
+ serverPort), TestNettyServer.CONNECT_TIMEOUT_MILLIS);
+
+ final Simple.Callback simpleClient2 =
+ SpecificRequestor.getClient(Simple.Callback.class, transceiver2);
+
+ // Acquire the method-enter permit, which will be released by the
+ // server method once we call it
+ blockingSimpleImpl.acquireEnterPermit();
+
+ // Acquire the run permit so that the server method cannot return immediately
+ blockingSimpleImpl.acquireRunPermit();
+
+ Thread t = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ simpleClient2.add(3, 4);
+ Assert.fail("Expected an exception");
+ } catch (Exception e) {
+ // expected
+ }
+ }
+ });
+
+ // Start client call
+ t.start();
+
+ // Wait until method is entered on the server side
+ blockingSimpleImpl.acquireEnterPermit();
+
+ // The server side method is now blocked waiting on the run permit
+ // (= is busy handling the request)
+
+ // Stop the server
+ server2.close();
+
+ // With the server gone, we expect the client to get some exception and exit
+ // Wait for client thread to exit
+ t.join(10000);
+
+ Assert.assertFalse("Client request should not be blocked on server shutdown", t.isAlive());
+
+ } finally {
+ blockingSimpleImpl.releaseRunPermit();
+ server2.close();
+ if (transceiver2 != null)
+ transceiver2.close();
+ }
+ }
+
@Ignore
@Test
public void performanceTest() throws Exception {
@@ -459,7 +531,10 @@ public class TestNettyServerWithCallback
* A SimpleImpl that requires a semaphore permit before executing any method.
*/
private static class BlockingSimpleImpl extends SimpleImpl {
- private final Semaphore semaphore = new Semaphore(1);
+ /** Semaphore that is released when the method is entered. */
+ private final Semaphore enterSemaphore = new Semaphore(1);
+ /** Semaphore that must be acquired for the method to run and exit. */
+ private final Semaphore runSemaphore = new Semaphore(1);
/**
* Creates a BlockingSimpleImpl.
@@ -470,76 +545,106 @@ public class TestNettyServerWithCallback
@Override
public String hello(String greeting) throws AvroRemoteException {
- acquirePermit();
+ releaseEnterPermit();
+ acquireRunPermit();
try {
return super.hello(greeting);
} finally {
- releasePermit();
+ releaseRunPermit();
}
}
@Override
public TestRecord echo(TestRecord record) throws AvroRemoteException {
- acquirePermit();
+ releaseEnterPermit();
+ acquireRunPermit();
try {
return super.echo(record);
} finally {
- releasePermit();
+ releaseRunPermit();
}
}
@Override
public int add(int arg1, int arg2) throws AvroRemoteException {
- acquirePermit();
+ releaseEnterPermit();
+ acquireRunPermit();
try {
return super.add(arg1, arg2);
} finally {
- releasePermit();
+ releaseRunPermit();
}
}
@Override
public ByteBuffer echoBytes(ByteBuffer data) throws AvroRemoteException {
- acquirePermit();
+ releaseEnterPermit();
+ acquireRunPermit();
try {
return super.echoBytes(data);
} finally {
- releasePermit();
+ releaseRunPermit();
}
}
@Override
public Void error() throws AvroRemoteException, TestError {
- acquirePermit();
+ releaseEnterPermit();
+ acquireRunPermit();
try {
return super.error();
} finally {
- releasePermit();
+ releaseRunPermit();
}
}
@Override
public void ack() {
- acquirePermit();
+ releaseEnterPermit();
+ acquireRunPermit();
try {
super.ack();
} finally {
- releasePermit();
+ releaseRunPermit();
}
}
/**
* Acquires a single permit from the semaphore.
*/
- public void acquirePermit() {
- semaphore.acquireUninterruptibly();
+ public void acquireRunPermit() {
+ try {
+ runSemaphore.acquire();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
}
/**
* Releases a single permit to the semaphore.
*/
- public void releasePermit() {
- semaphore.release();
+ public void releaseRunPermit() {
+ runSemaphore.release();
+ }
+
+ /**
+ * Acquires a single permit from the enter semaphore.
+ */
+ public void acquireEnterPermit() {
+ try {
+ enterSemaphore.acquire();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+}
+
+ /**
+ * Releases a single permit to the enter semaphore.
+ */
+ public void releaseEnterPermit() {
+ enterSemaphore.release();
}
}
}