You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by fp...@apache.org on 2011/09/16 16:55:38 UTC
svn commit: r1171607 - in /zookeeper/bookkeeper/trunk: CHANGES.txt
bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
Author: fpj
Date: Fri Sep 16 14:55:38 2011
New Revision: 1171607
URL: http://svn.apache.org/viewvc?rev=1171607&view=rev
Log:
BOOKKEEPER-59: Race condition in netty code allocates and orphans resources (BK-5 revisited) (ivank via fpj)
Modified:
zookeeper/bookkeeper/trunk/CHANGES.txt
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1171607&r1=1171606&r2=1171607&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Fri Sep 16 14:55:38 2011
@@ -38,6 +38,8 @@ BUGFIXES:
BOOKKEEPER-57: NullPointException at bookie.zk@EntryLogger (xulei via ivank)
+ BOOKKEEPER-59: Race condition in netty code allocates and orphans resources (BK-5 revisited) (ivank via fpj)
+
hedwig-server/
BOOKKEEPER-43: NullPointException when releasing topic (Sijie Guo via breed)
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java?rev=1171607&r1=1171606&r2=1171607&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java Fri Sep 16 14:55:38 2011
@@ -64,8 +64,6 @@ public class BookieClient {
PerChannelBookieClient prevChannel = channels.putIfAbsent(addr, channel);
if (prevChannel != null) {
channel = prevChannel;
- } else {
- channel.connect();
}
}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java?rev=1171607&r1=1171606&r2=1171607&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java Fri Sep 16 14:55:38 2011
@@ -69,7 +69,6 @@ public class PerChannelBookieClient exte
public static int MAX_FRAME_LENGTH = 2 * 1024 * 1024; // 2M
InetSocketAddress addr;
- boolean connected = false;
Semaphore opCounterSem = new Semaphore(2000);
AtomicLong totalBytesOutstanding;
ClientSocketChannelFactory channelFactory;
@@ -83,18 +82,30 @@ public class PerChannelBookieClient exte
* because they are always updated under a lock
*/
Queue<GenericCallback<Void>> pendingOps = new ArrayDeque<GenericCallback<Void>>();
- boolean connectionAttemptInProgress;
Channel channel = null;
+ private enum ConnectionState {
+ DISCONNECTED, CONNECTING, CONNECTED
+ };
+
+ private ConnectionState state;
+
public PerChannelBookieClient(OrderedSafeExecutor executor, ClientSocketChannelFactory channelFactory,
InetSocketAddress addr, AtomicLong totalBytesOutstanding) {
this.addr = addr;
this.executor = executor;
this.totalBytesOutstanding = totalBytesOutstanding;
this.channelFactory = channelFactory;
+ this.state = ConnectionState.DISCONNECTED;
}
- void connect() {
+ synchronized private void connect() {
+ if (state == ConnectionState.CONNECTING) {
+ return;
+ }
+ // Start the connection attempt to the input server host.
+ state = ConnectionState.CONNECTING;
+
if (LOG.isDebugEnabled())
LOG.debug("Connecting to bookie: " + addr);
@@ -105,9 +116,6 @@ public class PerChannelBookieClient exte
bootstrap.setOption("tcpNoDelay", true);
bootstrap.setOption("keepAlive", true);
- // Start the connection attempt to the input server host.
- connectionAttemptInProgress = true;
-
ChannelFuture future = bootstrap.connect(addr);
future.addListener(new ChannelFutureListener() {
@@ -122,15 +130,14 @@ public class PerChannelBookieClient exte
LOG.info("Successfully connected to bookie: " + addr);
rc = BKException.Code.OK;
channel = future.getChannel();
- connected = true;
+ state = ConnectionState.CONNECTED;
} else {
LOG.error("Could not connect to bookie: " + addr);
rc = BKException.Code.BookieHandleNotAvailableException;
channel = null;
- connected = false;
+ state = ConnectionState.DISCONNECTED;
}
- connectionAttemptInProgress = false;
PerChannelBookieClient.this.channel = channel;
// trick to not do operations under the lock, take the list
@@ -144,7 +151,6 @@ public class PerChannelBookieClient exte
for (GenericCallback<Void> pendingOp : oldPendingOps) {
pendingOp.operationComplete(rc, null);
}
-
}
});
}
@@ -153,13 +159,13 @@ public class PerChannelBookieClient exte
boolean doOpNow;
// common case without lock first
- if (channel != null && connected) {
+ if (channel != null && state == ConnectionState.CONNECTED) {
doOpNow = true;
} else {
synchronized (this) {
// check again under lock
- if (channel != null && connected) {
+ if (channel != null && state == ConnectionState.CONNECTED) {
doOpNow = true;
} else {
@@ -175,10 +181,7 @@ public class PerChannelBookieClient exte
// succeeds
pendingOps.add(op);
- if (!connectionAttemptInProgress) {
- connect();
- }
-
+ connect();
}
}
}
@@ -375,7 +378,7 @@ public class PerChannelBookieClient exte
errorOutOutstandingEntries();
channel.close();
- connected = false;
+ state = ConnectionState.DISCONNECTED;
// we don't want to reconnect right away. If someone sends a request to
// this address, we will reconnect.