You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2013/06/28 17:27:08 UTC

svn commit: r1497814 - in /zookeeper/bookkeeper/trunk: CHANGES.txt bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java

Author: ivank
Date: Fri Jun 28 15:27:08 2013
New Revision: 1497814

URL: http://svn.apache.org/r1497814
Log:
BOOKKEEPER-620: PerChannelBookieClient race during channel disconnect (ivank)

Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1497814&r1=1497813&r2=1497814&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Fri Jun 28 15:27:08 2013
@@ -34,6 +34,8 @@ Trunk (unreleased changes)
 
       BOOKKEEPER-313: Bookkeeper shutdown call from Bookie thread is not shutting down server (vinay via ivank)
 
+      BOOKKEEPER-620: PerChannelBookieClient race during channel disconnect (ivank)
+
       bookkeeper-server:
 
         BOOKKEEPER-567: ReadOnlyBookieTest hangs on shutdown (sijie via ivank)

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java?rev=1497814&r1=1497813&r2=1497814&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java Fri Jun 28 15:27:08 2013
@@ -20,11 +20,14 @@ package org.apache.bookkeeper.proto;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayDeque;
+import java.util.Set;
+import java.util.Collections;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicLong;
 
+import com.google.common.collect.ImmutableSet;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.proto.BookieProtocol.PacketHeader;
@@ -76,7 +79,6 @@ public class PerChannelBookieClient exte
     AtomicLong totalBytesOutstanding;
     ClientSocketChannelFactory channelFactory;
     OrderedSafeExecutor executor;
-    private Timer readTimeoutTimer;
 
     ConcurrentHashMap<CompletionKey, AddCompletion> addCompletions = new ConcurrentHashMap<CompletionKey, AddCompletion>();
     ConcurrentHashMap<CompletionKey, ReadCompletion> readCompletions = new ConcurrentHashMap<CompletionKey, ReadCompletion>();
@@ -88,11 +90,11 @@ public class PerChannelBookieClient exte
     Queue<GenericCallback<Void>> pendingOps = new ArrayDeque<GenericCallback<Void>>();
     volatile Channel channel = null;
 
