You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2016/01/26 22:11:21 UTC

bookkeeper git commit: BOOKKEEPER-855: handle session expire event in bookie (sijie)

Repository: bookkeeper
Updated Branches:
  refs/heads/master 19160e44d -> 92722ee9c


BOOKKEEPER-855: handle session expire event in bookie (sijie)

This change makes the bookie retry its registration when its ZooKeeper session expires, instead of shutting down.

Author: Sijie Guo <si...@apache.org>

Reviewers: Ivan Kelly <iv...@apache.org>, Matteo Merli <mm...@apache.org>

Closes #1 from sijie/sijie/BOOKKEEPER-855


Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/92722ee9
Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/92722ee9
Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/92722ee9

Branch: refs/heads/master
Commit: 92722ee9c34b069e23d1a87d7fc78256b8540268
Parents: 19160e4
Author: Sijie Guo <si...@apache.org>
Authored: Tue Jan 26 13:11:04 2016 -0800
Committer: Sijie Guo <si...@apache.org>
Committed: Tue Jan 26 13:11:04 2016 -0800

----------------------------------------------------------------------
 .../org/apache/bookkeeper/bookie/Bookie.java    | 138 +++++++++++++-----
 .../bookkeeper/bookie/ReadOnlyBookie.java       |  66 +--------
 .../bookkeeper/conf/ServerConfiguration.java    |  44 ++++++
 .../bookie/BookieInitializationTest.java        |   4 +-
 .../bookkeeper/client/TestBookieWatcher.java    | 140 +++++++++++++++++++
 .../replication/AuditorLedgerCheckerTest.java   |   2 +-
 .../replication/TestReplicationWorker.java      |   2 +-
 .../bookkeeper/test/BookieZKExpireTest.java     |  15 +-
 .../bookkeeper/test/ReadOnlyBookieTest.java     |   2 +-
 .../zookeeper/TestZooKeeperClient.java          |   8 +-
 10 files changed, 312 insertions(+), 109 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/92722ee9/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
index 07b3d30..74876ff 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
@@ -34,14 +34,18 @@ import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.bookkeeper.bookie.Journal.JournalScanner;
 import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
 import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
@@ -61,12 +65,10 @@ import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.util.BookKeeperConstants;
 import org.apache.bookkeeper.util.IOUtils;
 import org.apache.bookkeeper.util.MathUtils;
-import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.bookkeeper.versioning.Version;
 import org.apache.bookkeeper.versioning.Versioned;
 import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
 import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
-import org.apache.bookkeeper.zookeeper.ZooKeeperWatcherBase;
 import org.apache.commons.io.FileUtils;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
