You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by jb...@apache.org on 2012/02/08 04:42:02 UTC

svn commit: r1241761 - in /avro/trunk: CHANGES.txt lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerWithCallbacks.java

Author: jbaldassari
Date: Wed Feb  8 03:42:02 2012
New Revision: 1241761

URL: http://svn.apache.org/viewvc?rev=1241761&view=rev
Log:
AVRO-1013. Java: NettyTransceiver can hang after server restart.


Modified:
    avro/trunk/CHANGES.txt
    avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java
    avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerWithCallbacks.java

Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1241761&r1=1241760&r2=1241761&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Wed Feb  8 03:42:02 2012
@@ -117,6 +117,8 @@ Avro 1.6.2 (unreleased)
     AVRO-1020. Java: Fix builder API to correctly handle default
     values for enums.  (cutting)
 
+    AVRO-1013. Java: NettyTransceiver can hang after server restart. (jbaldassari)
+
 Avro 1.6.1 (8 November 2011)
 
   INCOMPATIBLE CHANGES

Modified: avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java?rev=1241761&r1=1241760&r2=1241761&view=diff
==============================================================================
--- avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java (original)
+++ avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java Wed Feb  8 03:42:02 2012
@@ -396,21 +396,24 @@ public class NettyTransceiver extends Tr
   
   @Override
   public void writeBuffers(List<ByteBuffer> buffers) throws IOException {
-    writeDataPack(new NettyDataPack(serialGenerator.incrementAndGet(), buffers));
+    stateLock.readLock().lock();
+    try {
+      writeDataPack(
+          new NettyDataPack(serialGenerator.incrementAndGet(), buffers));
+    } finally {
+      stateLock.readLock().unlock();
+    }
   }
   
   /**
    * Writes a NettyDataPack, reconnecting to the remote peer if necessary.
+   * NOTE: The stateLock read lock *must* be acquired before calling this 
+   * method.
    * @param dataPack the data pack to write.
    * @throws IOException if an error occurs connecting to the remote peer.
    */
   private void writeDataPack(NettyDataPack dataPack) throws IOException {
-    stateLock.readLock().lock();
-    try {
-      getChannel().write(dataPack);
-    } finally {
-      stateLock.readLock().unlock();
-    }
+    getChannel().write(dataPack);
   }
 
   @Override

Modified: avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerWithCallbacks.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerWithCallbacks.java?rev=1241761&r1=1241760&r2=1241761&view=diff
==============================================================================
--- avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerWithCallbacks.java (original)
+++ avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerWithCallbacks.java Wed Feb  8 03:42:02 2012
@@ -441,6 +441,47 @@ public class TestNettyServerWithCallback
         transceiver2.close();
     }
   }
+  
+  @Test
+  public void clientReconnectAfterServerRestart() throws Exception {
+    // Start up a second server so that closing the server doesn't 
+    // interfere with the other unit tests:
+    SimpleImpl simpleImpl = new BlockingSimpleImpl();
+    Server server2 = new NettyServer(new SpecificResponder(Simple.class, 
+        simpleImpl), new InetSocketAddress(0));
+    try {      
+      server2.start();
+      int serverPort = server2.getPort();
+      System.out.println("server2 port : " + serverPort);
+
+      // Initialize a client, and establish a connection to the server:
+      Transceiver transceiver2 = new NettyTransceiver(new InetSocketAddress(
+          serverPort), TestNettyServer.CONNECT_TIMEOUT_MILLIS);
+      Simple.Callback simpleClient2 = 
+          SpecificRequestor.getClient(Simple.Callback.class, transceiver2);
+      Assert.assertEquals(3, simpleClient2.add(1, 2));
+      
+      // Restart the server:
+      server2.close();
+      try {
+        simpleClient2.add(2, -1);
+        Assert.fail("Client should not be able to invoke RPCs " +
+            "because server is no longer running");
+      } catch (Exception e) {
+        // Expected since server is no longer running
+      }
+      Thread.sleep(2000L);
+      server2 = new NettyServer(new SpecificResponder(Simple.class, 
+          simpleImpl), new InetSocketAddress(serverPort));
+      server2.start();
+      
+      // Invoke an RPC using the same client, which should reestablish the 
+      // connection to the server:
+      Assert.assertEquals(3, simpleClient2.add(1, 2));
+    } finally {
+      server2.close();
+    }
+  }
 
   @Ignore
   @Test