-    private enum ConnectionState {
+    enum ConnectionState {
         DISCONNECTED, CONNECTING, CONNECTED, CLOSED
             };
 
-    private volatile ConnectionState state;
+    volatile ConnectionState state;
     private final ClientConfiguration conf;
 
     public PerChannelBookieClient(OrderedSafeExecutor executor, ClientSocketChannelFactory channelFactory,
@@ -108,7 +110,6 @@ public class PerChannelBookieClient exte
         this.totalBytesOutstanding = totalBytesOutstanding;
         this.channelFactory = channelFactory;
         this.state = ConnectionState.DISCONNECTED;
-        this.readTimeoutTimer = null;
     }
 
     private void connect() {
@@ -122,29 +123,36 @@ public class PerChannelBookieClient exte
         bootstrap.setOption("keepAlive", true);
 
         ChannelFuture future = bootstrap.connect(addr);
-
         future.addListener(new ChannelFutureListener() {
             @Override
             public void operationComplete(ChannelFuture future) throws Exception {
+                LOG.debug("Channel connected ({}) {}", future.isSuccess(), future.getChannel());
                 int rc;
                 Queue<GenericCallback<Void>> oldPendingOps;
 
                 synchronized (PerChannelBookieClient.this) {
-
                     if (future.isSuccess() && state == ConnectionState.CONNECTING) {
-                        LOG.info("Successfully connected to bookie: " + addr);
+                        LOG.info("Successfully connected to bookie: {}", future.getChannel());
                         rc = BKException.Code.OK;
                         channel = future.getChannel();
                         state = ConnectionState.CONNECTED;
                     } else if (future.isSuccess() && (state == ConnectionState.CLOSED
                                                       || state == ConnectionState.DISCONNECTED)) {
-                        LOG.error("Closed before connection completed, clean up: " + addr);
-                        future.getChannel().close();
+                        LOG.warn("Closed before connection completed, clean up: {}, current state {}",
+                                 future.getChannel(), state);
+                        closeChannel(future.getChannel());
                         rc = BKException.Code.BookieHandleNotAvailableException;
                         channel = null;
+                    } else if (future.isSuccess() && state == ConnectionState.CONNECTED) {
+                        LOG.debug("Already connected with another channel({}), so close the new channel({})",
+                                  channel, future.getChannel());
+                        closeChannel(future.getChannel());
+                        return; // pendingOps should have been completed when other channel connected
                     } else {
-                        LOG.error("Could not connect to bookie: " + addr);
+                        LOG.error("Could not connect to bookie: {}, current state {}",
+                                  future.getChannel(), state);
                         rc = BKException.Code.BookieHandleNotAvailableException;
+                        closeChannel(future.getChannel());
                         channel = null;
                         if (state != ConnectionState.CLOSED) {
                             state = ConnectionState.DISCONNECTED;
@@ -327,20 +335,28 @@ public class PerChannelBookieClient exte
     }
 
     private void closeInternal(boolean permanent) {
+        Channel toClose = null;
         synchronized (this) {
             if (permanent) {
                 state = ConnectionState.CLOSED;
             } else if (state != ConnectionState.CLOSED) {
                 state = ConnectionState.DISCONNECTED;
             }
+            toClose = channel;
+            channel = null;
         }
-        if (channel != null) {
-            channel.close().awaitUninterruptibly();
+        if (toClose != null) {
+            closeChannel(toClose).awaitUninterruptibly();
         }
-        if (readTimeoutTimer != null) {
-            readTimeoutTimer.stop();
-            readTimeoutTimer = null;
+    }
+
+    private ChannelFuture closeChannel(Channel c) {
+        LOG.debug("Closing channel {}", c);
+        ReadTimeoutHandler timeout = c.getPipeline().get(ReadTimeoutHandler.class);
+        if (timeout != null) {
+            timeout.releaseExternalResources();
         }
+        return c.close();
     }
 
     void errorOutReadKey(final CompletionKey key) {
@@ -421,11 +437,7 @@ public class PerChannelBookieClient exte
     public ChannelPipeline getPipeline() throws Exception {
         ChannelPipeline pipeline = Channels.pipeline();
 
-        if (readTimeoutTimer == null) {
-            readTimeoutTimer = new HashedWheelTimer();
-        }
-
-        pipeline.addLast("readTimeout", new ReadTimeoutHandler(readTimeoutTimer, 
+        pipeline.addLast("readTimeout", new ReadTimeoutHandler(new HashedWheelTimer(),
                                                                conf.getReadTimeout()));
         pipeline.addLast("lengthbasedframedecoder", new LengthFieldBasedFrameDecoder(MAX_FRAME_LENGTH, 0, 4, 0, 4));
         pipeline.addLast("lengthprepender", new LengthFieldPrepender(4));
@@ -442,14 +454,18 @@ public class PerChannelBookieClient exte
      */
     @Override
     public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
-        LOG.info("Disconnected from bookie: " + addr);
-        errorOutOutstandingEntries();
-        Channel c = this.channel;
+        Channel c = ctx.getChannel();
+        LOG.info("Disconnected from bookie channel {}", c);
         if (c != null) {
-            c.close();
+            closeChannel(c);
         }
+        if (this.channel == c) {
+            errorOutOutstandingEntries();
+        }
+
         synchronized (this) {
-            if (state != ConnectionState.CLOSED) {
+            if (this.channel == c
+                && state != ConnectionState.CLOSED) {
                 state = ConnectionState.DISCONNECTED;
             }
         }

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java?rev=1497814&r1=1497813&r2=1497814&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java Fri Jun 28 15:27:08 2013
@@ -24,12 +24,16 @@ package org.apache.bookkeeper.proto;
 import org.junit.*;
 import java.net.InetSocketAddress;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.Executors;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import org.apache.bookkeeper.proto.PerChannelBookieClient.ConnectionState;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
 import org.apache.bookkeeper.util.OrderedSafeExecutor;
-
+import org.jboss.netty.channel.Channel;
 import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
 import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
 
@@ -111,4 +115,96 @@ public class TestPerChannelBookieClient 
         channelFactory.releaseExternalResources();
         executor.shutdown();
     }
+
+    /**
+     * Test that all resources are freed if connections and disconnections
+     * are interleaved randomly.
+     *
+     * {@link https://issues.apache.org/jira/browse/BOOKKEEPER-620}
+     */
+    @Test(timeout=60000)
+    public void testDisconnectRace() throws Exception {
+        final GenericCallback<Void> nullop = new GenericCallback<Void>() {
+            @Override
+            public void operationComplete(int rc, Void result) {
+                // do nothing, we don't care about doing anything with the connection,
+                // we just want to trigger it connecting.
+            }
+        };
+        final int ITERATIONS = 100000;
+        ClientSocketChannelFactory channelFactory
+            = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
+                                                Executors.newCachedThreadPool());
+        OrderedSafeExecutor executor = new OrderedSafeExecutor(1);
+        InetSocketAddress addr = getBookie(0);
+
+        AtomicLong bytesOutstanding = new AtomicLong(0);
+        final PerChannelBookieClient client = new PerChannelBookieClient(executor,
+                channelFactory, addr, bytesOutstanding);
+        final AtomicBoolean shouldFail = new AtomicBoolean(false);
+        final AtomicBoolean running = new AtomicBoolean(true);
+        final CountDownLatch disconnectRunning = new CountDownLatch(1);
+        Thread connectThread = new Thread() {
+                public void run() {
+                    try {
+                        if (!disconnectRunning.await(10, TimeUnit.SECONDS)) {
+                            LOG.error("Disconnect thread never started");
+                            shouldFail.set(true);
+                        }
+                    } catch (InterruptedException ie) {
+                        LOG.error("Connect thread interrupted", ie);
+                        Thread.currentThread().interrupt();
+                        running.set(false);
+                    }
+                    for (int i = 0; i < ITERATIONS && running.get(); i++) {
+                        client.connectIfNeededAndDoOp(nullop);
+                    }
+                    running.set(false);
+                }
+            };
+        Thread disconnectThread = new Thread() {
+                public void run() {
+                    disconnectRunning.countDown();
+                    while (running.get()) {
+                        client.disconnect();
+                    }
+                }
+            };
+        Thread checkThread = new Thread() {
+                public void run() {
+                    ConnectionState state;
+                    Channel channel;
+                    while (running.get()) {
+                        synchronized (client) {
+                            state = client.state;
+                            channel = client.channel;
+
+                            if ((state == ConnectionState.CONNECTED
+                                 && (channel == null
+                                     || !channel.isConnected()))
+                                || (state != ConnectionState.CONNECTED
+                                    && channel != null
+                                    && channel.isConnected())) {
+                                LOG.error("State({}) and channel({}) inconsistent " + channel,
+                                          state, channel == null ? null : channel.isConnected());
+                                shouldFail.set(true);
+                                running.set(false);
+                            }
+                        }
+                    }
+                }
+            };
+        connectThread.start();
+        disconnectThread.start();
+        checkThread.start();
+
+        connectThread.join();
+        disconnectThread.join();
+        checkThread.join();
+        assertFalse("Failure in threads, check logs", shouldFail.get());
+
+        client.close();
+        channelFactory.releaseExternalResources();
+        executor.shutdown();
+    }
 }