You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2014/01/21 17:26:23 UTC

svn commit: r1560066 - in /zookeeper/bookkeeper/trunk: ./ bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/ bookkeeper-server/src/test/java/org/apache/bookkeeper/test/

Author: ivank
Date: Tue Jan 21 16:26:23 2014
New Revision: 1560066

URL: http://svn.apache.org/r1560066
Log:
BOOKKEEPER-661: Turn readonly back to writable if spaces are reclaimed. (sijie via ivank)

Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDirsManager.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ReadOnlyBookieTest.java

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1560066&r1=1560065&r2=1560066&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Tue Jan 21 16:26:23 2014
@@ -144,6 +144,8 @@ Trunk (unreleased changes)
 
         BOOKKEEPER-696: stats collection on bookkeeper client (Aniruddha, ivank via sijie)
 
+        BOOKKEEPER-661: Turn readonly back to writable if spaces are reclaimed. (sijie via ivank)
+
       hedwig-server:
 
         BOOKKEEPER-601: readahead cache size isn't updated correctly (sijie via fpj)

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java?rev=1560066&r1=1560065&r2=1560066&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java Tue Jan 21 16:26:23 2014
@@ -65,6 +65,7 @@ import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.Watcher.Event.EventType;
 import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -96,7 +97,7 @@ public class Bookie extends BookieCritic
     // ZK registration path for this bookie
     private final String bookieRegistrationPath;
 
-    private LedgerDirsManager ledgerDirsManager;
+    private final LedgerDirsManager ledgerDirsManager;
     private LedgerDirsManager indexDirsManager;
 
     // ZooKeeper client instance for the Bookie
@@ -116,6 +117,7 @@ public class Bookie extends BookieCritic
     final ConcurrentMap<Long, byte[]> masterKeyCache = new ConcurrentHashMap<Long, byte[]>();
 
     final private String zkBookieRegPath;
+    final private String zkBookieReadOnlyPath;
 
     final private AtomicBoolean readOnly = new AtomicBoolean(false);
 
@@ -244,6 +246,7 @@ public class Bookie extends BookieCritic
 
             final AtomicBoolean oldDataExists = new AtomicBoolean(false);
             parent.list(new FilenameFilter() {
+                    @Override
                     public boolean accept(File dir, String name) {
                         if (name.endsWith(".txn") || name.endsWith(".idx") || name.endsWith(".log")) {
                             oldDataExists.set(true);
@@ -443,7 +446,9 @@ public class Bookie extends BookieCritic
         handles = new HandleFactoryImpl(ledgerStorage);
 
         // ZK ephemeral node for this Bookie.
-        zkBookieRegPath = this.bookieRegistrationPath + getMyId();
+        String myID = getMyId();
+        zkBookieRegPath = this.bookieRegistrationPath + myID;
+        zkBookieReadOnlyPath = this.bookieRegistrationPath + BookKeeperConstants.READONLY + "/" + myID;
     }
 
     private String getMyId() throws UnknownHostException {
@@ -502,6 +507,7 @@ public class Bookie extends BookieCritic
         });
     }
 
+    @Override
     synchronized public void start() {
         setDaemon(true);
         LOG.debug("I'm starting a bookie with journal directory {}", journalDirectory.getName());
@@ -582,6 +588,18 @@ public class Bookie extends BookieCritic
                 LOG.error("Fatal error reported by ledgerDirsManager");
                 triggerBookieShutdown(ExitCode.BOOKIE_EXCEPTION);
             }
+
+            @Override
+            public void diskWritable(File disk) {
+                // Transition to writable mode when a disk becomes writable again.
+                transitionToWritableMode();
+            }
+
+            @Override
+            public void diskJustWritable(File disk) {
+                // Transition to writable mode when a disk becomes writable again.
+                transitionToWritableMode();
+            }
         };
     }
 
