You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by fp...@apache.org on 2011/09/16 16:55:38 UTC

svn commit: r1171607 - in /zookeeper/bookkeeper/trunk: CHANGES.txt bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java

Author: fpj
Date: Fri Sep 16 14:55:38 2011
New Revision: 1171607

URL: http://svn.apache.org/viewvc?rev=1171607&view=rev
Log:
BOOKKEEPER-59: Race condition in netty code allocates and orphans resources (BK-5 revisited) (ivank via fpj)


Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1171607&r1=1171606&r2=1171607&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Fri Sep 16 14:55:38 2011
@@ -38,6 +38,8 @@ BUGFIXES:
 
   BOOKKEEPER-57: NullPointException at bookie.zk@EntryLogger (xulei via ivank)
 
+  BOOKKEEPER-59: Race condition in netty code allocates and orphans resources (BK-5 revisited) (ivank via fpj)
+
  hedwig-server/
 
   BOOKKEEPER-43: NullPointException when releasing topic (Sijie Guo via breed)

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java?rev=1171607&r1=1171606&r2=1171607&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java Fri Sep 16 14:55:38 2011
@@ -64,8 +64,6 @@ public class BookieClient {
             PerChannelBookieClient prevChannel = channels.putIfAbsent(addr, channel);
             if (prevChannel != null) {
                 channel = prevChannel;
-            } else {
-                channel.connect();
             }
         }
 

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java?rev=1171607&r1=1171606&r2=1171607&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java Fri Sep 16 14:55:38 2011
@@ -69,7 +69,6 @@ public class PerChannelBookieClient exte
     public static int MAX_FRAME_LENGTH = 2 * 1024 * 1024; // 2M
 
     InetSocketAddress addr;
-    boolean connected = false;
     Semaphore opCounterSem = new Semaphore(2000);
     AtomicLong totalBytesOutstanding;
     ClientSocketChannelFactory channelFactory;
@@ -83,18 +82,30 @@ public class PerChannelBookieClient exte
      * because they are always updated under a lock
      */
     Queue<GenericCallback<Void>> pendingOps = new ArrayDeque<GenericCallback<Void>>();
-    boolean connectionAttemptInProgress;
     Channel channel = null;
 
+    private enum ConnectionState {
+        DISCONNECTED, CONNECTING, CONNECTED
+            };
+
+    private ConnectionState state;
+            
     public PerChannelBookieClient(OrderedSafeExecutor executor, ClientSocketChannelFactory channelFactory,
                                   InetSocketAddress addr, AtomicLong totalBytesOutstanding) {
         this.addr = addr;
         this.executor = executor;
         this.totalBytesOutstanding = totalBytesOutstanding;
         this.channelFactory = channelFactory;
+        this.state = ConnectionState.DISCONNECTED;
     }
 
-    void connect() {
+    synchronized private void connect() {
+        if (state == ConnectionState.CONNECTING) {
+            return;
+        } 
+        // Start the connection attempt to the input server host.
+        state = ConnectionState.CONNECTING;
+
         if (LOG.isDebugEnabled())
             LOG.debug("Connecting to bookie: " + addr);
 
@@ -105,9 +116,6 @@ public class PerChannelBookieClient exte
         bootstrap.setOption("tcpNoDelay", true);
         bootstrap.setOption("keepAlive", true);
 
-        // Start the connection attempt to the input server host.
-        connectionAttemptInProgress = true;
-
         ChannelFuture future = bootstrap.connect(addr);
 
         future.addListener(new ChannelFutureListener() {
@@ -122,15 +130,14 @@ public class PerChannelBookieClient exte
                         LOG.info("Successfully connected to bookie: " + addr);
                         rc = BKException.Code.OK;
                         channel = future.getChannel();
-                        connected = true;
+                        state = ConnectionState.CONNECTED;
                     } else {
                         LOG.error("Could not connect to bookie: " + addr);
                         rc = BKException.Code.BookieHandleNotAvailableException;
                         channel = null;
-                        connected = false;
+                        state = ConnectionState.DISCONNECTED;
                     }
 
-                    connectionAttemptInProgress = false;
                     PerChannelBookieClient.this.channel = channel;
 
                     // trick to not do operations under the lock, take the list
@@ -144,7 +151,6 @@ public class PerChannelBookieClient exte
                 for (GenericCallback<Void> pendingOp : oldPendingOps) {
                     pendingOp.operationComplete(rc, null);
                 }
-
             }
         });
     }
@@ -153,13 +159,13 @@ public class PerChannelBookieClient exte
         boolean doOpNow;
 
         // common case without lock first
-        if (channel != null && connected) {
+        if (channel != null && state == ConnectionState.CONNECTED) {
             doOpNow = true;
         } else {
 
             synchronized (this) {
                 // check again under lock
-                if (channel != null && connected) {
+                if (channel != null && state == ConnectionState.CONNECTED) {
                     doOpNow = true;
                 } else {
 
@@ -175,10 +181,7 @@ public class PerChannelBookieClient exte
                     // succeeds
                     pendingOps.add(op);
 
-                    if (!connectionAttemptInProgress) {
-                        connect();
-                    }
-
+                    connect();
                 }
             }
         }
@@ -375,7 +378,7 @@ public class PerChannelBookieClient exte
         errorOutOutstandingEntries();
         channel.close();
 
-        connected = false;
+        state = ConnectionState.DISCONNECTED;
 
         // we don't want to reconnect right away. If someone sends a request to
         // this address, we will reconnect.