You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by jb...@apache.org on 2012/02/08 04:42:02 UTC
svn commit: r1241761 - in /avro/trunk: CHANGES.txt
lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java
lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerWithCallbacks.java
Author: jbaldassari
Date: Wed Feb 8 03:42:02 2012
New Revision: 1241761
URL: http://svn.apache.org/viewvc?rev=1241761&view=rev
Log:
AVRO-1013. Java: NettyTransceiver can hang after server restart.
Modified:
avro/trunk/CHANGES.txt
avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java
avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerWithCallbacks.java
Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1241761&r1=1241760&r2=1241761&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Wed Feb 8 03:42:02 2012
@@ -117,6 +117,8 @@ Avro 1.6.2 (unreleased)
AVRO-1020. Java: Fix builder API to correctly handle default
values for enums. (cutting)
+ AVRO-1013. Java: NettyTransceiver can hang after server restart. (jbaldassari)
+
Avro 1.6.1 (8 November 2011)
INCOMPATIBLE CHANGES
Modified: avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java?rev=1241761&r1=1241760&r2=1241761&view=diff
==============================================================================
--- avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java (original)
+++ avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java Wed Feb 8 03:42:02 2012
@@ -396,21 +396,24 @@ public class NettyTransceiver extends Tr
@Override
public void writeBuffers(List<ByteBuffer> buffers) throws IOException {
- writeDataPack(new NettyDataPack(serialGenerator.incrementAndGet(), buffers));
+ stateLock.readLock().lock();
+ try {
+ writeDataPack(
+ new NettyDataPack(serialGenerator.incrementAndGet(), buffers));
+ } finally {
+ stateLock.readLock().unlock();
+ }
}
/**
* Writes a NettyDataPack, reconnecting to the remote peer if necessary.
+ * NOTE: The stateLock read lock *must* be acquired before calling this
+ * method.
* @param dataPack the data pack to write.
* @throws IOException if an error occurs connecting to the remote peer.
*/
private void writeDataPack(NettyDataPack dataPack) throws IOException {
- stateLock.readLock().lock();
- try {
- getChannel().write(dataPack);
- } finally {
- stateLock.readLock().unlock();
- }
+ getChannel().write(dataPack);
}
@Override
Modified: avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerWithCallbacks.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerWithCallbacks.java?rev=1241761&r1=1241760&r2=1241761&view=diff
==============================================================================
--- avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerWithCallbacks.java (original)
+++ avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerWithCallbacks.java Wed Feb 8 03:42:02 2012
@@ -441,6 +441,47 @@ public class TestNettyServerWithCallback
transceiver2.close();
}
}
+
+ @Test
+ public void clientReconnectAfterServerRestart() throws Exception {
+ // Start up a second server so that closing the server doesn't
+ // interfere with the other unit tests:
+ SimpleImpl simpleImpl = new BlockingSimpleImpl();
+ Server server2 = new NettyServer(new SpecificResponder(Simple.class,
+ simpleImpl), new InetSocketAddress(0));
+ try {
+ server2.start();
+ int serverPort = server2.getPort();
+ System.out.println("server2 port : " + serverPort);
+
+ // Initialize a client, and establish a connection to the server:
+ Transceiver transceiver2 = new NettyTransceiver(new InetSocketAddress(
+ serverPort), TestNettyServer.CONNECT_TIMEOUT_MILLIS);
+ Simple.Callback simpleClient2 =
+ SpecificRequestor.getClient(Simple.Callback.class, transceiver2);
+ Assert.assertEquals(3, simpleClient2.add(1, 2));
+
+ // Restart the server:
+ server2.close();
+ try {
+ simpleClient2.add(2, -1);
+ Assert.fail("Client should not be able to invoke RPCs " +
+ "because server is no longer running");
+ } catch (Exception e) {
+ // Expected since server is no longer running
+ }
+ Thread.sleep(2000L);
+ server2 = new NettyServer(new SpecificResponder(Simple.class,
+ simpleImpl), new InetSocketAddress(serverPort));
+ server2.start();
+
+ // Invoke an RPC using the same client, which should reestablish the
+ // connection to the server:
+ Assert.assertEquals(3, simpleClient2.add(1, 2));
+ } finally {
+ server2.close();
+ }
+ }
@Ignore
@Test