@@ -645,6 +663,56 @@ public class Bookie extends BookieCritic
     }
 
     /**
+     * Check whether <i>regPath</i> exists; if another session owns it, wait for it to expire.
+     *
+     * @param regPath
+     *          reg node path.
+     * @return true if regPath exists and is owned by the current session, otherwise false
+     * @throws IOException on a ZooKeeper error, on interruption, or if the node does not expire in time
+     */
+    protected boolean checkRegNodeAndWaitExpired(String regPath) throws IOException {
+        final CountDownLatch prevNodeLatch = new CountDownLatch(1);
+        Watcher zkPrevRegNodewatcher = new Watcher() {
+            @Override
+            public void process(WatchedEvent event) {
+                // Check for prev znode deletion. Connection expiration is
+                // not handled here, since the bookie has logic to shut down.
+                if (EventType.NodeDeleted == event.getType()) {
+                    prevNodeLatch.countDown();
+                }
+            }
+        };
+        try {
+            Stat stat = zk.exists(regPath, zkPrevRegNodewatcher);
+            if (null != stat) {
+                // if the ephemeral owner isn't current zookeeper client
+                // wait for it to be expired.
+                if (stat.getEphemeralOwner() != zk.getSessionId()) {
+                    LOG.info("Previous bookie registration znode: {} exists, so waiting zk sessiontimeout:"
+                            + " {} ms for znode deletion", regPath, conf.getZkTimeout());
+                    // waiting for the previous bookie reg znode deletion
+                    if (!prevNodeLatch.await(conf.getZkTimeout(), TimeUnit.MILLISECONDS)) {
+                        throw new NodeExistsException(regPath);
+                    } else {
+                        return false;
+                    }
+                }
+                return true;
+            } else {
+                return false;
+            }
+        } catch (KeeperException ke) {
+            LOG.error("ZK exception checking and wait ephemeral znode {} expired : ", regPath, ke);
+            throw new IOException("ZK exception checking and wait ephemeral znode "
+                    + regPath + " expired", ke);
+        } catch (InterruptedException ie) {
+            LOG.error("Interrupted checking and wait ephemeral znode {} expired : ", regPath, ie);
+            throw new IOException("Interrupted checking and wait ephemeral znode "
+                    + regPath + " expired", ie);
+        }
+    }
+
+    /**
      * Register as an available bookie
      */
     protected void registerBookie(ServerConfiguration conf) throws IOException {
@@ -654,39 +722,14 @@ public class Bookie extends BookieCritic
         }
 
         // ZK ephemeral node for this Bookie.
-        String zkBookieRegPath = this.bookieRegistrationPath
-            + StringUtils.addrToString(getBookieAddress(conf));
-        final CountDownLatch prevNodeLatch = new CountDownLatch(1);
         try{
-            Watcher zkPrevRegNodewatcher = new Watcher() {
-                @Override
-                public void process(WatchedEvent event) {
-                    // Check for prev znode deletion. Connection expiration is
-                    // not handling, since bookie has logic to shutdown.
-                    if (EventType.NodeDeleted == event.getType()) {
-                        prevNodeLatch.countDown();
-                    }
-                }
-            };
-            if (null != zk.exists(zkBookieRegPath, zkPrevRegNodewatcher)) {
-                LOG.info("Previous bookie registration znode: "
-                        + zkBookieRegPath
-                        + " exists, so waiting zk sessiontimeout: "
-                        + conf.getZkTimeout() + "ms for znode deletion");
-                // waiting for the previous bookie reg znode deletion
-                if (!prevNodeLatch.await(conf.getZkTimeout(),
-                        TimeUnit.MILLISECONDS)) {
-                    throw new KeeperException.NodeExistsException(
-                            zkBookieRegPath);
-                }
+            if (!checkRegNodeAndWaitExpired(zkBookieRegPath)) {
+                // Create the ZK ephemeral node for this Bookie.
+                zk.create(zkBookieRegPath, new byte[0], Ids.OPEN_ACL_UNSAFE,
+                        CreateMode.EPHEMERAL);
             }
-
-            // Create the ZK ephemeral node for this Bookie.
-            zk.create(zkBookieRegPath, new byte[0], Ids.OPEN_ACL_UNSAFE,
-                    CreateMode.EPHEMERAL);
         } catch (KeeperException ke) {
-            LOG.error("ZK exception registering ephemeral Znode for Bookie!",
-                    ke);
+            LOG.error("ZK exception registering ephemeral Znode for Bookie!", ke);
             // Throw an IOException back up. This will cause the Bookie
             // constructor to error out. Alternatively, we could do a System
             // exit here as this is a fatal error.
@@ -701,12 +744,43 @@ public class Bookie extends BookieCritic
         }
     }
 
