You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by cu...@apache.org on 2012/09/13 22:34:48 UTC

svn commit: r1384514 - in /avro/trunk: CHANGES.txt lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyTransceiverWhenServerStops.java

Author: cutting
Date: Thu Sep 13 20:34:48 2012
New Revision: 1384514

URL: http://svn.apache.org/viewvc?rev=1384514&view=rev
Log:
AVRO-1154. Java: Fix NettyTransciever to not hang when the server is stopped.  Contributed by Bruno Dumon & Karel Vervaeke.

Added:
    avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyTransceiverWhenServerStops.java   (with props)
Modified:
    avro/trunk/CHANGES.txt
    avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java

Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1384514&r1=1384513&r2=1384514&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Thu Sep 13 20:34:48 2012
@@ -52,6 +52,9 @@ Avro 1.7.2 (unreleased)
     AVRO-851. Java: Fix a bug in GenericData#toString() when escaping
     characters. (Jeff Mesnil via cutting)
 
+    AVRO-1154. Java: Fix NettyTransciever to not hang when the server
+    is stopped. (Bruno Dumon via cutting)
+
 Avro 1.7.1 (16 July 2012)
 
   NEW FEATURES

Modified: avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java?rev=1384514&r1=1384513&r2=1384514&view=diff
==============================================================================
--- avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java (original)
+++ avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java Thu Sep 13 20:34:48 2012
@@ -28,8 +28,8 @@ import java.util.concurrent.ConcurrentHa
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.avro.Protocol;
 import org.apache.avro.ipc.NettyTransportCodec.NettyDataPack;
@@ -76,6 +76,10 @@ public class NettyTransceiver extends Tr
   private final ClientBootstrap bootstrap;
   private final InetSocketAddress remoteAddr;
   
