You are viewing a plain-text version of this content. The canonical link to it was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2013/06/28 17:59:22 UTC
svn commit: r1497824 - in /zookeeper/bookkeeper/branches/branch-4.2: ./
bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/
bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/
Author: ivank
Date: Fri Jun 28 15:59:22 2013
New Revision: 1497824
URL: http://svn.apache.org/r1497824
Log:
BOOKKEEPER-620: PerChannelBookieClient race during channel disconnect (ivank)
Modified:
zookeeper/bookkeeper/branches/branch-4.2/CHANGES.txt
zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java
Modified: zookeeper/bookkeeper/branches/branch-4.2/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/branches/branch-4.2/CHANGES.txt?rev=1497824&r1=1497823&r2=1497824&view=diff
==============================================================================
--- zookeeper/bookkeeper/branches/branch-4.2/CHANGES.txt (original)
+++ zookeeper/bookkeeper/branches/branch-4.2/CHANGES.txt Fri Jun 28 15:59:22 2013
@@ -32,6 +32,8 @@ Release 4.2.2 - Unreleased
BOOKKEEPER-623: LedgerChecker should avoid segments of closed ledger with higher start entryId than closed entry. (vinay via sijie)
+ BOOKKEEPER-620: PerChannelBookieClient race during channel disconnect (ivank)
+
hedwig-server:
BOOKKEEPER-579: TestSubAfterCloseSub was put in a wrong package (sijie via ivank)
Modified: zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java?rev=1497824&r1=1497823&r2=1497824&view=diff
==============================================================================
--- zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java (original)
+++ zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java Fri Jun 28 15:59:22 2013
@@ -20,11 +20,14 @@ package org.apache.bookkeeper.proto;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayDeque;
+import java.util.Set;
+import java.util.Collections;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;
+import com.google.common.collect.ImmutableSet;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.proto.BookieProtocol.PacketHeader;
@@ -78,7 +81,6 @@ public class PerChannelBookieClient exte
AtomicLong totalBytesOutstanding;
ClientSocketChannelFactory channelFactory;
OrderedSafeExecutor executor;
- private Timer readTimeoutTimer;
ConcurrentHashMap<CompletionKey, AddCompletion> addCompletions = new ConcurrentHashMap<CompletionKey, AddCompletion>();
ConcurrentHashMap<CompletionKey, ReadCompletion> readCompletions = new ConcurrentHashMap<CompletionKey, ReadCompletion>();
@@ -90,11 +92,11 @@ public class PerChannelBookieClient exte
Queue<GenericCallback<Void>> pendingOps = new ArrayDeque<GenericCallback<Void>>();
volatile Channel channel = null;
- private enum ConnectionState {
+ enum ConnectionState {
DISCONNECTED, CONNECTING, CONNECTED, CLOSED
};
- private volatile ConnectionState state;
+ volatile ConnectionState state;
private final ClientConfiguration conf;
public PerChannelBookieClient(OrderedSafeExecutor executor, ClientSocketChannelFactory channelFactory,
@@ -110,7 +112,6 @@ public class PerChannelBookieClient exte
this.totalBytesOutstanding = totalBytesOutstanding;
this.channelFactory = channelFactory;
this.state = ConnectionState.DISCONNECTED;
- this.readTimeoutTimer = null;
}
private void connect() {
@@ -124,29 +125,36 @@ public class PerChannelBookieClient exte
bootstrap.setOption("keepAlive", true);
ChannelFuture future = bootstrap.connect(addr);
-
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
+ LOG.debug("Channel connected ({}) {}", future.isSuccess(), future.getChannel());
int rc;
Queue<GenericCallback<Void>> oldPendingOps;
synchronized (PerChannelBookieClient.this) {
-
if (future.isSuccess() && state == ConnectionState.CONNECTING) {
- LOG.info("Successfully connected to bookie: " + addr);
+ LOG.info("Successfully connected to bookie: {}", future.getChannel());
rc = BKException.Code.OK;
channel = future.getChannel();
state = ConnectionState.CONNECTED;
} else if (future.isSuccess() && (state == ConnectionState.CLOSED
|| state == ConnectionState.DISCONNECTED)) {
- LOG.error("Closed before connection completed, clean up: " + addr);
- future.getChannel().close();
+ LOG.warn("Closed before connection completed, clean up: {}, current state {}",
+ future.getChannel(), state);
+ closeChannel(future.getChannel());
rc = BKException.Code.BookieHandleNotAvailableException;
channel = null;
+ } else if (future.isSuccess() && state == ConnectionState.CONNECTED) {
+ LOG.debug("Already connected with another channel({}), so close the new channel({})",
+ channel, future.getChannel());
+ closeChannel(future.getChannel());
+ return; // pendingOps should have been completed when other channel connected
} else {
- LOG.error("Could not connect to bookie: " + addr);
+ LOG.error("Could not connect to bookie: {}, current state {}",
+ future.getChannel(), state);
rc = BKException.Code.BookieHandleNotAvailableException;
+ closeChannel(future.getChannel());
channel = null;
if (state != ConnectionState.CLOSED) {
state = ConnectionState.DISCONNECTED;
@@ -359,20 +367,28 @@ public class PerChannelBookieClient exte
}
private void closeInternal(boolean permanent) {
+ Channel toClose = null;
synchronized (this) {
if (permanent) {
state = ConnectionState.CLOSED;
} else if (state != ConnectionState.CLOSED) {
state = ConnectionState.DISCONNECTED;
}
+ toClose = channel;
+ channel = null;
}
- if (channel != null) {
- channel.close().awaitUninterruptibly();
+ if (toClose != null) {
+ closeChannel(toClose).awaitUninterruptibly();
}
- if (readTimeoutTimer != null) {
- readTimeoutTimer.stop();
- readTimeoutTimer = null;
+ }
+
+ private ChannelFuture closeChannel(Channel c) {
+ LOG.debug("Closing channel {}", c);
+ ReadTimeoutHandler timeout = c.getPipeline().get(ReadTimeoutHandler.class);
+ if (timeout != null) {
+ timeout.releaseExternalResources();
}
+ return c.close();
}
void errorOutReadKey(final CompletionKey key) {
@@ -453,11 +469,7 @@ public class PerChannelBookieClient exte
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = Channels.pipeline();
- if (readTimeoutTimer == null) {
- readTimeoutTimer = new HashedWheelTimer();
- }
-
- pipeline.addLast("readTimeout", new ReadTimeoutHandler(readTimeoutTimer,
+ pipeline.addLast("readTimeout", new ReadTimeoutHandler(new HashedWheelTimer(),
conf.getReadTimeout()));
pipeline.addLast("lengthbasedframedecoder", new LengthFieldBasedFrameDecoder(MAX_FRAME_LENGTH, 0, 4, 0, 4));
pipeline.addLast("mainhandler", this);
@@ -469,14 +481,18 @@ public class PerChannelBookieClient exte
*/
@Override
public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
- LOG.info("Disconnected from bookie: " + addr);
- errorOutOutstandingEntries();
- Channel c = this.channel;
+ Channel c = ctx.getChannel();
+ LOG.info("Disconnected from bookie channel {}", c);
if (c != null) {
- c.close();
+ closeChannel(c);
}
+ if (this.channel == c) {
+ errorOutOutstandingEntries();
+ }
+
synchronized (this) {
- if (state != ConnectionState.CLOSED) {
+ if (this.channel == c
+ && state != ConnectionState.CLOSED) {
state = ConnectionState.DISCONNECTED;
}
}
Modified: zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java?rev=1497824&r1=1497823&r2=1497824&view=diff
==============================================================================
--- zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java (original)
+++ zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java Fri Jun 28 15:59:22 2013
@@ -24,12 +24,16 @@ package org.apache.bookkeeper.proto;
import org.junit.*;
import java.net.InetSocketAddress;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.Executors;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import org.apache.bookkeeper.proto.PerChannelBookieClient.ConnectionState;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.apache.bookkeeper.util.OrderedSafeExecutor;
-
+import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
@@ -111,4 +115,96 @@ public class TestPerChannelBookieClient
channelFactory.releaseExternalResources();
executor.shutdown();
}
-}
\ No newline at end of file
+
+ /**
+ * Test that all resources are freed if connections and disconnections
+ * are interleaved randomly.
+ *
+ * {@link https://issues.apache.org/jira/browse/BOOKKEEPER-620}
+ */
+ @Test(timeout=60000)
+ public void testDisconnectRace() throws Exception {
+ final GenericCallback<Void> nullop = new GenericCallback<Void>() {
+ @Override
+ public void operationComplete(int rc, Void result) {
+ // do nothing, we don't care about doing anything with the connection,
+ // we just want to trigger it connecting.
+ }
+ };
+ final int ITERATIONS = 100000;
+ ClientSocketChannelFactory channelFactory
+ = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
+ Executors.newCachedThreadPool());
+ OrderedSafeExecutor executor = new OrderedSafeExecutor(1);
+ InetSocketAddress addr = getBookie(0);
+
+ AtomicLong bytesOutstanding = new AtomicLong(0);
+ final PerChannelBookieClient client = new PerChannelBookieClient(executor,
+ channelFactory, addr, bytesOutstanding);
+ final AtomicBoolean shouldFail = new AtomicBoolean(false);
+ final AtomicBoolean running = new AtomicBoolean(true);
+ final CountDownLatch disconnectRunning = new CountDownLatch(1);
+ Thread connectThread = new Thread() {
+ public void run() {
+ try {
+ if (!disconnectRunning.await(10, TimeUnit.SECONDS)) {
+ LOG.error("Disconnect thread never started");
+ shouldFail.set(true);
+ }
+ } catch (InterruptedException ie) {
+ LOG.error("Connect thread interrupted", ie);
+ Thread.currentThread().interrupt();
+ running.set(false);
+ }
+ for (int i = 0; i < ITERATIONS && running.get(); i++) {
+ client.connectIfNeededAndDoOp(nullop);
+ }
+ running.set(false);
+ }
+ };
+ Thread disconnectThread = new Thread() {
+ public void run() {
+ disconnectRunning.countDown();
+ while (running.get()) {
+ client.disconnect();
+ }
+ }
+ };
+ Thread checkThread = new Thread() {
+ public void run() {
+ ConnectionState state;
+ Channel channel;
+ while (running.get()) {
+ synchronized (client) {
+ state = client.state;
+ channel = client.channel;
+
+ if ((state == ConnectionState.CONNECTED
+ && (channel == null
+ || !channel.isConnected()))
+ || (state != ConnectionState.CONNECTED
+ && channel != null
+ && channel.isConnected())) {
+ LOG.error("State({}) and channel({}) inconsistent " + channel,
+ state, channel == null ? null : channel.isConnected());
+ shouldFail.set(true);
+ running.set(false);
+ }
+ }
+ }
+ }
+ };
+ connectThread.start();
+ disconnectThread.start();
+ checkThread.start();
+
+ connectThread.join();
+ disconnectThread.join();
+ checkThread.join();
+ assertFalse("Failure in threads, check logs", shouldFail.get());
+
+ client.close();
+ channelFactory.releaseExternalResources();
+ executor.shutdown();
+ }
+}