-    /*
+    /**
+     * Transition the bookie from readOnly mode to writable
+     */
+    @VisibleForTesting
+    public void transitionToWritableMode() {
+        if (!readOnly.compareAndSet(true, false)) {
+            return;
+        }
+        LOG.info("Transitioning Bookie to Writable mode and will serve read/write requests.");
+        try {
+            this.registerBookie(conf);
+        } catch (IOException e) {
+            LOG.warn("Error in transitioning back to writable mode : ", e);
+            transitionToReadOnlyMode();
+            return;
+        }
+        // clear the readonly state
+        try {
+            zk.delete(zkBookieReadOnlyPath, -1);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            LOG.warn("Interrupted clearing readonly state while transitioning to writable mode : ", e);
+            return;
+        } catch (KeeperException e) {
+            // if we failed when deleting the readonly flag in zookeeper, it is OK since client would
+            // already see the bookie in writable list. so just log the exception
+            LOG.warn("Failed to delete bookie readonly state in zookeeper : ", e);
+            return;
+        }
+    }
+
+    /**
      * Transition the bookie to readOnly mode
      */
     @VisibleForTesting
     public void transitionToReadOnlyMode() {
-        if (shuttingdown == true) {
+        if (shuttingdown) {
             return;
         }
 
@@ -734,12 +808,18 @@ public class Bookie extends BookieCritic
                     // this node is just now created by someone.
                 }
             }