@@ -119,6 +121,7 @@ public class Bookie extends BookieCriticalThread {
 
     // ZK registration path for this bookie
     protected final String bookieRegistrationPath;
+    protected final String bookieReadonlyRegistrationPath;
 
     private final LedgerDirsManager ledgerDirsManager;
     private LedgerDirsManager indexDirsManager;
@@ -142,7 +145,11 @@ public class Bookie extends BookieCriticalThread {
     final protected String zkBookieRegPath;
     final protected String zkBookieReadOnlyPath;
 
+    final private AtomicBoolean zkRegistered = new AtomicBoolean(false);
     final protected AtomicBoolean readOnly = new AtomicBoolean(false);
+    // executor to manage the state changes for a bookie.
+    final ExecutorService stateService = Executors.newSingleThreadExecutor(
+            new ThreadFactoryBuilder().setNameFormat("BookieStateService-%d").build());
 
     // Expose Stats
     private final Counter writeBytes;
@@ -468,6 +475,8 @@ public class Bookie extends BookieCriticalThread {
             throws IOException, KeeperException, InterruptedException, BookieException {
         super("Bookie-" + conf.getBookiePort());
         this.bookieRegistrationPath = conf.getZkAvailableBookiesPath() + "/";
+        this.bookieReadonlyRegistrationPath =
+            this.bookieRegistrationPath + BookKeeperConstants.READONLY;
         this.conf = conf;
         this.journalDirectory = getCurrentDirectory(conf.getJournalDir());
         this.ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
@@ -512,7 +521,7 @@ public class Bookie extends BookieCriticalThread {
         // ZK ephemeral node for this Bookie.
         String myID = getMyId();
         zkBookieRegPath = this.bookieRegistrationPath + myID;
-        zkBookieReadOnlyPath = this.bookieRegistrationPath + BookKeeperConstants.READONLY + "/" + myID;
+        zkBookieReadOnlyPath = this.bookieReadonlyRegistrationPath + "/" + myID;
 
         // Expose Stats
         writeBytes = statsLogger.getCounter(WRITE_BYTES);
@@ -522,7 +531,7 @@ public class Bookie extends BookieCriticalThread {
         readEntryStats = statsLogger.getOpStatsLogger(BOOKIE_READ_ENTRY);
         addBytesStats = statsLogger.getOpStatsLogger(BOOKIE_ADD_ENTRY_BYTES);
         readBytesStats = statsLogger.getOpStatsLogger(BOOKIE_READ_ENTRY_BYTES);
-        // 1 : up, 0 : readonly
+        // 1 : up, 0 : readonly, -1 : unregistered
         statsLogger.registerGauge(SERVER_STATUS, new Gauge<Number>() {
             @Override
             public Number getDefaultValue() {
@@ -531,7 +540,7 @@ public class Bookie extends BookieCriticalThread {
 
             @Override
             public Number getSample() {
-                return readOnly.get() ? 0 : 1;
+                return zkRegistered.get() ? (readOnly.get() ? 0 : 1) : -1;
             }
         });
     }
@@ -541,6 +550,7 @@ public class Bookie extends BookieCriticalThread {
     }
 
     void readJournal() throws IOException, BookieException {
+        long startTs = MathUtils.now();
         journal.replay(new JournalScanner() {
             @Override
             public void process(int journalVersion, long offset, ByteBuffer recBuff) throws IOException {
@@ -590,6 +600,8 @@ public class Bookie extends BookieCriticalThread {
                 }
             }
         });
+        long elapsedTs = MathUtils.now() - startTs;
+        LOG.info("Finished replaying journal in {} ms.", elapsedTs);
     }
 
     @Override
@@ -632,9 +644,9 @@ public class Bookie extends BookieCriticalThread {
         // if setting it in bookie thread, the watcher might run before bookie thread.
         running = true;
         try {
-            registerBookie(conf);
-        } catch (IOException e) {
-            LOG.error("Couldn't register bookie with zookeeper, shutting down", e);
+            registerBookie(true).get();
+        } catch (Exception e) {
+            LOG.error("Couldn't register bookie with zookeeper, shutting down : ", e);
             shutdown(ExitCode.ZK_REG_FAIL);
         }
     }
@@ -800,19 +812,46 @@ public class Bookie extends BookieCriticalThread {
     /**
      * Register as an available bookie
      */
-    protected void registerBookie(ServerConfiguration conf) throws IOException {
+    protected Future<Void> registerBookie(final boolean throwException) {
+        return stateService.submit(new Callable<Void>() {
+            @Override
+            public Void call() throws IOException {
+                try {
+                    doRegisterBookie();
+                } catch (IOException ioe) {
+                    if (throwException) {
+                        throw ioe;
+                    } else {
+                        LOG.error("Couldn't register bookie with zookeeper, shutting down : ", ioe);
+                        triggerBookieShutdown(ExitCode.ZK_REG_FAIL);
+                    }
+                }
+                return (Void)null;
+            }
+        });
+    }
+
+    protected void doRegisterBookie() throws IOException {
+        doRegisterBookie(readOnly.get() ? zkBookieReadOnlyPath : zkBookieRegPath);
+    }
+
+    private void doRegisterBookie(final String regPath) throws IOException {
         if (null == zk) {
             // zookeeper instance is null, means not register itself to zk
             return;
         }
 
+        zkRegistered.set(false);
+
         // ZK ephemeral node for this Bookie.
         try{
-            if (!checkRegNodeAndWaitExpired(zkBookieRegPath)) {
+            if (!checkRegNodeAndWaitExpired(regPath)) {
                 // Create the ZK ephemeral node for this Bookie.
-                zk.create(zkBookieRegPath, new byte[0], Ids.OPEN_ACL_UNSAFE,
+                zk.create(regPath, new byte[0], Ids.OPEN_ACL_UNSAFE,
                         CreateMode.EPHEMERAL);
+                LOG.info("Registered myself in ZooKeeper at {}.", regPath);
             }
+            zkRegistered.set(true);
         } catch (KeeperException ke) {
             LOG.error("ZK exception registering ephemeral Znode for Bookie!", ke);
             // Throw an IOException back up. This will cause the Bookie
@@ -832,14 +871,31 @@ public class Bookie extends BookieCriticalThread {
     /**
      * Transition the bookie from readOnly mode to writable
      */
+    private Future<Void> transitionToWritableMode() {
+        return stateService.submit(new Callable<Void>() {
+            @Override
+            public Void call() throws Exception {
+                doTransitionToWritableMode();
+                return null;
+            }
+        });
+    }
+
     @VisibleForTesting
-    public void transitionToWritableMode() {
+    public void doTransitionToWritableMode() {
+        if (shuttingdown) {
+            return;
+        }
         if (!readOnly.compareAndSet(true, false)) {
             return;
         }
         LOG.info("Transitioning Bookie to Writable mode and will serve read/write requests.");
+        // change zookeeper state only when using zookeeper
+        if (null == zk) {
+            return;
+        }
         try {
-            this.registerBookie(conf);
+            doRegisterBookie(zkBookieRegPath);
         } catch (IOException e) {
             LOG.warn("Error in transitioning back to writable mode : ", e);
             transitionToReadOnlyMode();
@@ -863,12 +919,21 @@ public class Bookie extends BookieCriticalThread {
     /**
      * Transition the bookie to readOnly mode
      */
+    private Future<Void> transitionToReadOnlyMode() {
+        return stateService.submit(new Callable<Void>() {
+            @Override
+            public Void call() {
+                doTransitionToReadOnlyMode();
+                return (Void)null;
+            }
+        });
+    }
+
     @VisibleForTesting
-    public void transitionToReadOnlyMode() {
+    public void doTransitionToReadOnlyMode() {
         if (shuttingdown) {
             return;
         }
-
         if (!readOnly.compareAndSet(false, true)) {
             return;
         }
@@ -882,22 +947,20 @@ public class Bookie extends BookieCriticalThread {
         }
         LOG.info("Transitioning Bookie to ReadOnly mode,"
                 + " and will serve only read requests from clients!");
+        // change zookeeper state only when using zookeeper
+        if (null == zk) {
+            return;
+        }
         try {
-            if (null == zk.exists(this.bookieRegistrationPath
-                    + BookKeeperConstants.READONLY, false)) {
+            if (null == zk.exists(this.bookieReadonlyRegistrationPath, false)) {
                 try {
-                    zk.create(this.bookieRegistrationPath
-                            + BookKeeperConstants.READONLY, new byte[0],
-                            Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+                    zk.create(this.bookieReadonlyRegistrationPath, new byte[0],
+                              Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                 } catch (NodeExistsException e) {
                     // this node is just now created by someone.
                 }
             }
-            if (!checkRegNodeAndWaitExpired(zkBookieReadOnlyPath)) {
-                // Create the readonly node
-                zk.create(zkBookieReadOnlyPath,
-                        new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
-            }
+            doRegisterBookie(zkBookieReadOnlyPath);
             try {
                 // Clear the current registered node
                 zk.delete(zkBookieRegPath, -1);
@@ -948,16 +1011,22 @@ public class Bookie extends BookieCriticalThread {
      *
      * @return zk client instance
      */
-    private ZooKeeper newZookeeper(ServerConfiguration conf) throws IOException, InterruptedException,
-            KeeperException {
+    private ZooKeeper newZookeeper(final ServerConfiguration conf)
+            throws IOException, InterruptedException, KeeperException {
         Set<Watcher> watchers = new HashSet<Watcher>();
         watchers.add(new Watcher() {
             @Override
             public void process(WatchedEvent event) {
+                if (!running) {
+                    // do nothing until first registration
+                    return;
+                }
                 // Check for expired connection.
-                if (event.getState().equals(Watcher.Event.KeeperState.Expired)) {
-                    LOG.error("ZK client connection to the ZK server has expired!");
-                    shutdown(ExitCode.ZK_EXPIRED);
+                if (event.getType().equals(EventType.None) &&
+                    event.getState().equals(KeeperState.Expired)) {
+                    zkRegistered.set(false);
+                    // schedule a re-register operation
+                    registerBookie(false);
                 }
             }
         });
@@ -965,8 +1034,8 @@ public class Bookie extends BookieCriticalThread {
                 .connectString(conf.getZkServers())
                 .sessionTimeoutMs(conf.getZkTimeout())
                 .watchers(watchers)
-                .operationRetryPolicy(new BoundExponentialBackoffRetryPolicy(conf.getZkTimeout(),
-                        conf.getZkTimeout(), Integer.MAX_VALUE))
+                .operationRetryPolicy(new BoundExponentialBackoffRetryPolicy(conf.getZkRetryBackoffStartMs(),
+                        conf.getZkRetryBackoffMaxMs(), Integer.MAX_VALUE))
                 .build();
     }
 
@@ -982,7 +1051,9 @@ public class Bookie extends BookieCriticalThread {
             journal.start();
             // wait until journal quits
             journal.join();
+            LOG.info("Journal thread quits.");
         } catch (InterruptedException ie) {
+            LOG.warn("Interrupted on running journal thread : ", ie);
         }
         // if the journal thread quits due to shutting down, it is ok
         if (!shuttingdown) {
@@ -1033,6 +1104,9 @@ public class Bookie extends BookieCriticalThread {
                 // mark bookie as in shutting down progress
                 shuttingdown = true;
 
+                // Shutdown the state service
+                stateService.shutdown();
+
                 // Shutdown journal
                 journal.shutdown();
                 this.join();

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/92722ee9/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyBookie.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyBookie.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyBookie.java
index d354fb3..8b42029 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyBookie.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyBookie.java
@@ -25,16 +25,10 @@ import java.io.IOException;
 
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.bookkeeper.util.BookKeeperConstants;
-import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.NodeExistsException;
-import org.apache.zookeeper.ZooDefs.Ids;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.annotations.VisibleForTesting;
-
 /**
  * Implements a read only bookie.
  * 
@@ -55,66 +49,18 @@ public class ReadOnlyBookie extends Bookie {
             LOG.error(err);
             throw new IOException(err);
         }
-        LOG.info("successed call ReadOnlyBookie constructor");
-    }
-
-    /**
-     * Register as a read only bookie
-     */
-    @Override
-    protected void registerBookie(ServerConfiguration conf) throws IOException {
-        if (null == zk) {
-            // zookeeper instance is null, means not register itself to zk
-            return;
-        }
-
-        // ZK node for this ReadOnly Bookie.
-        try{
-            if (null == zk.exists(this.bookieRegistrationPath
-                        + BookKeeperConstants.READONLY, false)) {
-                try {
-                    zk.create(this.bookieRegistrationPath
-                            + BookKeeperConstants.READONLY + "/", new byte[0],
-                            Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-                    LOG.debug("successed create ReadOnlyBookie parent zk node");
-                } catch (NodeExistsException e) {
-                    // this node is just now created by someone.
-                }
-            }
-
-            if (!checkRegNodeAndWaitExpired(zkBookieReadOnlyPath)) {
-                // Create the ZK node for this RO Bookie.
-                zk.create(zkBookieReadOnlyPath, new byte[0], Ids.OPEN_ACL_UNSAFE,
-                        CreateMode.EPHEMERAL);
-                LOG.debug("successed create ReadOnlyBookie zk node");
-            }
-        } catch (KeeperException ke) {
-            LOG.error("ZK exception registering Znode for ReadOnly Bookie!", ke);
-            // Throw an IOException back up. This will cause the Bookie
-            // constructor to error out. Alternatively, we could do a System
-            // exit here as this is a fatal error.
-            throw new IOException(ke);
-        } catch (InterruptedException ie) {
-            LOG.error("Interruptted exception registering Znode for ReadOnly Bookie!",
-                    ie);
-            // Throw an IOException back up. This will cause the Bookie
-            // constructor to error out. Alternatively, we could do a System
-            // exit here as this is a fatal error.
-            throw new IOException(ie);
-        }
+        LOG.info("Running bookie in readonly mode.");
     }
 
-    @VisibleForTesting
     @Override
-    public void transitionToWritableMode() {
+    public void doTransitionToWritableMode() {
+        // no-op
         LOG.info("Skip transition to writable mode for readonly bookie");
     }
 
-
-    @VisibleForTesting
     @Override
-    public void transitionToReadOnlyMode() {
-        LOG.warn("Skip transition to readonly mode for readonly bookie");
+    public void doTransitionToReadOnlyMode() {
+        // no-op
+        LOG.info("Skip transition to readonly mode for readonly bookie");
     }
-
 }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/92722ee9/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
index cc2dda6..d4305ca 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
@@ -84,6 +84,8 @@ public class ServerConfiguration extends AbstractConfiguration {
     // Zookeeper Parameters
     protected final static String ZK_TIMEOUT = "zkTimeout";
     protected final static String ZK_SERVERS = "zkServers";
+    protected final static String ZK_RETRY_BACKOFF_START_MS = "zkRetryBackoffStartMs";
+    protected final static String ZK_RETRY_BACKOFF_MAX_MS = "zkRetryBackoffMaxMs";
     protected final static String OPEN_LEDGER_REREPLICATION_GRACE_PERIOD = "openLedgerRereplicationGracePeriod";
     //ReadOnly mode support on all disk full
     protected final static String READ_ONLY_MODE_ENABLED = "readOnlyModeEnabled";
@@ -658,6 +660,48 @@ public class ServerConfiguration extends AbstractConfiguration {
     }
 
     /**
+     * Get zookeeper client backoff retry start time in millis.
+     *
+     * @return zk backoff retry start time in millis.
+     */
+    public int getZkRetryBackoffStartMs() {
+        return getInt(ZK_RETRY_BACKOFF_START_MS, getZkTimeout());
+    }
+
+    /**
+     * Set zookeeper client backoff retry start time in millis.
+     *
+     * @param retryMs
+     *          backoff retry start time in millis.
+     * @return server configuration.
+     */
+    public ServerConfiguration setZkRetryBackoffStartMs(int retryMs) {
+        setProperty(ZK_RETRY_BACKOFF_START_MS, retryMs);
+        return this;
+    }
+
+    /**
+     * Get zookeeper client backoff retry max time in millis.
+     *
+     * @return zk backoff retry max time in millis.
+     */
+    public int getZkRetryBackoffMaxMs() {
+        return getInt(ZK_RETRY_BACKOFF_MAX_MS, getZkTimeout());
+    }
+
+    /**
+     * Set zookeeper client backoff retry max time in millis.
+     *
+     * @param retryMs
+     *          backoff retry max time in millis.
+     * @return server configuration.
+     */
+    public ServerConfiguration setZkRetryBackoffMaxMs(int retryMs) {
+        setProperty(ZK_RETRY_BACKOFF_MAX_MS, retryMs);
+        return this;
+    }
+
+    /**
      * Is statistics enabled
      *
      * @return is statistics enabled

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/92722ee9/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java
index 5db8aad..39dfac6 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java
@@ -51,7 +51,7 @@ public class BookieInitializationTest extends BookKeeperClusterTestCase {
             .getLogger(BookieInitializationTest.class);
 
     ZooKeeper newzk = null;
-    
+
     public BookieInitializationTest() {
         super(0);
     }
@@ -71,7 +71,7 @@ public class BookieInitializationTest extends BookKeeperClusterTestCase {
         }
 
         void testRegisterBookie(ServerConfiguration conf) throws IOException {
-            super.registerBookie(conf);
+            super.doRegisterBookie();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/92722ee9/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestBookieWatcher.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestBookieWatcher.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestBookieWatcher.java
new file mode 100644
index 0000000..9558ddc
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestBookieWatcher.java
@@ -0,0 +1,140 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.client;
+
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
+import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.ZooKeeper;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class TestBookieWatcher extends BookKeeperClusterTestCase {
+
+    public TestBookieWatcher() {
+        super(2);
+    }
+
+    private void expireZooKeeperSession(ZooKeeper zk, int timeout)
+            throws IOException, InterruptedException, KeeperException {
+        final CountDownLatch latch = new CountDownLatch(1);
+        ZooKeeper newZk = new ZooKeeper(zkUtil.getZooKeeperConnectString(), timeout,
+                new Watcher() {
+
+            @Override
+            public void process(WatchedEvent event) {
+                if (event.getType() == EventType.None &&
+                        event.getState() == KeeperState.SyncConnected) {
+                    latch.countDown();
+                }
+            }
+
+        }, zk.getSessionId(), zk.getSessionPasswd());
+        if (!latch.await(timeout, TimeUnit.MILLISECONDS)) {
+            throw KeeperException.create(KeeperException.Code.CONNECTIONLOSS);
+        }
+        newZk.close();
+    }
+
+    @Test(timeout=10000)
+    public void testBookieWatcherSurviveWhenSessionExpired() throws Exception {
+        final int timeout = 2000;
+        ZooKeeper zk = ZooKeeperClient.newBuilder()
+                .connectString(zkUtil.getZooKeeperConnectString())
+                .sessionTimeoutMs(timeout)
+                .build();
+        try {
+            runBookieWatcherWhenSessionExpired(zk, timeout, true);
+        } finally {
+            zk.close();
+        }
+    }
+
+    @Test(timeout=10000)
+    public void testBookieWatcherDieWhenSessionExpired() throws Exception {
+        final int timeout = 2000;
+        final CountDownLatch connectLatch = new CountDownLatch(1);
+        ZooKeeper zk = new ZooKeeper(zkUtil.getZooKeeperConnectString(), timeout, new Watcher() {
+            @Override
+            public void process(WatchedEvent watchedEvent) {
+                if (EventType.None == watchedEvent.getType() &&
+                        KeeperState.SyncConnected == watchedEvent.getState()) {
+                    connectLatch.countDown();
+                }
+            }
+        });
+        connectLatch.await();
+        try {
+            runBookieWatcherWhenSessionExpired(zk, timeout, false);
+        } finally {
+            zk.close();
+        }
+    }
+
+    private void runBookieWatcherWhenSessionExpired(ZooKeeper zk, int timeout, boolean reconnectable)
+            throws Exception {
+        ClientConfiguration conf = new ClientConfiguration();
+        BookKeeper bkc = new BookKeeper(conf, zk);
+
+        LedgerHandle lh;
+        try {
+            lh = bkc.createLedger(3, 2, 2, BookKeeper.DigestType.CRC32, new byte[] {});
+            fail("Should fail to create ledger due to not enough bookies.");
+        } catch (BKException bke) {
+            // expected
+        }
+
+        // make zookeeper session expired
+        expireZooKeeperSession(bkc.zk, timeout);
+        TimeUnit.MILLISECONDS.sleep(3 * timeout);
+
+        // start two new bookies
+        for (int i=0; i<2; i++) {
+            startNewBookie();
+        }
+
+        // wait for bookie watcher backoff time.
+        TimeUnit.SECONDS.sleep(1);
+
+        // should success to detect newly added bookies
+        try {
+            lh = bkc.createLedger(3, 2, 2, BookKeeper.DigestType.CRC32, new byte[] {});
+            lh.close();
+            if (!reconnectable) {
+                fail("Should fail to create ledger due to bookie watcher could not survive after session expire.");
+            }
+        } catch (BKException bke) {
+            if (reconnectable) {
+                fail("Should not fail to create ledger due to bookie watcher could survive after session expire.");
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/92722ee9/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java
index b1a53e5..692ddce 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java
@@ -272,7 +272,7 @@ public class AuditorLedgerCheckerTest extends MultiLedgerManagerTestCase {
         ServerConfiguration bookieConf = bsConfs.get(2);
         BookieServer bk = bs.get(2);
         bookieConf.setReadOnlyModeEnabled(true);
-        bk.getBookie().transitionToReadOnlyMode();
+        bk.getBookie().doTransitionToReadOnlyMode();
 
         // grace period for publishing the bk-ledger
         LOG.debug("Waiting for Auditor to finish ledger check.");

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/92722ee9/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
index 0490deb..9591ef8 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
@@ -530,7 +530,7 @@ public class TestReplicationWorker extends MultiLedgerManagerTestCase {
         try {
             BookieServer newBk = bs.get(bs.size() - 1);
             bsConfs.get(bsConfs.size() - 1).setReadOnlyModeEnabled(true);
-            newBk.getBookie().transitionToReadOnlyMode();
+            newBk.getBookie().doTransitionToReadOnlyMode();
             underReplicationManager.markLedgerUnderreplicated(lh.getId(), replicaToKill.toString());
             while (ReplicationTestUtil.isLedgerInUnderReplication(zkc, lh.getId(), basePath) && rw.isRunning()) {
                 Thread.sleep(100);

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/92722ee9/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieZKExpireTest.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieZKExpireTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieZKExpireTest.java
index a3bd4d6..573bc15 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieZKExpireTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieZKExpireTest.java
@@ -28,6 +28,8 @@ import org.junit.After;
 import static org.junit.Assert.*;
 
 import org.apache.bookkeeper.conf.ServerConfiguration;
+
+import java.net.InetAddress;
 import java.util.HashSet;
 import org.apache.bookkeeper.proto.BookieServer;
 import org.apache.bookkeeper.bookie.Bookie;
@@ -89,16 +91,9 @@ public class BookieZKExpireTest extends BookKeeperClusterTestCase {
             sendthread.resume();
 
             // allow watcher thread to run
-            secondsToWait = 20;
-            while (server.isBookieRunning()
-                   || server.isRunning()) {
-                Thread.sleep(1000);
-                if (secondsToWait-- <= 0) {
-                    break;
-                }
-            }
-            assertFalse("Bookie should have shutdown on losing zk session", server.isBookieRunning());
-            assertFalse("Bookie Server should have shutdown on losing zk session", server.isRunning());
+            Thread.sleep(3000);
+            assertTrue("Bookie should not shutdown on losing zk session", server.isBookieRunning());
+            assertTrue("Bookie Server should not shutdown on losing zk session", server.isRunning());
         } finally {
             server.shutdown();
         }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/92722ee9/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ReadOnlyBookieTest.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ReadOnlyBookieTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ReadOnlyBookieTest.java
index 771a8a1..124a420 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ReadOnlyBookieTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ReadOnlyBookieTest.java
@@ -248,7 +248,7 @@ public class ReadOnlyBookieTest extends BookKeeperClusterTestCase {
         killBookie(1);
         baseConf.setReadOnlyModeEnabled(true);
         startNewBookie();
-        bs.get(1).getBookie().transitionToReadOnlyMode();
+        bs.get(1).getBookie().doTransitionToReadOnlyMode();
         try {
             bkc.readBookiesBlocking();
             bkc.createLedger(2, 2, DigestType.CRC32, "".getBytes());

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/92722ee9/bookkeeper-server/src/test/java/org/apache/bookkeeper/zookeeper/TestZooKeeperClient.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/zookeeper/TestZooKeeperClient.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/zookeeper/TestZooKeeperClient.java
index 0c23aaf..d829db5 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/zookeeper/TestZooKeeperClient.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/zookeeper/TestZooKeeperClient.java
@@ -106,8 +106,12 @@ public class TestZooKeeperClient extends TestCase {
     class ShutdownZkServerClient extends ZooKeeperClient {
 
         ShutdownZkServerClient(String connectString, int sessionTimeoutMs,
-                ZooKeeperWatcherBase watcher, RetryPolicy operationRetryPolicy)throws IOException {
-            super(connectString, sessionTimeoutMs, watcher, operationRetryPolicy, null, NullStatsLogger.INSTANCE, 1, 0);
+                ZooKeeperWatcherBase watcher, RetryPolicy operationRetryPolicy)
+                throws IOException {
+            super(connectString, sessionTimeoutMs, watcher,
+                    new BoundExponentialBackoffRetryPolicy(sessionTimeoutMs, sessionTimeoutMs, Integer.MAX_VALUE),
+                    operationRetryPolicy,
+                    NullStatsLogger.INSTANCE, 1, 0);
         }
 
         @Override