You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2013/08/21 13:19:22 UTC

svn commit: r1516139 - in /zookeeper/bookkeeper/trunk: ./ bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ bookkeeper-server/src/test/java/org/apache/bookkeeper/replication...

Author: ivank
Date: Wed Aug 21 11:19:21 2013
New Revision: 1516139

URL: http://svn.apache.org/r1516139
Log:
BOOKKEEPER-632: AutoRecovery should consider read only bookies (vinay via ivank)

Added:
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookiesListener.java
Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorElector.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AutoRecoveryMain.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorBookieTest.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AutoRecoveryMainTest.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ZooKeeperUtil.java

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1516139&r1=1516138&r2=1516139&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Wed Aug 21 11:19:21 2013
@@ -92,6 +92,8 @@ Trunk (unreleased changes)
 
         BOOKKEEPER-624: Reduce logs generated by ReplicationWorker (vinay via ivank)
 
+        BOOKKEEPER-632: AutoRecovery should consider read only bookies (vinay via ivank)
+
       hedwig-server:
 
         BOOKKEEPER-601: readahead cache size isn't updated correctly (sijie via fpj)

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java?rev=1516139&r1=1516138&r2=1516139&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java Wed Aug 21 11:19:21 2013
@@ -162,15 +162,35 @@ public class BookKeeperAdmin {
     /**
      * Get a list of the available bookies.
      *
-     * @return the collection of available bookies
+     * @return a collection of bookie addresses
      */
     public Collection<InetSocketAddress> getAvailableBookies()
-            throws InterruptedException, KeeperException {
-        bkc.bookieWatcher.readBookiesBlocking();
+            throws BKException {
         return bkc.bookieWatcher.getBookies();
     }
 
     /**
+     * Get a list of readonly bookies
+     *
+     * @return a collection of bookie addresses
+     */
+    public Collection<InetSocketAddress> getReadOnlyBookies() {
+        return bkc.bookieWatcher.getReadOnlyBookies();
+    }
+
+    /**
+     * Notify when the available list of bookies changes.
+     * This is a one-shot notification. To receive subsequent notifications
+     * the listener must be registered again.
+     *
+     * @param listener the listener to notify
+     */
+    public void notifyBookiesChanged(final BookiesListener listener)
+            throws BKException {
+        bkc.bookieWatcher.notifyBookiesChanged(listener);
+    }
+
+    /**
      * Open a ledger as an administrator. This means that no digest password
      * checks are done. Otherwise, the call is identical to BookKeeper#asyncOpenLedger
      *

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java?rev=1516139&r1=1516138&r2=1516139&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java Wed Aug 21 11:19:21 2013
@@ -44,6 +44,7 @@ import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.AsyncCallback.ChildrenCallback;
 import org.apache.zookeeper.KeeperException.Code;
 import org.apache.zookeeper.KeeperException.NodeExistsException;
+import org.apache.zookeeper.Watcher.Event.EventType;
 import org.apache.zookeeper.ZooDefs.Ids;
 
 /**
@@ -86,16 +87,46 @@ class BookieWatcher implements Watcher, 
         readOnlyBookieWatcher = new ReadOnlyBookieWatcher(conf, bk);
     }
 
-    public Collection<InetSocketAddress> getBookies() {
+    void notifyBookiesChanged(final BookiesListener listener) throws BKException {
+        try {
+            bk.getZkHandle().getChildren(this.bookieRegistrationPath,
+                    new Watcher() {
+                        public void process(WatchedEvent event) {
+                            // listen children changed event from ZooKeeper
+                            if (event.getType() == EventType.NodeChildrenChanged) {
+                                listener.availableBookiesChanged();
+                            }
+                        }
+                    });
+        } catch (KeeperException ke) {
+            logger.error("Error registering watcher with zookeeper", ke);
+            throw new BKException.ZKException();
+        } catch (InterruptedException ie) {
+            Thread.currentThread().interrupt();
+            logger.error("Interrupted registering watcher with zookeeper", ie);
+            throw new BKException.BKInterruptedException();
+        }
+    }
+
+    public Collection<InetSocketAddress> getBookies() throws BKException {
         try {
             List<String> children = bk.getZkHandle().getChildren(this.bookieRegistrationPath, false);
+            children.remove(BookKeeperConstants.READONLY);
             return convertToBookieAddresses(children);
-        } catch (Exception e) {
-            logger.error("Failed to get bookies : ", e);
-            return new HashSet<InetSocketAddress>();
+        } catch (KeeperException ke) {
+            logger.error("Failed to get bookie list : ", ke);
+            throw new BKException.ZKException();
+        } catch (InterruptedException ie) {
+            Thread.currentThread().interrupt();
+            logger.error("Interrupted reading bookie list", ie);
+            throw new BKException.BKInterruptedException();
         }
     }
 
+    Collection<InetSocketAddress> getReadOnlyBookies() {
+        return new HashSet<InetSocketAddress>(readOnlyBookieWatcher.getReadOnlyBookies());
+    }
+
     public void readBookies() {
         readBookies(this);
     }

Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookiesListener.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookiesListener.java?rev=1516139&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookiesListener.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookiesListener.java Wed Aug 21 11:19:21 2013
@@ -0,0 +1,27 @@
+package org.apache.bookkeeper.client;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+/**
+ * Listener for the the available bookies changes.
+ */
+public interface BookiesListener {
+    void availableBookiesChanged();
+}

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java?rev=1516139&r1=1516138&r2=1516139&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java Wed Aug 21 11:19:21 2013
@@ -44,6 +44,7 @@ import org.apache.bookkeeper.client.Ledg
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.LedgerChecker;
 import org.apache.bookkeeper.client.LedgerFragment;
+import org.apache.bookkeeper.client.BookiesListener;
 import org.apache.bookkeeper.util.StringUtils;
 
 import org.apache.bookkeeper.util.ZkUtils;
@@ -66,7 +67,6 @@ import org.apache.zookeeper.WatchedEvent
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.AsyncCallback;
-import org.apache.zookeeper.Watcher.Event.EventType;
 import org.apache.zookeeper.Watcher.Event.KeeperState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -78,10 +78,11 @@ import org.slf4j.LoggerFactory;
  * re-replication activities by keeping all the corresponding ledgers of the
  * failed bookie as underreplicated znode in zk.
  */
-public class Auditor implements Watcher {
+public class Auditor implements BookiesListener {
     private static final Logger LOG = LoggerFactory.getLogger(Auditor.class);
     private final ServerConfiguration conf;
-    private final ZooKeeper zkc;
+    private BookKeeper bkc;
+    private BookKeeperAdmin admin;
     private BookieLedgerIndexer bookieLedgerIndexer;
     private LedgerManager ledgerManager;
     private LedgerUnderreplicationManager ledgerUnderreplicationManager;
@@ -91,7 +92,6 @@ public class Auditor implements Watcher 
     public Auditor(final String bookieIdentifier, ServerConfiguration conf,
                    ZooKeeper zkc) throws UnavailableException {
         this.conf = conf;
-        this.zkc = zkc;
         initialize(conf, zkc);
 
         executor = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
@@ -115,6 +115,9 @@ public class Auditor implements Watcher 
             this.ledgerUnderreplicationManager = ledgerManagerFactory
                     .newLedgerUnderreplicationManager();
 
+            this.bkc = new BookKeeper(new ClientConfiguration(conf), zkc);
+            this.admin = new BookKeeperAdmin(bkc);
+
         } catch (CompatibilityException ce) {
             throw new UnavailableException(
                     "CompatibilityException while initializing Auditor", ce);
@@ -174,6 +177,8 @@ public class Auditor implements Watcher 
                                 Map<String, Set<Long>> ledgerDetails = generateBookie2LedgersIndex();
                                 handleLostBookies(lostBookies, ledgerDetails);
                             }
+                        } catch (BKException bke) {
+                            LOG.error("Exception getting bookie list", bke);
                         } catch (KeeperException ke) {
                             LOG.error("Exception while watching available bookies", ke);
                         } catch (InterruptedException ie) {
@@ -238,6 +243,9 @@ public class Auditor implements Watcher 
                         try {
                             knownBookies = getAvailableBookies();
                             auditingBookies(knownBookies);
+                        } catch (BKException bke) {
+                            LOG.error("Exception getting bookie list", bke);
+                            submitShutdownTask();
                         } catch (KeeperException ke) {
                             LOG.error("Exception while watching available bookies", ke);
                             submitShutdownTask();
@@ -262,10 +270,21 @@ public class Auditor implements Watcher 
             cb.await();
         }
     }
-    
-    private List<String> getAvailableBookies() throws KeeperException,
-            InterruptedException {
-        return zkc.getChildren(conf.getZkAvailableBookiesPath(), this);
+
+    private List<String> getAvailableBookies() throws BKException {
+        // Get the available bookies, also watch for further changes
+        // Watching on only available bookies is sufficient, as changes in readonly bookies also changes in available
+        // bookies
+        admin.notifyBookiesChanged(this);
+        Collection<InetSocketAddress> availableBkAddresses = admin.getAvailableBookies();
+        Collection<InetSocketAddress> readOnlyBkAddresses = admin.getReadOnlyBookies();
+        availableBkAddresses.addAll(readOnlyBkAddresses);
+
+        List<String> availableBookies = new ArrayList<String>();
+        for (InetSocketAddress addr : availableBkAddresses) {
+            availableBookies.add(StringUtils.addrToString(addr));
+        }
+        return availableBookies;
     }
 
     @SuppressWarnings("unchecked")
@@ -471,14 +490,8 @@ public class Auditor implements Watcher 
     }
 
     @Override
-    public void process(WatchedEvent event) {
-        // listen children changed event from ZooKeeper
-        if (event.getState() == KeeperState.Disconnected
-                || event.getState() == KeeperState.Expired) {
-            submitShutdownTask();
-        } else if (event.getType() == EventType.NodeChildrenChanged) {
-            submitAuditTask();
-        }
+    public void availableBookiesChanged() {
+        submitAuditTask();
     }
 
     /**
@@ -493,9 +506,13 @@ public class Auditor implements Watcher 
                 LOG.warn("Executor not shutting down, interrupting");
                 executor.shutdownNow();
             }
+            admin.close();
+            bkc.close();
         } catch (InterruptedException ie) {
             Thread.currentThread().interrupt();
             LOG.warn("Interrupted while shutting down auditor bookie", ie);
+        } catch (BKException bke) {
+            LOG.warn("Exception while shutting down auditor bookie", bke);
         }
     }
 

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorElector.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorElector.java?rev=1516139&r1=1516138&r2=1516139&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorElector.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorElector.java Wed Aug 21 11:19:21 2013
@@ -163,8 +163,7 @@ public class AuditorElector {
     private class ElectionWatcher implements Watcher {
         @Override
         public void process(WatchedEvent event) {
-            if (event.getState() == KeeperState.Disconnected
-                || event.getState() == KeeperState.Expired) {
+            if (event.getState() == KeeperState.Expired) {
                 LOG.error("Lost ZK connection, shutting down");
                 submitShutdownTask();
             } else if (event.getType() == EventType.NodeDeleted) {

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AutoRecoveryMain.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AutoRecoveryMain.java?rev=1516139&r1=1516138&r2=1516139&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AutoRecoveryMain.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AutoRecoveryMain.java Wed Aug 21 11:19:21 2013
@@ -53,7 +53,7 @@ public class AutoRecoveryMain {
             .getLogger(AutoRecoveryMain.class);
 
     private ServerConfiguration conf;
-    private ZooKeeper zk;
+    ZooKeeper zk;
     AuditorElector auditorElector;
     ReplicationWorker replicationWorker;
     private AutoRecoveryDeathWatcher deathWatcher;

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java?rev=1516139&r1=1516138&r2=1516139&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java Wed Aug 21 11:19:21 2013
@@ -189,6 +189,9 @@ public class ReplicationWorker implement
             } catch (BKException.BKLedgerRecoveryException e) {
                 LOG.warn("BKLedgerRecoveryException "
                         + "while replicating the fragment", e);
+                if (admin.getReadOnlyBookies().contains(targetBookie)) {
+                    throw new BKException.BKWriteOnReadOnlyBookieException();
+                }
             }
         }
 

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorBookieTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorBookieTest.java?rev=1516139&r1=1516138&r2=1516139&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorBookieTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorBookieTest.java Wed Aug 21 11:19:21 2013
@@ -207,25 +207,6 @@ public class AuditorBookieTest extends B
                 .getPort());
     }
 
-    /**
-     * Test that, if an auditor looses its ZK connection/session
-     * it will shutdown.
-     */
-    @Test(timeout=60000)
-    public void testAuditorZKSessionLoss() throws Exception {
-        stopZKCluster();
-        for (AuditorElector e : auditorElectors.values()) {
-            for (int i = 0; i < 10; i++) { // give it 10 seconds to shutdown
-                if (!e.isRunning()) {
-                    break;
-                }
-
-                Thread.sleep(1000);
-            }
-            assertFalse("AuditorElector should have shutdown", e.isRunning());
-        }
-    }
-
     private void startAuditorElector(String addr) throws Exception {
         ZooKeeperWatcherBase w = new ZooKeeperWatcherBase(10000);
         ZooKeeper zk = ZkUtils.createConnectedZookeeperClient(

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java?rev=1516139&r1=1516138&r2=1516139&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java Wed Aug 21 11:19:21 2013
@@ -299,6 +299,29 @@ public class AuditorLedgerCheckerTest ex
         }
     }
 
+    /**
+     * Test Auditor should consider Readonly bookie as available bookie. Should not publish ur ledgers for
+     * readonly bookies.
+     */
+    @Test(timeout = 20000)
+    public void testReadOnlyBookieExclusionFromURLedgersCheck() throws Exception {
+        LedgerHandle lh = createAndAddEntriesToLedger();
+        ledgerList.add(lh.getId());
+        LOG.debug("Created following ledgers : " + ledgerList);
+
+        int count = ledgerList.size();
+        final CountDownLatch underReplicaLatch = registerUrLedgerWatcher(count);
+
+        ServerConfiguration bookieConf = bsConfs.get(2);
+        BookieServer bk = bs.get(2);
+        bookieConf.setReadOnlyModeEnabled(true);
+        bk.getBookie().transitionToReadOnlyMode();
+
+        // grace period for publishing the bk-ledger
+        LOG.debug("Waiting for Auditor to finish ledger check.");
+        assertFalse("latch should not have completed", underReplicaLatch.await(5, TimeUnit.SECONDS));
+    }
+
     private CountDownLatch registerUrLedgerWatcher(int count)
             throws KeeperException, InterruptedException {
         final CountDownLatch underReplicaLatch = new CountDownLatch(count);

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AutoRecoveryMainTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AutoRecoveryMainTest.java?rev=1516139&r1=1516138&r2=1516139&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AutoRecoveryMainTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AutoRecoveryMainTest.java Wed Aug 21 11:19:21 2013
@@ -20,8 +20,11 @@
  */
 package org.apache.bookkeeper.replication;
 
+import java.util.concurrent.CountDownLatch;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
 
+import org.junit.Test;
+
 /*
  * Test the AuditorPeer
  */
@@ -34,6 +37,7 @@ public class AutoRecoveryMainTest extend
     /*
      * test the startup of the auditorElector and RW.
      */
+    @Test(timeout=60000)
     public void testStartup() throws Exception {
         AutoRecoveryMain main = new AutoRecoveryMain(bsConfs.get(0));
         try {
@@ -51,6 +55,7 @@ public class AutoRecoveryMainTest extend
     /*
      * Test the shutdown of all daemons
      */
+    @Test(timeout=60000)
     public void testShutdown() throws Exception {
         AutoRecoveryMain main = new AutoRecoveryMain(bsConfs.get(0));
         main.start();
@@ -66,4 +71,38 @@ public class AutoRecoveryMainTest extend
         assertFalse("Replication worker should not be running",
                 main.replicationWorker.isRunning());
     }
+
+    /**
+     * Test that, if an autorecovery looses its ZK connection/session
+     * it will shutdown.
+     */
+    @Test(timeout=60000)
+    public void testAutoRecoverySessionLoss() throws Exception {
+        AutoRecoveryMain main1 = new AutoRecoveryMain(bsConfs.get(0));
+        AutoRecoveryMain main2 = new AutoRecoveryMain(bsConfs.get(1));
+        main1.start();
+        main2.start();
+        Thread.sleep(500);
+        assertTrue("AuditorElectors should be running",
+                main1.auditorElector.isRunning() && main2.auditorElector.isRunning());
+        assertTrue("Replication workers should be running",
+                main1.replicationWorker.isRunning() && main2.replicationWorker.isRunning());
+
+        zkUtil.expireSession(main1.zk);
+        zkUtil.expireSession(main2.zk);
+
+        for (int i = 0; i < 10; i++) { // give it 10 seconds to shutdown
+            if (!main1.auditorElector.isRunning()
+                && !main2.auditorElector.isRunning()
+                && !main1.replicationWorker.isRunning()
+                && !main2.replicationWorker.isRunning()) {
+                break;
+            }
+            Thread.sleep(1000);
+        }
+        assertFalse("Elector1 should have shutdown", main1.auditorElector.isRunning());
+        assertFalse("Elector2 should have shutdown", main2.auditorElector.isRunning());
+        assertFalse("RW1 should have shutdown", main1.replicationWorker.isRunning());
+        assertFalse("RW2 should have shutdown", main2.replicationWorker.isRunning());
+    }
 }

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java?rev=1516139&r1=1516138&r2=1516139&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java Wed Aug 21 11:19:21 2013
@@ -35,6 +35,7 @@ import org.apache.bookkeeper.client.Ledg
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.meta.LedgerManagerFactory;
 import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
+import org.apache.bookkeeper.proto.BookieServer;
 import org.apache.bookkeeper.test.MultiLedgerManagerTestCase;
 import org.apache.bookkeeper.util.BookKeeperConstants;
 import org.apache.bookkeeper.util.ZkUtils;
@@ -496,6 +497,46 @@ public class TestReplicationWorker exten
     }
 
     /**
+     * Test that if the local bookie turns out to be readonly, then no point in running RW. So RW should shutdown.
+     */
+    @Test(timeout = 20000)
+    public void testRWShutdownOnLocalBookieReadonlyTransition() throws Exception {
+        LedgerHandle lh = bkc.createLedger(3, 3, BookKeeper.DigestType.CRC32, TESTPASSWD);
+
+        for (int i = 0; i < 10; i++) {
+            lh.addEntry(data);
+        }
+        InetSocketAddress replicaToKill = LedgerHandleAdapter.getLedgerMetadata(lh).getEnsembles().get(0L).get(0);
+
+        LOG.info("Killing Bookie", replicaToKill);
+        killBookie(replicaToKill);
+
+        int newBkPort = startNewBookie();
+        for (int i = 0; i < 10; i++) {
+            lh.addEntry(data);
+        }
+
+        InetSocketAddress newBkAddr = new InetSocketAddress(InetAddress.getLocalHost().getHostAddress(), newBkPort);
+        LOG.info("New Bookie addr :" + newBkAddr);
+
+        ReplicationWorker rw = new ReplicationWorker(zkc, baseConf, newBkAddr);
+
+        rw.start();
+        try {
+            BookieServer newBk = bs.get(bs.size() - 1);
+            bsConfs.get(bsConfs.size() - 1).setReadOnlyModeEnabled(true);
+            newBk.getBookie().transitionToReadOnlyMode();
+            underReplicationManager.markLedgerUnderreplicated(lh.getId(), replicaToKill.toString());
+            while (ReplicationTestUtil.isLedgerInUnderReplication(zkc, lh.getId(), basePath) && rw.isRunning()) {
+                Thread.sleep(100);
+            }
+            assertFalse("RW should shutdown if the bookie is readonly", rw.isRunning());
+        } finally {
+            rw.shutdown();
+        }
+    }
+
+    /**
      * Test that the replication worker will shutdown if it lose its zookeeper session
      */
     @Test(timeout=30000)

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ZooKeeperUtil.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ZooKeeperUtil.java?rev=1516139&r1=1516138&r2=1516139&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ZooKeeperUtil.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ZooKeeperUtil.java Wed Aug 21 11:19:21 2013
@@ -127,6 +127,16 @@ public class ZooKeeperUtil {
         throw new IOException("ZooKeeper thread not found");
     }
 
+    public void expireSession(ZooKeeper zk) throws Exception {
+        long id = zk.getSessionId();
+        byte[] password = zk.getSessionPasswd();
+        ZooKeeperWatcherBase w = new ZooKeeperWatcherBase(10000);
+        ZooKeeper zk2 = new ZooKeeper(getZooKeeperConnectString(),
+                zk.getSessionTimeout(), w, id, password);
+        w.waitForConnection();
+        zk2.close();
+    }
+
     public void killServer() throws Exception {
         if (zkc != null) {
             zkc.close();