-            // Create the readonly node
-            zk.create(this.bookieRegistrationPath
-                    + BookKeeperConstants.READONLY + "/" + getMyId(),
-                    new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
-            // Clear the current registered node
-            zk.delete(zkBookieRegPath, -1);
+            if (!checkRegNodeAndWaitExpired(zkBookieReadOnlyPath)) {
+                // Create the readonly node
+                zk.create(zkBookieReadOnlyPath,
+                        new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+            }
+            try {
+                // Clear the current registered node
+                zk.delete(zkBookieRegPath, -1);
+            } catch (KeeperException.NoNodeException nne) {
+                LOG.warn("No writable bookie registered node {} when transitioning to readonly",
+                        zkBookieRegPath, nne);
+            }
         } catch (IOException e) {
             LOG.error("Error in transition to ReadOnly Mode."
                     + " Shutting down", e);
@@ -837,6 +917,7 @@ public class Bookie extends BookieCritic
         LOG.info("Triggering shutdown of Bookie-{} with exitCode {}",
                  conf.getBookiePort(), exitCode);
         BookieThread th = new BookieThread("BookieShutdownTrigger") {
+            @Override
             public void run() {
                 Bookie.this.shutdown(exitCode);
             }
@@ -897,14 +978,14 @@ public class Bookie extends BookieCritic
         return this.exitCode;
     }
 
-    /** 
+    /**
      * Retrieve the ledger descriptor for the ledger which entry should be added to.
-     * The LedgerDescriptor returned from this method should be eventually freed with 
+     * The LedgerDescriptor returned from this method should be eventually freed with
      * #putHandle().
      *
      * @throws BookieException if masterKey does not match the master key of the ledger
      */
-    private LedgerDescriptor getLedgerForEntry(ByteBuffer entry, byte[] masterKey) 
+    private LedgerDescriptor getLedgerForEntry(ByteBuffer entry, byte[] masterKey)
             throws IOException, BookieException {
         long ledgerId = entry.getLong();
         LedgerDescriptor l = handles.getHandle(ledgerId, masterKey);
@@ -932,7 +1013,7 @@ public class Bookie extends BookieCritic
     }
 
     /**
-     * Add an entry to a ledger as specified by handle. 
+     * Add an entry to a ledger as specified by handle.
      */
     private void addEntryInternal(LedgerDescriptor handle, ByteBuffer entry, WriteCallback cb, Object ctx)
             throws IOException, BookieException {
@@ -947,11 +1028,11 @@ public class Bookie extends BookieCritic
 
     /**
      * Add entry to a ledger, even if the ledger has previous been fenced. This should only
-     * happen in bookie recovery or ledger recovery cases, where entries are being replicates 
+     * happen in bookie recovery or ledger recovery cases, where entries are being replicates
      * so that they exist on a quorum of bookies. The corresponding client side call for this
      * is not exposed to users.
      */
-    public void recoveryAddEntry(ByteBuffer entry, WriteCallback cb, Object ctx, byte[] masterKey) 
+    public void recoveryAddEntry(ByteBuffer entry, WriteCallback cb, Object ctx, byte[] masterKey)
             throws IOException, BookieException {
         try {
             LedgerDescriptor handle = getLedgerForEntry(entry, masterKey);
@@ -963,8 +1044,8 @@ public class Bookie extends BookieCritic
             throw new IOException(e);
         }
     }
-    
-    /** 
+
+    /**
      * Add entry to a ledger.
      * @throws BookieException.LedgerFencedException if the ledger is fenced
      */
@@ -1026,6 +1107,7 @@ public class Bookie extends BookieCritic
     static class CounterCallback implements WriteCallback {
         int count;
 
+        @Override
         synchronized public void writeComplete(int rc, long l, long e, InetSocketAddress addr, Object ctx) {
             count--;
             if (count == 0) {
@@ -1046,7 +1128,7 @@ public class Bookie extends BookieCritic
 
     /**
      * Format the bookie server data
-     * 
+     *
      * @param conf
      *            ServerConfiguration
      * @param isInteractive
@@ -1135,7 +1217,7 @@ public class Bookie extends BookieCritic
      * @throws IOException
      * @throws InterruptedException
      */
-    public static void main(String[] args) 
+    public static void main(String[] args)
             throws IOException, InterruptedException, BookieException, KeeperException {
         Bookie b = new Bookie(new ServerConfiguration());
         b.start();

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java?rev=1560066&r1=1560065&r2=1560066&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java Tue Jan 21 16:26:23 2014
@@ -42,8 +42,8 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
 import org.apache.bookkeeper.conf.ServerConfiguration;
@@ -62,8 +62,8 @@ public class EntryLogger {
     private static final Logger LOG = LoggerFactory.getLogger(EntryLogger.class);
 
     volatile File currentDir;
-    private LedgerDirsManager ledgerDirsManager;
-    private AtomicBoolean shouldCreateNewEntryLog = new AtomicBoolean(false);
+    private final LedgerDirsManager ledgerDirsManager;
+    private final AtomicBoolean shouldCreateNewEntryLog = new AtomicBoolean(false);
 
     private long logId;
     private volatile long leastUnflushedLogId;
@@ -233,6 +233,16 @@ public class EntryLogger {
             public void fatalError() {
                 // Nothing to handle here. Will be handled in Bookie
             }
+
+            @Override
+            public void diskWritable(File disk) {
+                // Nothing to handle here. Will be handled in Bookie
+            }
+
+            @Override
+            public void diskJustWritable(File disk) {
+                // Nothing to handle here. Will be handled in Bookie
+            }
         };
     }
 

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java?rev=1560066&r1=1560065&r2=1560066&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java Tue Jan 21 16:26:23 2014
@@ -276,13 +276,20 @@ public class GarbageCollectorThread exte
         lastMinorCompactionTime = lastMajorCompactionTime = MathUtils.now();
     }
 
-    synchronized void forceGC() {
+    public synchronized void enableForceGC() {
         if (forceGarbageCollection.compareAndSet(false, true)) {
             LOG.info("Forced garbage collection triggered by thread: {}", Thread.currentThread().getName());
             notify();
         }
     }
 
+    public void disableForceGC() {
+        if (forceGarbageCollection.compareAndSet(true, false)) {
+            LOG.info("{} disabled force garbage collection since bookie has enough space now.", Thread
+                    .currentThread().getName());
+        }
+    }
+
     @Override
     public void run() {
         while (running) {

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java?rev=1560066&r1=1560065&r2=1560066&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java Tue Jan 21 16:26:23 2014
@@ -23,6 +23,7 @@ package org.apache.bookkeeper.bookie;
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.LinkedList;
@@ -367,6 +368,16 @@ public class IndexPersistenceMgr {
             public void fatalError() {
                 // Nothing to handle here. Will be handled in Bookie
             }
+
+            @Override
+            public void diskWritable(File disk) {
+                // Nothing to handle here. Will be handled in Bookie
+            }
+
+            @Override
+            public void diskJustWritable(File disk) {
+                // Nothing to handle here. Will be handled in Bookie
+            }
         };
     }
 

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java?rev=1560066&r1=1560065&r2=1560066&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java Tue Jan 21 16:26:23 2014
@@ -111,23 +111,35 @@ class InterleavedLedgerStorage implement
 
             @Override
             public void diskAlmostFull(File disk) {
-                gcThread.forceGC();
+                gcThread.enableForceGC();
             }
 
             @Override
             public void diskFull(File disk) {
-                gcThread.forceGC();
+                gcThread.enableForceGC();
             }
 
             @Override
             public void allDisksFull() {
-                gcThread.forceGC();
+                gcThread.enableForceGC();
             }
 
             @Override
             public void fatalError() {
                 // do nothing.
             }
+
+            @Override
+            public void diskWritable(File disk) {
+                // we have enough space now, disable force gc.
+                gcThread.disableForceGC();
+            }
+
+            @Override
+            public void diskJustWritable(File disk) {
+                // if a disk is just writable, we still need force gc.
+                gcThread.enableForceGC();
+            }
         };
     }
 

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDirsManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDirsManager.java?rev=1560066&r1=1560065&r2=1560066&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDirsManager.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDirsManager.java Tue Jan 21 16:26:23 2014
@@ -85,6 +85,13 @@ public class LedgerDirsManager {
     }
 
     /**
+     * @return full-filled ledger dirs.
+     */
+    public List<File> getFullFilledLedgerDirs() {
+        return filledDirs;
+    }
+
+    /**
      * Get dirs, which are full more than threshold
      */
     public boolean isDirFull(File dir) {
@@ -115,6 +122,34 @@ public class LedgerDirsManager {
     }
 
     /**
+     * Add the dir to writable dirs list.
+     *
+     * @param dir Dir
+     */
+    public void addToWritableDirs(File dir, boolean underWarnThreshold) {
+        if (writableLedgerDirectories.contains(dir)) {
+            return;
+        }
+        LOG.info("{} becomes writable. Adding it to writable dirs list.", dir);
+        // Update writable dirs list
+        List<File> updatedWritableDirs = new ArrayList<File>(writableLedgerDirectories);
+        updatedWritableDirs.add(dir);
+        writableLedgerDirectories = updatedWritableDirs;
+        // Update the filled dirs list
+        List<File> newDirs = new ArrayList<File>(filledDirs);
+        newDirs.removeAll(writableLedgerDirectories);
+        filledDirs = newDirs;
+        // Notify listeners about disk writable
+        for (LedgerDirsListener listener : listeners) {
+            if (underWarnThreshold) {
+                listener.diskWritable(dir);
+            } else {
+                listener.diskJustWritable(dir);
+            }
+        }
+    }
+
+    /**
      * Returns one of the ledger dir from writable dirs list randomly.
      */
     File pickRandomWritableDir() throws NoWritableLedgerDirException {
@@ -198,48 +233,58 @@ public class LedgerDirsManager {
 
         @Override
         public void run() {
-            try {
-                while (true) {
-                    List<File> writableDirs;
+            while (true) {
+                List<File> writableDirs;
+                try {
+                    writableDirs = getWritableLedgerDirs();
+                } catch (NoWritableLedgerDirException e) {
+                    for (LedgerDirsListener listener : listeners) {
+                        listener.allDisksFull();
+                    }
+                    break;
+                }
+                // Check all writable dirs disk space usage.
+                for (File dir : writableDirs) {
                     try {
-                        writableDirs = getWritableLedgerDirs();
-                    } catch (NoWritableLedgerDirException e) {
+                        diskChecker.checkDir(dir);
+                    } catch (DiskErrorException e) {
+                        // Notify disk failure to all listeners
                         for (LedgerDirsListener listener : listeners) {
-                            listener.allDisksFull();
+                            LOG.warn("{} has errors.", dir, e);
+                            listener.diskFailed(dir);
                         }
-                        break;
-                    }
-                    // Check all writable dirs disk space usage.
-                    for (File dir : writableDirs) {
-                        try {
-                            diskChecker.checkDir(dir);
-                        } catch (DiskErrorException e) {
-                            // Notify disk failure to all listeners
-                            for (LedgerDirsListener listener : listeners) {
-                                LOG.warn("{} has errors.", dir, e);
-                                listener.diskFailed(dir);
-                            }
-                        } catch (DiskWarnThresholdException e) {
-                            for (LedgerDirsListener listener : listeners) {
-                                listener.diskAlmostFull(dir);
-                            }
-                        } catch (DiskOutOfSpaceException e) {
-                            // Notify disk full to all listeners
-                            addToFilledDirs(dir);
+                    } catch (DiskWarnThresholdException e) {
+                        for (LedgerDirsListener listener : listeners) {
+                            listener.diskAlmostFull(dir);
                         }
+                    } catch (DiskOutOfSpaceException e) {
+                        // Notify disk full to all listeners
+                        addToFilledDirs(dir);
                     }
+                }
+                List<File> fullfilledDirs = new ArrayList<File>(getFullFilledLedgerDirs());
+                // Check all full-filled disk space usage
+                for (File dir : fullfilledDirs) {
                     try {
-                        Thread.sleep(interval);
-                    } catch (InterruptedException e) {
-                        LOG.info("LedgerDirsMonitor thread is interrupted");
-                        break;
+                        diskChecker.checkDir(dir);
+                        addToWritableDirs(dir, true);
+                    } catch (DiskErrorException e) {
+                        //Notify disk failure to all the listeners
+                        for (LedgerDirsListener listener : listeners) {
+                            listener.diskFailed(dir);
+                        }
+                    } catch (DiskWarnThresholdException e) {
+                        // the full-filled dir became writable but is still above the warn threshold
+                        addToWritableDirs(dir, false);
+                    } catch (DiskOutOfSpaceException e) {
+                        // the full-filled dir is still full-filled
                     }
                 }
-            } catch (Exception e) {
-                LOG.error("Error Occured while checking disks", e);
-                // Notify disk failure to all listeners
-                for (LedgerDirsListener listener : listeners) {
-                    listener.fatalError();
+                try {
+                    Thread.sleep(interval);
+                } catch (InterruptedException e) {
+                    LOG.info("LedgerDirsMonitor thread is interrupted");
+                    break;
                 }
             }
             LOG.info("LedgerDirsMonitorThread exited!");
@@ -278,7 +323,7 @@ public class LedgerDirsManager {
     public static interface LedgerDirsListener {
         /**
          * This will be notified on disk failure/disk error
-         * 
+         *
          * @param disk
          *            Failed disk
          */
@@ -293,13 +338,29 @@ public class LedgerDirsManager {
 
         /**
          * This will be notified on disk detected as full
-         * 
+         *
          * @param disk
          *            Filled disk
          */
         void diskFull(File disk);
 
         /**
+         * This will be notified on disk detected as writable and under warn threshold
+         *
+         * @param disk
+         *          Writable disk
+         */
+        void diskWritable(File disk);
+
+        /**
+         * This will be notified on disk detected as writable but still in warn threshold
+         *
+         * @param disk
+         *          Writable disk
+         */
+        void diskJustWritable(File disk);
+
+        /**
          * This will be notified whenever all disks are detected as full.
          */
         void allDisksFull();

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java?rev=1560066&r1=1560065&r2=1560066&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java Tue Jan 21 16:26:23 2014
@@ -32,8 +32,6 @@ import java.util.concurrent.TimeUnit;
 import org.jboss.netty.channel.ChannelException;
 import junit.framework.Assert;
 
-import org.apache.bookkeeper.bookie.Bookie;
-import org.apache.bookkeeper.bookie.BookieException;
 import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
 import org.apache.bookkeeper.conf.TestBKConfiguration;
 import org.apache.bookkeeper.conf.ClientConfiguration;
@@ -122,6 +120,32 @@ public class BookieInitializationTest {
                 ExitCode.ZK_REG_FAIL, bkServer.getExitCode());
     }
 
+    @Test(timeout = 20000)
+    public void testBookieRegistrationWithSameZooKeeperClient() throws Exception {
+        File tmpDir = File.createTempFile("bookie", "test");
+        tmpDir.delete();
+        tmpDir.mkdir();
+
+        final ServerConfiguration conf = TestBKConfiguration.newServerConfiguration()
+                .setZkServers(null).setJournalDirName(tmpDir.getPath())
+                .setLedgerDirNames(new String[] { tmpDir.getPath() });
+
+        final String bkRegPath = conf.getZkAvailableBookiesPath() + "/"
+                + InetAddress.getLocalHost().getHostAddress() + ":"
+                + conf.getBookiePort();
+
+        MockBookie b = new MockBookie(conf);
+        b.zk = zkc;
+        b.testRegisterBookie(conf);
+        Assert.assertNotNull("Bookie registration node doesn't exists!",
+                             zkc.exists(bkRegPath, false));
+
+        // test registering the bookie again when the registration node was created by itself.
+        b.testRegisterBookie(conf);
+        Assert.assertNotNull("Bookie registration node doesn't exists!",
+                zkc.exists(bkRegPath, false));
+    }
+
     /**
      * Verify the bookie reg. Restarting bookie server will wait for the session
      * timeout when previous reg node exists in zk. On zNode delete event,

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java?rev=1560066&r1=1560065&r2=1560066&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java Tue Jan 21 16:26:23 2014
@@ -205,7 +205,7 @@ public class CompactionTest extends Book
         storage.start();
         long startTime = MathUtils.now();
         Thread.sleep(2000);
-        storage.gcThread.forceGC();
+        storage.gcThread.enableForceGC();
         Thread.sleep(1000);
         // Minor and Major compaction times should be larger than when we started
         // this test.

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java?rev=1560066&r1=1560065&r2=1560066&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java Tue Jan 21 16:26:23 2014
@@ -340,5 +340,13 @@ public class TestSyncThread {
         @Override
         public void fatalError() {
         }
+
+        @Override
+        public void diskWritable(File disk) {
+        }
+
+        @Override
+        public void diskJustWritable(File disk) {
+        }
     }
 }

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ReadOnlyBookieTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ReadOnlyBookieTest.java?rev=1560066&r1=1560065&r2=1560066&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ReadOnlyBookieTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ReadOnlyBookieTest.java Tue Jan 21 16:26:23 2014
@@ -30,6 +30,7 @@ import org.apache.bookkeeper.client.Book
 import org.apache.bookkeeper.client.LedgerEntry;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.junit.Test;
 
 /**
  * Test to verify the readonly feature of bookies
@@ -43,6 +44,7 @@ public class ReadOnlyBookieTest extends 
     /**
      * Check readonly bookie
      */
+    @Test(timeout = 60000)
     public void testBookieShouldServeAsReadOnly() throws Exception {
         killBookie(0);
         baseConf.setReadOnlyModeEnabled(true);
@@ -66,6 +68,7 @@ public class ReadOnlyBookieTest extends 
 
         try {
             ledger.addEntry("data".getBytes());
+            fail("Should fail to add entry since there isn't enough bookies alive.");
         } catch (BKException.BKNotEnoughBookiesException e) {
             // Expected
         }
@@ -84,9 +87,73 @@ public class ReadOnlyBookieTest extends 
         }
     }
 
+    @Test(timeout = 60000)
+    public void testBookieShouldTurnWritableFromReadOnly() throws Exception {
+        killBookie(0);
+        baseConf.setReadOnlyModeEnabled(true);
+        startNewBookie();
+        LedgerHandle ledger = bkc.createLedger(2, 2, DigestType.MAC,
+                "".getBytes());
+
+        // Check new bookie with readonly mode enabled.
+        File[] ledgerDirs = bsConfs.get(1).getLedgerDirs();
+        assertEquals("Only one ledger dir should be present", 1,
+                ledgerDirs.length);
+        Bookie bookie = bs.get(1).getBookie();
+        LedgerDirsManager ledgerDirsManager = bookie.getLedgerDirsManager();
+
+        for (int i = 0; i < 10; i++) {
+            ledger.addEntry("data".getBytes());
+        }
+
+        File testDir = new File(ledgerDirs[0], "current");
+
+        // Now add the current ledger dir to filled dirs list
+        ledgerDirsManager.addToFilledDirs(testDir);
+
+        try {
+            ledger.addEntry("data".getBytes());
+            fail("Should fail to add entry since there isn't enough bookies alive.");
+        } catch (BKException.BKNotEnoughBookiesException e) {
+            // Expected
+        }
+        LOG.info("bookie is running {}, readonly {}.", bookie.isRunning(), bookie.isReadOnly());
+        assertTrue("Bookie should be running and converted to readonly mode",
+                bookie.isRunning() && bookie.isReadOnly());
+
+        // refresh the bookkeeper client
+        bkc.readBookiesBlocking();
+        // should fail to create ledger
+        try {
+            bkc.createLedger(2, 2, DigestType.MAC, "".getBytes());
+            fail("Should fail to create a ledger since there isn't enough bookies alive.");
+        } catch (BKException.BKNotEnoughBookiesException bke) {
+            // Expected.
+        }
+
+        // Now add the current ledger dir back to writable dirs list
+        ledgerDirsManager.addToWritableDirs(testDir, true);
+
+        LOG.info("bookie is running {}, readonly {}.", bookie.isRunning(), bookie.isReadOnly());
+        assertTrue("Bookie should be running and converted back to writable mode", bookie.isRunning()
+                && !bookie.isReadOnly());
+        // force client to read bookies
+        bkc.readBookiesBlocking();
+        LedgerHandle newLedger = bkc.createLedger(2, 2, DigestType.MAC, "".getBytes());
+        for (int i = 0; i < 10; i++) {
+            newLedger.addEntry("data".getBytes());
+        }
+        Enumeration<LedgerEntry> readEntries = newLedger.readEntries(0, 9);
+        while (readEntries.hasMoreElements()) {
+            LedgerEntry entry = readEntries.nextElement();
+            assertEquals("Entry should contain correct data", "data", new String(entry.getEntry()));
+        }
+    }
+
     /**
      * check readOnlyModeEnabled=false
      */
+    @Test(timeout = 60000)
     public void testBookieShutdownIfReadOnlyModeNotEnabled() throws Exception {
         File[] ledgerDirs = bsConfs.get(1).getLedgerDirs();
         assertEquals("Only one ledger dir should be present", 1,
@@ -105,6 +172,7 @@ public class ReadOnlyBookieTest extends 
 
         try {
             ledger.addEntry("data".getBytes());
+            fail("Should fail to add entry since there isn't enough bookies alive.");
         } catch (BKException.BKNotEnoughBookiesException e) {
             // Expected
         }
@@ -120,6 +188,7 @@ public class ReadOnlyBookieTest extends 
     /**
      * Check multiple ledger dirs
      */
+    @Test(timeout = 60000)
     public void testBookieContinueWritingIfMultipleLedgersPresent()
             throws Exception {
         startNewBookieWithMultipleLedgerDirs(2);
@@ -171,6 +240,7 @@ public class ReadOnlyBookieTest extends 
     /**
      * Test ledger creation with readonly bookies
      */
+    @Test(timeout = 60000)
     public void testLedgerCreationShouldFailWithReadonlyBookie() throws Exception {
         killBookie(1);
         baseConf.setReadOnlyModeEnabled(true);