You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by cu...@apache.org on 2011/12/16 20:35:36 UTC

svn commit: r1215248 - in /avro/trunk: CHANGES.txt lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerWithCallbacks.java

Author: cutting
Date: Fri Dec 16 19:35:35 2011
New Revision: 1215248

URL: http://svn.apache.org/viewvc?rev=1215248&view=rev
Log:
AVRO-982. Java: Fix NettyTransceiver to not hang when server stops.  Contributed by Bruno Dumon.

Modified:
    avro/trunk/CHANGES.txt
    avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java
    avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerWithCallbacks.java

Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1215248&r1=1215247&r2=1215248&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Fri Dec 16 19:35:35 2011
@@ -49,6 +49,9 @@ Avro 1.6.2 (unreleased)
     AVRO-968. C: Fixed avro_value_cmp and avro_value_cmp_fast for string
     values. (Vivek Nadkarni via dcreager)
 
+    AVRO-982. Java: Fix NettyTransceiver to not hang when server stops.
+    (Bruno Dumon via cutting)
+
 Avro 1.6.1 (8 November 2011)
 
   INCOMPATIBLE CHANGES

Modified: avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java?rev=1215248&r1=1215247&r2=1215248&view=diff
==============================================================================
--- avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java (original)
+++ avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java Fri Dec 16 19:35:35 2011
@@ -462,7 +462,7 @@ public class NettyTransceiver extends Tr
         if ((cse.getState() == ChannelState.OPEN) && (Boolean.FALSE.equals(cse.getValue()))) {
           // Server closed connection; disconnect client side
           LOG.debug("Remote peer " + remoteAddr + " closed connection.");
-          disconnect();
+          disconnect(false, true, null);
         }
       }
       super.handleUpstream(ctx, e);

Modified: avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerWithCallbacks.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerWithCallbacks.java?rev=1215248&r1=1215247&r2=1215248&view=diff
==============================================================================
--- avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerWithCallbacks.java (original)
+++ avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerWithCallbacks.java Fri Dec 16 19:35:35 2011
@@ -346,7 +346,7 @@ public class TestNettyServerWithCallback
         Assert.assertEquals(3, simpleClient2.add(1, 2));
         
         // Now acquire the semaphore so that the server will block:
-        blockingSimpleImpl.acquirePermit();
+        blockingSimpleImpl.acquireRunPermit();
         simpleClient2.add(1, 2, addFuture);
       } finally {
         // When the transceiver is closed, the CallFuture should get 
@@ -365,11 +365,83 @@ public class TestNettyServerWithCallback
       }
       Assert.assertTrue("Expected IOException to be thrown", ioeThrown);
     } finally {
-      blockingSimpleImpl.releasePermit();
+      blockingSimpleImpl.releaseRunPermit();
       server2.close();
     }
   }
   