+  volatile ChannelFuture channelFuture;
+  volatile boolean stopping;
+  private final Object channelFutureLock = new Object();
+
   /**
    * Read lock must be acquired whenever using non-final state.
    * Write lock must be acquired whenever modifying state.
@@ -89,6 +93,7 @@ public class NettyTransceiver extends Tr
     connectTimeoutMillis = 0L;
     bootstrap = null;
     remoteAddr = null;
+    channelFuture = null;
   }
 
   /**
@@ -242,14 +247,24 @@ public class NettyTransceiver extends Tr
       stateLock.writeLock().lock();
       try {
         if (!isChannelReady(channel)) {
+          synchronized(channelFutureLock) {
+            if (!stopping) {
           LOG.debug("Connecting to " + remoteAddr);
-          ChannelFuture channelFuture = bootstrap.connect(remoteAddr);
+              channelFuture = bootstrap.connect(remoteAddr);
+            }
+          }
+          if (channelFuture != null) {
           channelFuture.awaitUninterruptibly(connectTimeoutMillis);
+
+            synchronized(channelFutureLock) {
           if (!channelFuture.isSuccess()) {
             throw new IOException("Error connecting to " + remoteAddr, 
                 channelFuture.getCause());
           }
           channel = channelFuture.getChannel();
+              channelFuture = null;
+            }
+          }
         }
       } finally {
         // Downgrade to read lock:
@@ -280,6 +295,12 @@ public class NettyTransceiver extends Tr
     Channel channelToClose = null;
     Map<Integer, Callback<List<ByteBuffer>>> requestsToCancel = null;
     boolean stateReadLockHeld = stateLock.getReadHoldCount() != 0;
+
+    synchronized(channelFutureLock) {
+        if (stopping && channelFuture != null) {
+           channelFuture.cancel();
+        }
+    }
     if (stateReadLockHeld) {
       stateLock.readLock().unlock();
     }
@@ -350,6 +371,7 @@ public class NettyTransceiver extends Tr
   public void close() {
     try {
       // Close the connection:
+      stopping = true;
       disconnect(true, true, null);
     } finally {
       // Shut down all thread pools to exit.

Added: avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyTransceiverWhenServerStops.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyTransceiverWhenServerStops.java?rev=1384514&view=auto
==============================================================================
--- avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyTransceiverWhenServerStops.java (added)
+++ avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyTransceiverWhenServerStops.java Thu Sep 13 20:34:48 2012
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.ipc;
+
+import org.apache.avro.ipc.specific.SpecificRequestor;
+import org.apache.avro.ipc.specific.SpecificResponder;
+import org.apache.avro.test.Mail;
+import org.apache.avro.test.Message;
+import org.junit.Test;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.Assert.fail;
+
+public class TestNettyTransceiverWhenServerStops {
+  @Test
+    public void testNettyTransceiverWhenServerStops() throws Exception {
+    Mail mailService = new TestNettyServer.MailImpl();
+    Responder responder = new SpecificResponder(Mail.class, mailService);
+    NettyServer server = new NettyServer(responder, new InetSocketAddress(0));
+    server.start();
+
+    int serverPort = server.getPort();
+
+    final NettyTransceiver transceiver = new NettyTransceiver(new InetSocketAddress(serverPort), 60000L);
+    final Mail mail = SpecificRequestor.getClient(Mail.class, transceiver);
+
+    final AtomicInteger successes = new AtomicInteger();
+    final AtomicInteger failures = new AtomicInteger();
+    final AtomicBoolean quitOnFailure = new AtomicBoolean();
+    List<Thread> threads = new ArrayList<Thread>();
+
+    // Start a bunch of client threads that use the transceiver to send messages
+    for (int i = 0; i < 100; i++) {
+      Thread thread = new Thread(new Runnable() {
+          @Override
+            public void run() {
+            while (true) {
+              try {
+                mail.send(createMessage());
+                successes.incrementAndGet();
+              } catch (Exception e) {
+                failures.incrementAndGet();
+                if (quitOnFailure.get()) {
+                  return;
+                }
+              }
+            }
+          }
+        });
+      threads.add(thread);
+      thread.start();
+    }
+
+    // Be sure the threads are running: wait until we get a good deal of successes
+    while (successes.get() < 10000) {
+      Thread.sleep(50);
+    }
+
+    // Now stop the server
+    server.close();
+
+    // Server is stopped: successes should not increase anymore: wait until we're in that situation
+    while (true) {
+      int previousSuccesses = successes.get();
+      Thread.sleep(500);
+      if (previousSuccesses == successes.get()) {
+        break;
+      }
+    }
+
+    // Start the server again
+    server.start();
+
+    // This part of the test is not solved by the current patch: it shows that when you stop/start
+    // a server, the client requests don't continue immediately but will stay blocked until the timeout
+    // passed to the NettyTransceiver has passed (IIUC)
+    long now = System.currentTimeMillis();
+    /*
+      System.out.println("Waiting on requests to continue");
+      int previousSuccesses = successes.get();
+      while (true) {
+      Thread.sleep(500);
+      if (successes.get() > previousSuccesses) {
+      break;
+      }
+      if (System.currentTimeMillis() - now > 5000) {
+      System.out.println("FYI: requests don't continue immediately...");
+      break;
+      }
+      }
+    */
+
+    // Stop our client, we would expect this to go on immediately
+    System.out.println("Stopping transceiver");
+    quitOnFailure.set(true);
+    now = System.currentTimeMillis();
+    transceiver.close(); // Without the patch, this close seems to hang forever
+
+    // Wait for all threads to quit
+    for (Thread thread : threads) {
+      thread.join();
+    }
+    if (System.currentTimeMillis() - now > 10000) {
+      fail("Stopping NettyTransceiver and waiting for client threads to quit took too long.");
+    } else {
+      System.out.println("Stopping NettyTransceiver and waiting for client threads to quit took "
+                         + (System.currentTimeMillis() - now) + " ms");
+    }
+  }
+
+  private Message createMessage() {
+    Message msg = Message.newBuilder().
+      setTo("wife").
+      setFrom("husband").
+      setBody("I love you!").
+      build();
+    return msg;
+  }
+}

Propchange: avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyTransceiverWhenServerStops.java
------------------------------------------------------------------------------
    svn:eol-style = native