+  @Test
+  public void cancelPendingRequestsAfterChannelCloseByServerShutdown() throws Exception {
+    // The purpose of this test is to verify that a client doesn't stay
+    // blocked when a server is unexpectedly killed (or when for some
+    // other reason the channel is suddenly closed) while the server
+    // was in the process of handling a request (thus after it received
+    // the request, and before it returned the response).
+
+    // Start up a second server so that closing the server doesn't
+    // interfere with the other unit tests:
+    BlockingSimpleImpl blockingSimpleImpl = new BlockingSimpleImpl();
+    Server server2 = new NettyServer(new SpecificResponder(Simple.class,
+        blockingSimpleImpl), new InetSocketAddress(0));
+    server2.start();
+
+    Transceiver transceiver2 = null;
+
+    try {
+      int serverPort = server2.getPort();
+      System.out.println("server2 port : " + serverPort);
+
+      transceiver2 = new NettyTransceiver(new InetSocketAddress(
+          serverPort), TestNettyServer.CONNECT_TIMEOUT_MILLIS);
+
+      final Simple.Callback simpleClient2 =
+          SpecificRequestor.getClient(Simple.Callback.class, transceiver2);
+
+      // Acquire the method-enter permit, which will be released by the
+      // server method once we call it
+      blockingSimpleImpl.acquireEnterPermit();
+
+      // Acquire the run permit, to avoid that the server method returns immediately
+      blockingSimpleImpl.acquireRunPermit();
+
+      Thread t = new Thread(new Runnable() {
+        @Override
+        public void run() {
+          try {
+            simpleClient2.add(3, 4);
+            Assert.fail("Expected an exception");
+          } catch (Exception e) {
+            // expected
+          }
+        }
+      });
+      
+      // Start client call
+      t.start();
+      
+      // Wait until method is entered on the server side
+      blockingSimpleImpl.acquireEnterPermit();
+      
+      // The server side method is now blocked waiting on the run permit
+      // (= is busy handling the request)
+      
+      // Stop the server
+      server2.close();
+      
+      // With the server gone, we expect the client to get some exception and exit
+      // Wait for client thread to exit
+      t.join(10000);
+      
+      Assert.assertFalse("Client request should not be blocked on server shutdown", t.isAlive());
+      
+    } finally {
+      blockingSimpleImpl.releaseRunPermit();
+      server2.close();
+      if (transceiver2 != null)
+        transceiver2.close();
+    }
+  }
+
   @Ignore
   @Test
   public void performanceTest() throws Exception {
@@ -459,7 +531,10 @@ public class TestNettyServerWithCallback
    * A SimpleImpl that requires a semaphore permit before executing any method.
    */
   private static class BlockingSimpleImpl extends SimpleImpl {
-    private final Semaphore semaphore = new Semaphore(1);
+    /** Semaphore that is released when the method is entered. */
+    private final Semaphore enterSemaphore = new Semaphore(1);
+    /** Semaphore that must be acquired for the method to run and exit. */
+    private final Semaphore runSemaphore = new Semaphore(1);
     
     /**
      * Creates a BlockingSimpleImpl.
@@ -470,76 +545,106 @@ public class TestNettyServerWithCallback
     
     @Override
     public String hello(String greeting) throws AvroRemoteException {
-      acquirePermit();
+      releaseEnterPermit();
+      acquireRunPermit();
       try {
         return super.hello(greeting);
       } finally {
-        releasePermit();
+        releaseRunPermit();
       }
     }
 
     @Override
     public TestRecord echo(TestRecord record) throws AvroRemoteException {
-      acquirePermit();
+      releaseEnterPermit();
+      acquireRunPermit();
       try {
         return super.echo(record);
       } finally {
-        releasePermit();
+        releaseRunPermit();
       }
     }
 
     @Override
     public int add(int arg1, int arg2) throws AvroRemoteException {
-      acquirePermit();
+      releaseEnterPermit();
+      acquireRunPermit();
       try {
         return super.add(arg1, arg2);
       } finally {
-        releasePermit();
+        releaseRunPermit();
       }
     }
 
     @Override
     public ByteBuffer echoBytes(ByteBuffer data) throws AvroRemoteException {
-      acquirePermit();
+      releaseEnterPermit();
+      acquireRunPermit();
       try {
         return super.echoBytes(data);
       } finally {
-        releasePermit();
+        releaseRunPermit();
       }
     }
 
     @Override
     public Void error() throws AvroRemoteException, TestError {
-      acquirePermit();
+      releaseEnterPermit();
+      acquireRunPermit();
       try {
         return super.error();
       } finally {
-        releasePermit();
+        releaseRunPermit();
       }
     }
 
     @Override
     public void ack() {
-      acquirePermit();
+      releaseEnterPermit();
+      acquireRunPermit();
       try {
         super.ack();
       } finally {
-        releasePermit();
+        releaseRunPermit();
       }
     }
     
     /**
      * Acquires a single permit from the semaphore.
      */
-    public void acquirePermit() {
-      semaphore.acquireUninterruptibly();
+    public void acquireRunPermit() {
+      try {
+        runSemaphore.acquire();
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(e);
+    }
     }
     
     /**
      * Releases a single permit to the semaphore.
      */
-    public void releasePermit() {
-      semaphore.release();
+    public void releaseRunPermit() {
+      runSemaphore.release();
+    }
+
+    /**
+     * Acquires a single permit from the semaphore.
+     */
+    public void acquireEnterPermit() {
+      try {
+        enterSemaphore.acquire();
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(e);
+  }
+}
+
+    /**
+     * Releases a single permit to the semaphore.
+     */
+    public void releaseEnterPermit() {
+        enterSemaphore.release();
     }
   }
 }