You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2012/10/18 16:40:31 UTC

svn commit: r1399680 - in /zookeeper/bookkeeper/trunk: ./ bookkeeper-server/conf/ bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ bookkeeper-server/src/main/java/org/apache/bookk...

Author: ivank
Date: Thu Oct 18 14:40:30 2012
New Revision: 1399680

URL: http://svn.apache.org/viewvc?rev=1399680&view=rev
Log:
BOOKKEEPER-345: Detect IOExceptions on entrylogger and bookie should consider next ledger dir(if any) (Vinay via ivank)

Added:
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDirsManager.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/DiskChecker.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ReadOnlyBookieTest.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/TestDiskChecker.java
Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/bookkeeper-server/conf/bk_server.conf
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyEntryLogger.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1399680&r1=1399679&r2=1399680&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Thu Oct 18 14:40:30 2012
@@ -158,6 +158,8 @@ Trunk (unreleased changes)
 
         BOOKKEEPER-315: Ledger entries should be replicated sequentially instead of parallel. (umamahesh via ivank)
 
+        BOOKKEEPER-345: Detect IOExceptions on entrylogger and bookie should consider next ledger dir(if any) (Vinay via ivank)
+
       hedwig-server:
 
         BOOKKEEPER-250: Need a ledger manager like interface to manage metadata operations in Hedwig (sijie via ivank)

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/conf/bk_server.conf
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/conf/bk_server.conf?rev=1399680&r1=1399679&r2=1399680&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/conf/bk_server.conf (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/conf/bk_server.conf Thu Oct 18 14:40:30 2012
@@ -159,3 +159,21 @@ zkTimeout=10000
 # the limitation of number of index pages.
 # pageLimit=-1
 
+#If all configured ledger directories are full, support only read
+#requests from clients.
+#If "readOnlyModeEnabled=true", then when all ledger disks are full the bookie
+#switches to read-only mode and serves only read requests; otherwise it shuts down.
+#Disabled by default.
+#readOnlyModeEnabled=false
+
+#Maximum fraction of disk space that can be used on each ledger dir.
+#Default is 0.95, i.e. at most 95% of the disk can be used, after which nothing
+#more is written to that partition. If all ledger dir partitions are full, the
+#bookie switches to read-only mode if 'readOnlyModeEnabled=true' is set;
+#otherwise it shuts down.
+#Valid values are between 0 and 1 (exclusive).
+#diskUsageThreshold=0.95
+
+#Interval, in milliseconds, at which ledger dir disk usage is checked.
+#Default is 10000.
+#diskCheckInterval=10000

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java?rev=1399680&r1=1399679&r2=1399680&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java Thu Oct 18 14:40:30 2012
@@ -42,6 +42,8 @@ import org.apache.bookkeeper.meta.Active
 import org.apache.bookkeeper.meta.LedgerManagerFactory;
 import org.apache.bookkeeper.bookie.BookieException;
 import org.apache.bookkeeper.bookie.Journal.JournalScanner;
+import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
+import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.jmx.BKMBeanInfo;
 import org.apache.bookkeeper.jmx.BKMBeanRegistry;
@@ -53,6 +55,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException.NodeExistsException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooKeeper;
@@ -66,11 +69,11 @@ import org.apache.zookeeper.Watcher.Even
 
 public class Bookie extends Thread {
     public static final String INSTANCEID = "INSTANCEID";
+    public static final String READONLY = "readonly";
 
     static Logger LOG = LoggerFactory.getLogger(Bookie.class);
 
     final File journalDirectory;
-    final File ledgerDirectories[];
     final ServerConfiguration conf;
 
     final SyncThread syncThread;
@@ -87,6 +90,8 @@ public class Bookie extends Thread {
     private final String bookieRegistrationPath;
     static final String CURRENT_DIR = "current";
 
+    private LedgerDirsManager ledgerDirsManager;
+
     // ZooKeeper client instance for the Bookie
     ZooKeeper zk;
     private volatile boolean isZkExpired = true;
@@ -104,6 +109,10 @@ public class Bookie extends Thread {
 
     Map<Long, byte[]> masterKeyCache = Collections.synchronizedMap(new HashMap<Long, byte[]>());
 
+    final private String zkBookieRegPath;
+
+    final private AtomicBoolean readOnly = new AtomicBoolean(false);
+
     public static class NoLedgerException extends IOException {
         private static final long serialVersionUID = 1L;
         private long ledgerId;
@@ -208,6 +217,10 @@ public class Bookie extends Thread {
                 boolean flushFailed = false;
                 try {
                     ledgerStorage.flush();
+                } catch (NoWritableLedgerDirException e) {
+                    flushFailed = true;
+                    flushing.set(false);
+                    transitionToReadOnlyMode();
                 } catch (IOException e) {
                     LOG.error("Exception flushing Ledger", e);
                     flushFailed = true;
@@ -216,8 +229,13 @@ public class Bookie extends Thread {
                 // if flush failed, we should not roll last mark, otherwise we would
                 // have some ledgers are not flushed and their journal entries were lost
                 if (!flushFailed) {
-                    journal.rollLog();
-                    journal.gcJournals();
+                    try {
+                        journal.rollLog();
+                        journal.gcJournals();
+                    } catch (NoWritableLedgerDirException e) {
+                        flushing.set(false);
+                        transitionToReadOnlyMode();
+                    }
                 }
 
                 // clear flushing flag
@@ -272,7 +290,7 @@ public class Bookie extends Thread {
     private void checkEnvironment(ZooKeeper zk) throws BookieException, IOException {
         if (zk == null) { // exists only for testing, just make sure directories are correct
             checkDirectoryStructure(journalDirectory);
-            for (File dir : ledgerDirectories) {
+            for (File dir : ledgerDirsManager.getAllLedgerDirs()) {
                     checkDirectoryStructure(dir);
             }
             return;
@@ -300,7 +318,7 @@ public class Bookie extends Thread {
             } catch (FileNotFoundException fnf) {
                 missedCookieDirs.add(journalDirectory);
             }
-            for (File dir : ledgerDirectories) {
+            for (File dir : ledgerDirsManager.getAllLedgerDirs()) {
                 checkDirectoryStructure(dir);
                 try {
                     Cookie c = Cookie.readFromDirectory(dir);
@@ -319,7 +337,7 @@ public class Bookie extends Thread {
                 if (missedCookieDirs.size() > 0) {
                     LOG.debug("Directories missing cookie file are {}", missedCookieDirs);
                     masterCookie.writeToDirectory(journalDirectory);
-                    for (File dir : ledgerDirectories) {
+                    for (File dir : ledgerDirsManager.getAllLedgerDirs()) {
                         masterCookie.writeToDirectory(dir);
                     }
                 }
@@ -353,6 +371,10 @@ public class Bookie extends Thread {
         return instanceId;
     }
 
+    public LedgerDirsManager getLedgerDirsManager() {
+        return ledgerDirsManager;
+    }
+
     public static File getCurrentDirectory(File dir) {
         return new File(dir, CURRENT_DIR);
     }
@@ -372,7 +394,7 @@ public class Bookie extends Thread {
         this.bookieRegistrationPath = conf.getZkAvailableBookiesPath() + "/";
         this.conf = conf;
         this.journalDirectory = getCurrentDirectory(conf.getJournalDir());
-        this.ledgerDirectories = getCurrentDirectories(conf.getLedgerDirs());
+        this.ledgerDirsManager = new LedgerDirsManager(conf);
 
         // instantiate zookeeper client to initialize ledger manager
         this.zk = instantiateZookeeperClient(conf);
@@ -382,10 +404,19 @@ public class Bookie extends Thread {
         activeLedgerManager = activeLedgerManagerFactory.newActiveLedgerManager();
 
         syncThread = new SyncThread(conf);
-        ledgerStorage = new InterleavedLedgerStorage(conf, activeLedgerManager);
+        ledgerStorage = new InterleavedLedgerStorage(conf, activeLedgerManager,
+                ledgerDirsManager);
         handles = new HandleFactoryImpl(ledgerStorage);
         // instantiate the journal
-        journal = new Journal(conf);
+        journal = new Journal(conf, ledgerDirsManager);
+
+        // ZK ephemeral node for this Bookie.
+        zkBookieRegPath = this.bookieRegistrationPath + getMyId();
+    }
+
+    private String getMyId() throws UnknownHostException {
+        return InetAddress.getLocalHost().getHostAddress() + ":"
+                + conf.getBookiePort();
     }
 
     void readJournal() throws IOException, BookieException {
@@ -443,6 +474,10 @@ public class Bookie extends Thread {
         // start bookie thread
         super.start();
 
+        ledgerDirsManager.addLedgerDirsListener(getLedgerDirsListener());
+        //Start DiskChecker thread
+        ledgerDirsManager.start();
+
         ledgerStorage.start();
 
         syncThread.start();
@@ -458,6 +493,38 @@ public class Bookie extends Thread {
         }
     }
 
+    /*
+     * Get the DiskFailure listener for the bookie
+     */
+    private LedgerDirsListener getLedgerDirsListener() {
+
+        return new LedgerDirsListener() {
+
+            @Override
+            public void diskFull(File disk) {
+                // Nothing needs to be handled here.
+            }
+
+            @Override
+            public void diskFailed(File disk) {
+                // Shutdown the bookie on disk failure.
+                triggerBookieShutdown(ExitCode.BOOKIE_EXCEPTION);
+            }
+
+            @Override
+            public void allDisksFull() {
+                // Transition to readOnly mode on all disks full
+                transitionToReadOnlyMode();
+            }
+
+            @Override
+            public void fatalError() {
+                LOG.error("Fatal error reported by ledgerDirsManager");
+                triggerBookieShutdown(ExitCode.BOOKIE_EXCEPTION);
+            }
+        };
+    }
+
     /**
      * Register jmx with parent
      *
@@ -525,9 +592,6 @@ public class Bookie extends Thread {
             // zookeeper instance is null, means not register itself to zk
             return;
         }
-        // ZK ephemeral node for this Bookie.
-        String zkBookieRegPath = this.bookieRegistrationPath
-                + InetAddress.getLocalHost().getHostAddress() + ":" + port;
         final CountDownLatch prevNodeLatch = new CountDownLatch(1);
         try{
             Watcher zkPrevRegNodewatcher = new Watcher() {
@@ -573,6 +637,63 @@ public class Bookie extends Thread {
         }
     }
 
+    /*
+     * Transition the bookie to readOnly mode
+     */
+    void transitionToReadOnlyMode() {
+        if (!readOnly.compareAndSet(false, true)) {
+            return;
+        }
+        if (!conf.isReadOnlyModeEnabled()) {
+            LOG.warn("ReadOnly mode is not enabled. "
+                    + "Can be enabled by configuring "
+                    + "'readOnlyModeEnabled=true' in configuration."
+                    + "Shutting down bookie");
+            triggerBookieShutdown(ExitCode.BOOKIE_EXCEPTION);
+            return;
+        }
+        LOG.info("Transitioning Bookie to ReadOnly mode,"
+                + " and will serve only read requests from clients!");
+        try {
+            if (null == zk
+                    .exists(this.bookieRegistrationPath + READONLY, false)) {
+                try {
+                    zk.create(this.bookieRegistrationPath + READONLY,
+                            new byte[0], Ids.OPEN_ACL_UNSAFE,
+                            CreateMode.PERSISTENT);
+                } catch (NodeExistsException e) {
+                    // this node is just now created by someone.
+                }
+            }
+            // Clear the current registered node
+            zk.delete(zkBookieRegPath, -1);
+            // Create the readonly node
+            zk.create(this.bookieRegistrationPath + READONLY + "/" + getMyId(),
+                    new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+        } catch (IOException e) {
+            LOG.error("Error in transition to ReadOnly Mode."
+                    + " Shutting down", e);
+            triggerBookieShutdown(ExitCode.BOOKIE_EXCEPTION);
+            return;
+        } catch (KeeperException e) {
+            LOG.error("Error in transition to ReadOnly Mode."
+                    + " Shutting down", e);
+            triggerBookieShutdown(ExitCode.BOOKIE_EXCEPTION);
+            return;
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            LOG.warn("Interrupted Exception while transitioning to ReadOnly Mode.");
+            return;
+        }
+    }
+
+    /*
+     * Check whether the Bookie is in read-only mode (i.e. not writable)
+     */
+    public boolean isReadOnly() {
+        return readOnly.get();
+    }
+
     /**
      * Create a new zookeeper client to zk cluster.
      *
@@ -643,7 +764,25 @@ public class Bookie extends Thread {
             // following add operations to it would hang unit client timeout
             // so we should let bookie server exists
             LOG.error("Journal manager quits unexpectedly.");
-            shutdown(ExitCode.BOOKIE_EXCEPTION);
+            triggerBookieShutdown(ExitCode.BOOKIE_EXCEPTION);
+        }
+    }
+
+    // Triggering the Bookie shutdown in its own thread,
+    // because shutdown can be called from sync thread which would be
+    // interrupted by shutdown call.
+    void triggerBookieShutdown(final int exitCode) {
+        Thread shutdownThread = new Thread() {
+            public void run() {
+                Bookie.this.shutdown(exitCode);
+            }
+        };
+        shutdownThread.start();
+        try {
+            shutdownThread.join();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            LOG.debug("InterruptedException while waiting for shutdown. Not a problem!!");
         }
     }
 
@@ -665,6 +804,10 @@ public class Bookie extends Thread {
 
                 // Shutdown the ZK client
                 if(zk != null) zk.close();
+
+                //Shutdown disk checker
+                ledgerDirsManager.shutdown();
+
                 // Shutdown journal
                 journal.shutdown();
                 this.join();
@@ -744,9 +887,14 @@ public class Bookie extends Thread {
      */
     public void recoveryAddEntry(ByteBuffer entry, WriteCallback cb, Object ctx, byte[] masterKey) 
             throws IOException, BookieException {
-        LedgerDescriptor handle = getLedgerForEntry(entry, masterKey);
-        synchronized (handle) {
-            addEntryInternal(handle, entry, cb, ctx);
+        try {
+            LedgerDescriptor handle = getLedgerForEntry(entry, masterKey);
+            synchronized (handle) {
+                addEntryInternal(handle, entry, cb, ctx);
+            }
+        } catch (NoWritableLedgerDirException e) {
+            transitionToReadOnlyMode();
+            throw new IOException(e);
         }
     }
     
@@ -756,13 +904,18 @@ public class Bookie extends Thread {
      */
     public void addEntry(ByteBuffer entry, WriteCallback cb, Object ctx, byte[] masterKey)
             throws IOException, BookieException {
-        LedgerDescriptor handle = getLedgerForEntry(entry, masterKey);
-        synchronized (handle) {
-            if (handle.isFenced()) {
-                throw BookieException.create(BookieException.Code.LedgerFencedException);
-            }
-
-            addEntryInternal(handle, entry, cb, ctx);
+        try {
+            LedgerDescriptor handle = getLedgerForEntry(entry, masterKey);
+            synchronized (handle) {
+                if (handle.isFenced()) {
+                    throw BookieException
+                            .create(BookieException.Code.LedgerFencedException);
+                }
+                addEntryInternal(handle, entry, cb, ctx);
+            }
+        } catch (NoWritableLedgerDirException e) {
+            transitionToReadOnlyMode();
+            throw new IOException(e);
         }
     }
 

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java?rev=1399680&r1=1399679&r2=1399680&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java Thu Oct 18 14:40:30 2012
@@ -640,7 +640,7 @@ public class BookieShell implements Tool
 
     private synchronized Journal getJournal() throws IOException {
         if (null == journal) {
-            journal = new Journal(bkConf);
+            journal = new Journal(bkConf, new LedgerDirsManager(bkConf));
         }
         return journal;
     }

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java?rev=1399680&r1=1399679&r2=1399680&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java Thu Oct 18 14:40:30 2012
@@ -34,16 +34,17 @@ import java.io.OutputStreamWriter;
 import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
-import java.util.Arrays;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.util.IOUtils;
 
@@ -56,7 +57,10 @@ import org.apache.bookkeeper.util.IOUtil
  */
 public class EntryLogger {
     private static final Logger LOG = LoggerFactory.getLogger(EntryLogger.class);
-    private File dirs[];
+
+    volatile File currentDir;
+    private LedgerDirsManager ledgerDirsManager;
+    private AtomicBoolean shouldCreateNewEntryLog = new AtomicBoolean(false);
 
     private long logId;
     /**
@@ -105,8 +109,9 @@ public class EntryLogger {
      * Create an EntryLogger that stores it's log files in the given
      * directories
      */
-    public EntryLogger(ServerConfiguration conf) throws IOException {
-        this.dirs = Bookie.getCurrentDirectories(conf.getLedgerDirs());
+    public EntryLogger(ServerConfiguration conf,
+            LedgerDirsManager ledgerDirsManager) throws IOException {
+        this.ledgerDirsManager = ledgerDirsManager;
         // log size limit
         this.logSizeLimit = conf.getEntryLogSizeLimit();
 
@@ -118,7 +123,7 @@ public class EntryLogger {
         LOGFILE_HEADER.put("BKLO".getBytes());
         // Find the largest logId
         logId = -1;
-        for(File dir: dirs) {
+        for (File dir : ledgerDirsManager.getAllLedgerDirs()) {
             if (!dir.exists()) {
                 throw new FileNotFoundException(
                         "Entry log directory does not exist");
@@ -142,16 +147,44 @@ public class EntryLogger {
     }
 
     protected void initialize() throws IOException {
+        // Register listener for disk full notifications.
+        ledgerDirsManager.addLedgerDirsListener(getLedgerDirsListener());
         // create a new log to write
         createNewLog();
     }
 
+    private LedgerDirsListener getLedgerDirsListener() {
+        return new LedgerDirsListener() {
+            @Override
+            public void diskFull(File disk) {
+                // If the current entry log disk is full, then create new entry
+                // log.
+                if (currentDir != null && currentDir.equals(disk)) {
+                    shouldCreateNewEntryLog.set(true);
+                }
+            }
+
+            @Override
+            public void diskFailed(File disk) {
+                // Nothing to handle here. Will be handled in Bookie
+            }
+
+            @Override
+            public void allDisksFull() {
+                // Nothing to handle here. Will be handled in Bookie
+            }
+
+            @Override
+            public void fatalError() {
+                // Nothing to handle here. Will be handled in Bookie
+            }
+        };
+    }
+
     /**
      * Creates a new log file
      */
     void createNewLog() throws IOException {
-        List<File> list = Arrays.asList(dirs);
-        Collections.shuffle(list);
         if (logChannel != null) {
             logChannel.flush(true);
         }
@@ -160,21 +193,23 @@ public class EntryLogger {
         File newLogFile = null;
         do {
             String logFileName = Long.toHexString(++logId) + ".log";
-            for (File dir : list) {
-                newLogFile = new File(dir, logFileName);
-                if (newLogFile.exists()) {
-                    LOG.warn("Found existed entry log " + newLogFile
-                           + " when trying to create it as a new log.");
-                    newLogFile = null;
-                    break;
-                }
+            File dir = ledgerDirsManager.pickRandomWritableDir();
+            newLogFile = new File(dir, logFileName);
+            currentDir = dir;
+            if (newLogFile.exists()) {
+                LOG.warn("Found existed entry log " + newLogFile
+                        + " when trying to create it as a new log.");
+                newLogFile = null;
+                continue;
             }
         } while (newLogFile == null);
 
         logChannel = new BufferedChannel(new RandomAccessFile(newLogFile, "rw").getChannel(), 64*1024);
         logChannel.write((ByteBuffer) LOGFILE_HEADER.clear());
         channels.put(logId, logChannel);
-        for(File f: dirs) {
+
+        List<File> listOfDirs = ledgerDirsManager.getWritableLedgerDirs();
+        for (File f : listOfDirs) {
             setLastLogId(f, logId);
         }
     }
@@ -290,8 +325,16 @@ public class EntryLogger {
         }
     }
     synchronized long addEntry(long ledger, ByteBuffer entry) throws IOException {
-        if (logChannel.position() + entry.remaining() + 4 > logSizeLimit) {
+        // Create new log if logSizeLimit reached or current disk is full
+        boolean createNewLog = shouldCreateNewEntryLog.get();
+        if (createNewLog
+                || (logChannel.position() + entry.remaining() + 4 > logSizeLimit)) {
             createNewLog();
+
+            // Reset the flag
+            if (createNewLog) {
+                shouldCreateNewEntryLog.set(false);
+            }
         }
         ByteBuffer buff = ByteBuffer.allocate(4);
         buff.putInt(entry.remaining());
@@ -374,7 +417,7 @@ public class EntryLogger {
      * Whether the log file exists or not.
      */
     boolean logExists(long logId) {
-        for (File d : dirs) {
+        for (File d : ledgerDirsManager.getAllLedgerDirs()) {
             File f = new File(d, Long.toHexString(logId) + ".log");
             if (f.exists()) {
                 return true;
@@ -384,7 +427,7 @@ public class EntryLogger {
     }
 
     private File findFile(long logId) throws FileNotFoundException {
-        for(File d: dirs) {
+        for (File d : ledgerDirsManager.getAllLedgerDirs()) {
             File f = new File(d, Long.toHexString(logId)+".log");
             if (f.exists()) {
                 return f;

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java?rev=1399680&r1=1399679&r2=1399680&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java Thu Oct 18 14:40:30 2012
@@ -40,8 +40,8 @@ import org.slf4j.LoggerFactory;
 class InterleavedLedgerStorage implements LedgerStorage {
     final static Logger LOG = LoggerFactory.getLogger(InterleavedLedgerStorage.class);
 
-    private EntryLogger entryLogger;
-    private LedgerCache ledgerCache;
+    EntryLogger entryLogger;
+    LedgerCache ledgerCache;
     // This is the thread that garbage collects the entry logs that do not
     // contain any active ledgers in them; and compacts the entry logs that
     // has lower remaining percentage to reclaim disk space.
@@ -50,9 +50,10 @@ class InterleavedLedgerStorage implement
     // this indicates that a write has happened since the last flush
     private volatile boolean somethingWritten = false;
 
-    InterleavedLedgerStorage(ServerConfiguration conf, ActiveLedgerManager activeLedgerManager)
-            throws IOException {
-        entryLogger = new EntryLogger(conf);
+    InterleavedLedgerStorage(ServerConfiguration conf,
+            ActiveLedgerManager activeLedgerManager,
+            LedgerDirsManager ledgerDirsManager) throws IOException {
+        entryLogger = new EntryLogger(conf, ledgerDirsManager);
         ledgerCache = new LedgerCacheImpl(conf, activeLedgerManager);
         gcThread = new GarbageCollectorThread(conf, ledgerCache, entryLogger,
                 activeLedgerManager, new EntryLogCompactionScanner());

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java?rev=1399680&r1=1399679&r2=1399680&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java Thu Oct 18 14:40:30 2012
@@ -32,6 +32,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.LinkedBlockingQueue;
 
+import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
 import org.apache.bookkeeper.util.IOUtils;
@@ -111,7 +112,7 @@ class Journal extends Thread {
             return txnLogPosition;
         }
 
-        synchronized void rollLog() {
+        synchronized void rollLog() throws NoWritableLedgerDirException {
             byte buff[] = new byte[16];
             ByteBuffer bb = ByteBuffer.wrap(buff);
             // we should record <logId, logPosition> marked in markLog
@@ -120,7 +121,9 @@ class Journal extends Thread {
             bb.putLong(lastMark.getTxnLogId());
             bb.putLong(lastMark.getTxnLogPosition());
             LOG.debug("RollLog to persist last marked log : {}", lastMark);
-            for (File dir : ledgerDirectories) {
+            List<File> writableLedgerDirs = ledgerDirsManager
+                    .getWritableLedgerDirs();
+            for (File dir : writableLedgerDirs) {
                 File file = new File(dir, "lastMark");
                 FileOutputStream fos = null;
                 try {
@@ -148,7 +151,7 @@ class Journal extends Thread {
         synchronized void readLog() {
             byte buff[] = new byte[16];
             ByteBuffer bb = ByteBuffer.wrap(buff);
-            for(File dir: ledgerDirectories) {
+            for(File dir: ledgerDirsManager.getAllLedgerDirs()) {
                 File file = new File(dir, "lastMark");
                 try {
                     FileInputStream fis = new FileInputStream(file);
@@ -250,7 +253,6 @@ class Journal extends Thread {
     final int maxBackupJournals;
 
     final File journalDirectory;
-    final File ledgerDirectories[];
     final ServerConfiguration conf;
 
     private LastLogMark lastLogMark = new LastLogMark(0, 0);
@@ -259,12 +261,13 @@ class Journal extends Thread {
     LinkedBlockingQueue<QueueEntry> queue = new LinkedBlockingQueue<QueueEntry>();
 
     volatile boolean running = true;
+    private LedgerDirsManager ledgerDirsManager;
 
-    public Journal(ServerConfiguration conf) {
+    public Journal(ServerConfiguration conf, LedgerDirsManager ledgerDirsManager) {
         super("BookieJournal-" + conf.getBookiePort());
+        this.ledgerDirsManager = ledgerDirsManager;
         this.conf = conf;
         this.journalDirectory = Bookie.getCurrentDirectory(conf.getJournalDir());
-        this.ledgerDirectories = Bookie.getCurrentDirectories(conf.getLedgerDirs());
         this.maxJournalSize = conf.getMaxJournalSize() * MB;
         this.maxBackupJournals = conf.getMaxBackupJournals();
 
@@ -314,7 +317,7 @@ class Journal extends Thread {
      * </p>
      * @see #markLog()
      */
-    public void rollLog() {
+    public void rollLog() throws NoWritableLedgerDirException {
         lastLogMark.rollLog();
     }
 

Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDirsManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDirsManager.java?rev=1399680&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDirsManager.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDirsManager.java Thu Oct 18 14:40:30 2012
@@ -0,0 +1,268 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.util.DiskChecker;
+import org.apache.bookkeeper.util.DiskChecker.DiskErrorException;
+import org.apache.bookkeeper.util.DiskChecker.DiskOutOfSpaceException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class manages ledger directories used by the bookie.
+ * <p>
+ * This class never mutates the writable and filled dir lists in place:
+ * updates publish fresh copies through volatile fields while holding this
+ * object's lock, so readers can use the lists without locking.
+ */
+public class LedgerDirsManager {
+    private static final Logger LOG = LoggerFactory
+            .getLogger(LedgerDirsManager.class);
+
+    private volatile List<File> filledDirs;
+    private final List<File> ledgerDirectories;
+    private volatile List<File> writableLedgerDirectories;
+    private final DiskChecker diskChecker;
+    // Copy-on-write so the monitor thread can iterate while other threads
+    // register listeners.
+    private final List<LedgerDirsListener> listeners;
+    private final LedgerDirsMonitor monitor;
+    private final Random rand = new Random();
+
+    public LedgerDirsManager(ServerConfiguration conf) {
+        this.ledgerDirectories = Arrays.asList(Bookie
+                .getCurrentDirectories(conf.getLedgerDirs()));
+        this.writableLedgerDirectories = new ArrayList<File>(ledgerDirectories);
+        this.filledDirs = new ArrayList<File>();
+        this.listeners = new CopyOnWriteArrayList<LedgerDirsListener>();
+        this.diskChecker = new DiskChecker(conf.getDiskUsageThreshold());
+        this.monitor = new LedgerDirsMonitor(conf.getDiskCheckInterval());
+    }
+
+    /**
+     * Get all ledger dirs configured
+     */
+    public List<File> getAllLedgerDirs() {
+        return ledgerDirectories;
+    }
+
+    /**
+     * Get only writable ledger dirs.
+     *
+     * @return the current writable ledger dirs; never empty
+     * @throws NoWritableLedgerDirException if no ledger dir is writable
+     */
+    public List<File> getWritableLedgerDirs()
+            throws NoWritableLedgerDirException {
+        // Read the volatile field once: re-reading it after the emptiness
+        // check could return a newer, empty list to the caller.
+        List<File> writableDirs = writableLedgerDirectories;
+        if (writableDirs.isEmpty()) {
+            String errMsg = "All ledger directories are non writable";
+            NoWritableLedgerDirException e = new NoWritableLedgerDirException(
+                    errMsg);
+            LOG.error(errMsg, e);
+            throw e;
+        }
+        return writableDirs;
+    }
+
+    /**
+     * Check whether the given dir has been added to the filled dirs list.
+     */
+    public boolean isDirFull(File dir) {
+        return filledDirs.contains(dir);
+    }
+
+    /**
+     * Add the dir to the filled dirs list, drop it from the writable dirs
+     * list and notify listeners. Does nothing if the dir is already filled.
+     */
+    // VisibleForTesting
+    public void addToFilledDirs(File dir) {
+        if (!markDirFull(dir)) {
+            return;
+        }
+        LOG.warn(dir + " is out of space."
+                + " Adding it to filled dirs list");
+        // Notify listeners about disk full, outside the lock so listener
+        // code never runs while this manager's monitor is held.
+        for (LedgerDirsListener listener : listeners) {
+            listener.diskFull(dir);
+        }
+    }
+
+    /**
+     * Atomically move the dir from the writable list to the filled list.
+     *
+     * @return true if the dir was newly marked as full, false if it was
+     *         already in the filled dirs list
+     */
+    private synchronized boolean markDirFull(File dir) {
+        if (filledDirs.contains(dir)) {
+            return false;
+        }
+        List<File> updatedFilledDirs = new ArrayList<File>(filledDirs);
+        updatedFilledDirs.add(dir);
+        List<File> newDirs = new ArrayList<File>(writableLedgerDirectories);
+        newDirs.removeAll(updatedFilledDirs);
+        filledDirs = updatedFilledDirs;
+        writableLedgerDirectories = newDirs;
+        return true;
+    }
+
+    /**
+     * Returns one of the ledger dir from writable dirs list randomly.
+     */
+    File pickRandomWritableDir() throws NoWritableLedgerDirException {
+        List<File> writableDirs = getWritableLedgerDirs();
+        return writableDirs.get(rand.nextInt(writableDirs.size()));
+    }
+
+    public void addLedgerDirsListener(LedgerDirsListener listener) {
+        if (listener != null) {
+            listeners.add(listener);
+        }
+    }
+
+    // start the daemon for disk monitoring
+    public void start() {
+        monitor.setDaemon(true);
+        monitor.start();
+    }
+
+    // shutdown disk monitoring daemon
+    public void shutdown() {
+        monitor.interrupt();
+        try {
+            monitor.join();
+        } catch (InterruptedException e) {
+            // Best-effort: the monitor is a daemon thread, so stop waiting
+        }
+    }
+
+    /**
+     * Thread to monitor the disk space periodically.
+     */
+    private class LedgerDirsMonitor extends Thread {
+        private final int interval;
+
+        public LedgerDirsMonitor(int interval) {
+            this.interval = interval;
+        }
+
+        @Override
+        public void run() {
+            try {
+                while (true) {
+                    List<File> writableDirs;
+                    try {
+                        writableDirs = getWritableLedgerDirs();
+                    } catch (NoWritableLedgerDirException e) {
+                        for (LedgerDirsListener listener : listeners) {
+                            listener.allDisksFull();
+                        }
+                        break;
+                    }
+                    // Check all writable dirs disk space usage.
+                    for (File dir : writableDirs) {
+                        try {
+                            diskChecker.checkDir(dir);
+                        } catch (DiskErrorException e) {
+                            LOG.error("Ledger directory " + dir + " failed", e);
+                            // Notify disk failure to all listeners
+                            for (LedgerDirsListener listener : listeners) {
+                                listener.diskFailed(dir);
+                            }
+                        } catch (DiskOutOfSpaceException e) {
+                            // Marks the dir full and notifies the listeners
+                            addToFilledDirs(dir);
+                        }
+                    }
+                    try {
+                        Thread.sleep(interval);
+                    } catch (InterruptedException e) {
+                        LOG.info("LedgerDirsMonitor thread is interrupted");
+                        break;
+                    }
+                }
+            } catch (Exception e) {
+                LOG.error("Error Occured while checking disks", e);
+                // Notify fatal error to all listeners
+                for (LedgerDirsListener listener : listeners) {
+                    listener.fatalError();
+                }
+            }
+        }
+    }
+
+    /**
+     * Indicates All configured ledger directories are full.
+     */
+    public static class NoWritableLedgerDirException extends IOException {
+        private static final long serialVersionUID = -8696901285061448421L;
+
+        public NoWritableLedgerDirException(String errMsg) {
+            super(errMsg);
+        }
+    }
+
+    /**
+     * Listener for the disk check events will be notified from the
+     * {@link LedgerDirsManager} whenever disk full/failure detected.
+     */
+    public static interface LedgerDirsListener {
+        /**
+         * This will be notified on disk failure/disk error
+         *
+         * @param disk
+         *            Failed disk
+         */
+        void diskFailed(File disk);
+
+        /**
+         * This will be notified on disk detected as full
+         *
+         * @param disk
+         *            Filled disk
+         */
+        void diskFull(File disk);
+
+        /**
+         * This will be notified whenever all disks are detected as full.
+         */
+        void allDisksFull();
+
+        /**
+         * This will notify the fatal errors.
+         */
+        void fatalError();
+    }
+}

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyEntryLogger.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyEntryLogger.java?rev=1399680&r1=1399679&r2=1399680&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyEntryLogger.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyEntryLogger.java Thu Oct 18 14:40:30 2012
@@ -32,7 +32,7 @@ import org.apache.bookkeeper.conf.Server
 public class ReadOnlyEntryLogger extends EntryLogger {
 
     public ReadOnlyEntryLogger(ServerConfiguration conf) throws IOException {
-        super(conf);
+        super(conf, new LedgerDirsManager(conf));
     }
 
     @Override

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java?rev=1399680&r1=1399679&r2=1399680&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java Thu Oct 18 14:40:30 2012
@@ -58,6 +58,10 @@ public class ServerConfiguration extends
     // Statistics Parameters
     protected final static String ENABLE_STATISTICS = "enableStatistics";
     protected final static String OPEN_LEDGER_REREPLICATION_GRACE_PERIOD = "openLedgerRereplicationGracePeriod";
+    //ReadOnly mode support on all disk full
+    protected final static String READ_ONLY_MODE_ENABLED = "readOnlyModeEnabled";
+    protected final static String DISK_USAGE_THRESHOLD = "diskUsageThreshold";
+    protected final static String DISK_CHECK_INTERVAL = "diskCheckInterval";
 
     /**
      * Construct a default configuration object
@@ -573,4 +577,50 @@ public class ServerConfiguration extends
     public long getOpenLedgerRereplicationGracePeriod() {
         return getLong(OPEN_LEDGER_REREPLICATION_GRACE_PERIOD, 30000);
     }
+
+    /**
+     * Set the ReadOnlyModeEnabled status
+     */
+    public ServerConfiguration setReadOnlyModeEnabled(boolean enabled) {
+        setProperty(READ_ONLY_MODE_ENABLED, enabled);
+        return this;
+    }
+
+    /**
+     * Get ReadOnlyModeEnabled status. By default false
+     */
+    public boolean isReadOnlyModeEnabled() {
+        return getBoolean(READ_ONLY_MODE_ENABLED, false);
+    }
+
+    /**
+     * Set the disk usage threshold: the fraction (0..1, exclusive) of used
+     * space above which a disk is considered full during disk check.
+     */
+    public ServerConfiguration setDiskUsageThreshold(float threshold) {
+        setProperty(DISK_USAGE_THRESHOLD, threshold);
+        return this;
+    }
+
+    /**
+     * Returns the disk usage threshold fraction. By default 0.95
+     */
+    public float getDiskUsageThreshold() {
+        return getFloat(DISK_USAGE_THRESHOLD, 0.95f);
+    }
+
+    /**
+     * Set the interval, in milliseconds, between ledger disk space checks
+     */
+    public ServerConfiguration setDiskCheckInterval(int interval) {
+        setProperty(DISK_CHECK_INTERVAL, interval);
+        return this;
+    }
+
+    /**
+     * Get the disk checker interval in milliseconds. By default 10000
+     */
+    public int getDiskCheckInterval() {
+        return getInt(DISK_CHECK_INTERVAL, 10 * 1000);
+    }
 }

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java?rev=1399680&r1=1399679&r2=1399680&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java Thu Oct 18 14:40:30 2012
@@ -165,6 +165,10 @@ public interface BookieProtocol {
      */
     public static final int EFENCED = 104;
 
+    /**
+     * The server is running as read-only mode
+     */
+    public static final int EREADONLY = 105;
 
     public static final short FLAG_NONE = 0x0;
     public static final short FLAG_DO_FENCING = 0x0001;

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java?rev=1399680&r1=1399679&r2=1399680&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java Thu Oct 18 14:40:30 2012
@@ -104,6 +104,11 @@ public class BookieServer implements NIO
         }
     }
 
+    // VisibleForTesting: returns this server's Bookie instance
+    public Bookie getBookie() {
+        return bookie;
+    }
+
     public synchronized void shutdown() {
         if (!running) {
             return;
@@ -361,6 +366,15 @@ public class BookieServer implements NIO
         switch (h.getOpCode()) {
         case BookieProtocol.ADDENTRY:
             statType = BKStats.STATS_ADD;
+
+            if (bookie.isReadOnly()) {
+                LOG.warn("BookieServer is running as readonly mode,"
+                        + " so rejecting the request from the client!");
+                src.sendResponse(buildResponse(BookieProtocol.EREADONLY,
+                        h.getVersion(), h.getOpCode(), ledgerId, entryId));
+                break;
+            }
+
             try {
                 TimedCnxn tsrc = new TimedCnxn(src, startTime);
                 if ((flags & BookieProtocol.FLAG_RECOVERY_ADD) == BookieProtocol.FLAG_RECOVERY_ADD) {

Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/DiskChecker.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/DiskChecker.java?rev=1399680&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/DiskChecker.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/DiskChecker.java Thu Oct 18 14:40:30 2012
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.util;
+
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * Class that provides utility functions for checking disk problems
+ */
+public class DiskChecker {
+    private float diskUsageThreshold;
+
+    public static class DiskErrorException extends IOException {
+        private static final long serialVersionUID = 9091606022449761729L;
+
+        public DiskErrorException(String msg) {
+            super(msg);
+        }
+    }
+
+    public static class DiskOutOfSpaceException extends IOException {
+        private static final long serialVersionUID = 160898797915906860L;
+
+        public DiskOutOfSpaceException(String msg) {
+            super(msg);
+        }
+    }
+
+    public DiskChecker(float threshold) {
+        validateThreshold(threshold);
+        this.diskUsageThreshold = threshold;
+    }
+
+    /**
+     * The semantics of mkdirsWithExistsCheck method is different from the
+     * mkdirs method provided in the Sun's java.io.File class in the following
+     * way: While creating the non-existent parent directories, this method
+     * checks for the existence of those directories if the mkdir fails at any
+     * point (since that directory might have just been created by some other
+     * process). If both mkdir() and the exists() check fails for any seemingly
+     * non-existent directory, then we signal an error; Sun's mkdir would signal
+     * an error (return false) if a directory it is attempting to create already
+     * exists or the mkdir fails.
+     *
+     * @param dir directory to create, together with any missing parents
+     * @return true on success, false on failure
+     */
+    private static boolean mkdirsWithExistsCheck(File dir) {
+        if (dir.mkdir() || dir.exists()) {
+            return true;
+        }
+        File canonDir = null;
+        try {
+            canonDir = dir.getCanonicalFile();
+        } catch (IOException e) {
+            return false;
+        }
+        String parent = canonDir.getParent();
+        return (parent != null)
+                && (mkdirsWithExistsCheck(new File(parent)) && (canonDir
+                        .mkdir() || canonDir.exists()));
+    }
+
+    /**
+     * Checks the disk space available.
+     *
+     * @param dir
+     *            Directory to check for the disk space. If it does not exist
+     *            yet, the nearest existing ancestor is checked instead.
+     * @throws DiskOutOfSpaceException
+     *             Throws {@link DiskOutOfSpaceException} if the used space
+     *             fraction is more than the threshold.
+     */
+    // VisibleForTesting
+    void checkDiskFull(File dir) throws DiskOutOfSpaceException {
+        if (null == dir) {
+            return;
+        }
+        if (dir.exists()) {
+            long usableSpace = dir.getUsableSpace();
+            long totalSpace = dir.getTotalSpace();
+            float free = (float) usableSpace / (float) totalSpace;
+            float used = 1f - free;
+            if (used > diskUsageThreshold) {
+                throw new DiskOutOfSpaceException("Space left on device "
+                        + usableSpace + " Used space fraction:" + used
+                        + " > threshold " + diskUsageThreshold);
+            }
+        } else {
+            checkDiskFull(dir.getParentFile());
+        }
+    }
+
+    /**
+     * Create the directory if it doesn't exist and check that it is a
+     * readable, writable directory on a disk that is not over the usage
+     * threshold.
+     *
+     * @param dir
+     *            Directory to check for the disk error/full.
+     * @throws DiskErrorException
+     *             If disk having errors
+     * @throws DiskOutOfSpaceException
+     *             If disk is full or having less space than threshhold
+     */
+    public void checkDir(File dir) throws DiskErrorException,
+            DiskOutOfSpaceException {
+        checkDiskFull(dir);
+        if (!mkdirsWithExistsCheck(dir)) {
+            throw new DiskErrorException("can not create directory: "
+                    + dir.toString());
+        }
+        if (!dir.isDirectory()) {
+            throw new DiskErrorException("not a directory: " + dir.toString());
+        }
+        if (!dir.canRead()) {
+            throw new DiskErrorException("directory is not readable: "
+                    + dir.toString());
+        }
+        if (!dir.canWrite()) {
+            throw new DiskErrorException("directory is not writable: "
+                    + dir.toString());
+        }
+    }
+
+    /**
+     * Returns the disk space threshold.
+     *
+     * @return the used-space fraction above which a disk is considered full
+     */
+    // VisibleForTesting
+    float getDiskSpaceThreshold() {
+        return diskUsageThreshold;
+    }
+
+    /**
+     * Set the disk space threshold
+     *
+     * @param diskSpaceThreshold
+     *            used-space fraction, strictly between 0 and 1
+     */
+    // VisibleForTesting
+    void setDiskSpaceThreshold(float diskSpaceThreshold) {
+        validateThreshold(diskSpaceThreshold);
+        this.diskUsageThreshold = diskSpaceThreshold;
+    }
+
+    private void validateThreshold(float diskSpaceThreshold) {
+        // Written so that NaN is rejected as well: every comparison with
+        // NaN is false, so "<= 0 || >= 1" would let it through.
+        if (!(diskSpaceThreshold > 0 && diskSpaceThreshold < 1)) {
+            throw new IllegalArgumentException("Disk space threashold "
+                    + diskSpaceThreshold
+                    + " is not valid. Should be > 0 and < 1 ");
+        }
+    }
+}

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java?rev=1399680&r1=1399679&r2=1399680&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java Thu Oct 18 14:40:30 2012
@@ -32,6 +32,7 @@ import org.apache.bookkeeper.bookie.Garb
 import org.apache.bookkeeper.bookie.GarbageCollectorThread.ExtractionScanner;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -57,8 +58,9 @@ public class EntryLogTest extends TestCa
         ServerConfiguration conf = new ServerConfiguration();
         conf.setGcWaitTime(gcWaitTime);
         conf.setLedgerDirNames(new String[] {tmpDir.toString()});
+        Bookie bookie = new Bookie(conf);
         // create some entries
-        EntryLogger logger = new EntryLogger(conf);
+        EntryLogger logger = ((InterleavedLedgerStorage)bookie.ledgerStorage).entryLogger;
         logger.addEntry(1, generateEntry(1, 1));
         logger.addEntry(3, generateEntry(3, 1));
         logger.addEntry(2, generateEntry(2, 1));
@@ -69,7 +71,7 @@ public class EntryLogTest extends TestCa
         raf.setLength(raf.length()-10);
         raf.close();
         // now see which ledgers are in the log
-        logger = new EntryLogger(conf);
+        logger = new EntryLogger(conf, bookie.getLedgerDirsManager());
 
         EntryLogMetadata meta = new EntryLogMetadata(0L);
         ExtractionScanner scanner = new ExtractionScanner(meta);
@@ -87,10 +89,11 @@ public class EntryLogTest extends TestCa
     }
 
     private ByteBuffer generateEntry(long ledger, long entry) {
-        ByteBuffer bb = ByteBuffer.wrap(new byte[64]);
+        byte[] data = ("ledger-" + ledger + "-" + entry).getBytes();
+        ByteBuffer bb = ByteBuffer.wrap(new byte[8 + 8 + data.length]);
         bb.putLong(ledger);
         bb.putLong(entry);
-        bb.put(("ledger-" + ledger + "-" + entry).getBytes());
+        bb.put(data);
         bb.flip();
         return bb;
     }
@@ -105,6 +108,7 @@ public class EntryLogTest extends TestCa
 
         ServerConfiguration conf = new ServerConfiguration();
         conf.setLedgerDirNames(new String[] {tmpDir.toString()});
+        Bookie bookie = new Bookie(conf);
         // create some entries
         int numLogs = 3;
         int numEntries = 10;
@@ -112,7 +116,8 @@ public class EntryLogTest extends TestCa
         for (int i=0; i<numLogs; i++) {
             positions[i] = new long[numEntries];
 
-            EntryLogger logger = new EntryLogger(conf);
+            EntryLogger logger = new EntryLogger(conf,
+                    bookie.getLedgerDirsManager());
             for (int j=0; j<numEntries; j++) {
                 positions[i][j] = logger.addEntry(i, generateEntry(i, j));
             }
@@ -126,14 +131,16 @@ public class EntryLogTest extends TestCa
         for (int i=numLogs; i<2*numLogs; i++) {
             positions[i] = new long[numEntries];
 
-            EntryLogger logger = new EntryLogger(conf);
+            EntryLogger logger = new EntryLogger(conf,
+                    bookie.getLedgerDirsManager());
             for (int j=0; j<numEntries; j++) {
                 positions[i][j] = logger.addEntry(i, generateEntry(i, j));
             }
             logger.flush();
         }
 
-        EntryLogger newLogger = new EntryLogger(conf);
+        EntryLogger newLogger = new EntryLogger(conf,
+                bookie.getLedgerDirsManager());
         for (int i=0; i<(2*numLogs+1); i++) {
             File logFile = new File(curDir, Long.toHexString(i) + ".log");
             assertTrue(logFile.exists());
@@ -164,7 +171,7 @@ public class EntryLogTest extends TestCa
         conf.setLedgerDirNames(new String[] { tmpDir.toString() });
         EntryLogger entryLogger = null;
         try {
-            entryLogger = new EntryLogger(conf);
+            entryLogger = new EntryLogger(conf, new LedgerDirsManager(conf));
             fail("Expecting FileNotFoundException");
         } catch (FileNotFoundException e) {
             assertEquals("Entry log directory does not exist", e
@@ -176,6 +183,42 @@ public class EntryLogTest extends TestCa
         }
     }
 
+    /**
+     * Verify entries added before and after marking the log dir full are all readable
+     */
+    @Test
+    public void testAddEntryFailureOnDiskFull() throws Exception {
+        File ledgerDir1 = File.createTempFile("bkTest", ".dir");
+        ledgerDir1.delete();
+        File ledgerDir2 = File.createTempFile("bkTest", ".dir");
+        ledgerDir2.delete();
+        ServerConfiguration conf = new ServerConfiguration();
+        conf.setLedgerDirNames(new String[] { ledgerDir1.getAbsolutePath(),
+                ledgerDir2.getAbsolutePath() });
+        Bookie bookie = new Bookie(conf);
+        EntryLogger entryLogger = new EntryLogger(conf,
+                bookie.getLedgerDirsManager());
+        InterleavedLedgerStorage ledgerStorage = ((InterleavedLedgerStorage) bookie.ledgerStorage);
+        ledgerStorage.entryLogger = entryLogger;
+        // Create ledgers 1..3 with master keys
+        ledgerStorage.setMasterKey(1, "key".getBytes());
+        ledgerStorage.setMasterKey(2, "key".getBytes());
+        ledgerStorage.setMasterKey(3, "key".getBytes());
+        // Add entries while both ledger dirs are writable
+        ledgerStorage.addEntry(generateEntry(1, 1));
+        ledgerStorage.addEntry(generateEntry(2, 1));
+        // Mark the logger's current dir as full, then add another entry
+        bookie.getLedgerDirsManager().addToFilledDirs(entryLogger.currentDir);
+        ledgerStorage.addEntry(generateEntry(3, 1));
+        // Every entry, before and after the disk-full mark, must read back
+        Assert.assertArrayEquals(generateEntry(1, 1).array(), ledgerStorage
+                .getEntry(1, 1).array());
+        Assert.assertArrayEquals(generateEntry(2, 1).array(), ledgerStorage
+                .getEntry(2, 1).array());
+        Assert.assertArrayEquals(generateEntry(3, 1).array(), ledgerStorage
+                .getEntry(3, 1).array());
+    }
+
     @After
     public void tearDown() throws Exception {
     }

Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ReadOnlyBookieTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ReadOnlyBookieTest.java?rev=1399680&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ReadOnlyBookieTest.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ReadOnlyBookieTest.java Thu Oct 18 14:40:30 2012
@@ -0,0 +1,166 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.test;
+
+import java.io.File;
+import java.util.Enumeration;
+
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.bookie.LedgerDirsManager;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.client.LedgerEntry;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+
+/**
+ * Tests for the read-only feature of bookies: when the only ledger dir of a
+ * bookie is marked as full, the bookie either switches to read-only mode (if
+ * enabled) or shuts down; with several ledger dirs it keeps writing to the
+ * remaining writable dir.
+ */
+public class ReadOnlyBookieTest extends BookKeeperClusterTestCase {
+
+    public ReadOnlyBookieTest() {
+        super(2);
+    }
+
+    /**
+     * Check that a bookie started with readOnlyModeEnabled=true rejects new
+     * writes once its only ledger dir is full, stays running in read-only
+     * mode, and still serves reads of previously written entries.
+     */
+    public void testBookieShouldServeAsReadOnly() throws Exception {
+        killBookie(0);
+        baseConf.setReadOnlyModeEnabled(true);
+        startNewBookie();
+        LedgerHandle ledger = bkc.createLedger(2, 2, DigestType.MAC,
+                "".getBytes());
+
+        // Check new bookie with readonly mode enabled.
+        File[] ledgerDirs = bsConfs.get(1).getLedgerDirs();
+        assertEquals("Only one ledger dir should be present", 1,
+                ledgerDirs.length);
+        Bookie bookie = bs.get(1).getBookie();
+        LedgerDirsManager ledgerDirsManager = bookie.getLedgerDirsManager();
+
+        for (int i = 0; i < 10; i++) {
+            ledger.addEntry("data".getBytes());
+        }
+
+        // Now add the current ledger dir to filled dirs list
+        ledgerDirsManager.addToFilledDirs(new File(ledgerDirs[0], "current"));
+
+        try {
+            ledger.addEntry("data".getBytes());
+            // Without this the test would pass even if the write were accepted.
+            fail("Should fail to add entry since there aren't enough writable bookies");
+        } catch (BKException.BKNotEnoughBookiesException e) {
+            // Expected
+        }
+
+        assertTrue("Bookie should be running and converted to readonly mode",
+                bookie.isRunning() && bookie.isReadOnly());
+
+        // Now kill the other bookie and read entries from the readonly bookie
+        killBookie(0);
+
+        Enumeration<LedgerEntry> readEntries = ledger.readEntries(0, 9);
+        int numEntriesRead = 0;
+        while (readEntries.hasMoreElements()) {
+            LedgerEntry entry = readEntries.nextElement();
+            assertEquals("Entry should contain correct data", "data",
+                    new String(entry.getEntry()));
+            numEntriesRead++;
+        }
+        // Guard against an empty enumeration silently passing the loop above.
+        assertEquals("All written entries should be readable", 10,
+                numEntriesRead);
+    }
+
+    /**
+     * Check that with readOnlyModeEnabled=false a bookie whose only ledger dir
+     * is full rejects new writes and shuts down.
+     */
+    public void testBookieShutdownIfReadOnlyModeNotEnabled() throws Exception {
+        File[] ledgerDirs = bsConfs.get(1).getLedgerDirs();
+        assertEquals("Only one ledger dir should be present", 1,
+                ledgerDirs.length);
+        Bookie bookie = bs.get(1).getBookie();
+        LedgerHandle ledger = bkc.createLedger(2, 2, DigestType.MAC,
+                "".getBytes());
+        LedgerDirsManager ledgerDirsManager = bookie.getLedgerDirsManager();
+
+        for (int i = 0; i < 10; i++) {
+            ledger.addEntry("data".getBytes());
+        }
+
+        // Now add the current ledger dir to filled dirs list
+        ledgerDirsManager.addToFilledDirs(new File(ledgerDirs[0], "current"));
+
+        try {
+            ledger.addEntry("data".getBytes());
+            fail("Should fail to add entry since there aren't enough writable bookies");
+        } catch (BKException.BKNotEnoughBookiesException e) {
+            // Expected
+        }
+
+        // The shutdown may complete on another thread; wait up to ~10 seconds
+        // instead of racing it. Exits immediately once the bookie is down.
+        for (int i = 0; i < 10 && bookie.isAlive(); i++) {
+            Thread.sleep(1000);
+        }
+        assertFalse("Bookie should shutdown if readOnlyMode not enabled",
+                bookie.isAlive());
+    }
+
+    /**
+     * Check that a bookie with two ledger dirs keeps accepting writes after
+     * one of them is marked as full, leaving exactly one writable dir.
+     */
+    public void testBookieContinueWritingIfMultipleLedgersPresent()
+            throws Exception {
+        startNewBookieWithMultipleLedgerDirs(2);
+
+        File[] ledgerDirs = bsConfs.get(1).getLedgerDirs();
+        assertEquals("Two ledger dirs should be present", 2,
+                ledgerDirs.length);
+        Bookie bookie = bs.get(1).getBookie();
+        LedgerHandle ledger = bkc.createLedger(2, 2, DigestType.MAC,
+                "".getBytes());
+        LedgerDirsManager ledgerDirsManager = bookie.getLedgerDirsManager();
+
+        for (int i = 0; i < 10; i++) {
+            ledger.addEntry("data".getBytes());
+        }
+
+        // Now add the current ledger dir to filled dirs list
+        ledgerDirsManager.addToFilledDirs(new File(ledgerDirs[0], "current"));
+        for (int i = 0; i < 10; i++) {
+            ledger.addEntry("data".getBytes());
+        }
+        assertEquals("writable dirs should have one dir", 1, ledgerDirsManager
+                .getWritableLedgerDirs().size());
+        assertTrue("Bookie should keep running while a writable ledger dir remains",
+                bookie.isAlive());
+    }
+
+    /**
+     * Replaces bookie 1 with a new bookie that has {@code numOfLedgerDirs}
+     * freshly created ledger dirs (the first also used as journal dir),
+     * listening on bookie 1's port + 1.
+     */
+    private void startNewBookieWithMultipleLedgerDirs(int numOfLedgerDirs)
+            throws Exception {
+        ServerConfiguration conf = bsConfs.get(1);
+        killBookie(1);
+
+        File[] ledgerDirs = new File[numOfLedgerDirs];
+        for (int i = 0; i < numOfLedgerDirs; i++) {
+            File dir = File.createTempFile("bookie", "test");
+            tmpDirs.add(dir);
+            // Turn the temp file into a directory; fail fast with a clear
+            // message instead of starting a bookie on a missing dir.
+            assertTrue("Could not delete temp file " + dir, dir.delete());
+            assertTrue("Could not create ledger dir " + dir, dir.mkdir());
+            ledgerDirs[i] = dir;
+        }
+
+        ServerConfiguration newConf = newServerConfiguration(
+                conf.getBookiePort() + 1, zkUtil.getZooKeeperConnectString(),
+                ledgerDirs[0], ledgerDirs);
+        bsConfs.add(newConf);
+        bs.add(startBookie(newConf));
+    }
+}

Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/TestDiskChecker.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/TestDiskChecker.java?rev=1399680&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/TestDiskChecker.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/TestDiskChecker.java Thu Oct 18 14:40:30 2012
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.util;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.bookkeeper.util.DiskChecker.DiskErrorException;
+import org.apache.bookkeeper.util.DiskChecker.DiskOutOfSpaceException;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test to verify {@link DiskChecker}.
+ */
+public class TestDiskChecker {
+
+    DiskChecker diskChecker;
+
+    @Before
+    public void setup() {
+        diskChecker = new DiskChecker(0.95f);
+    }
+
+    /**
+     * Creates an empty temporary directory (inside {@code parent}, or the
+     * default temp dir when {@code parent} is null) scheduled for deletion
+     * on JVM exit.
+     */
+    private static File createTempDir(File parent) throws IOException {
+        File dir = File.createTempFile("DiskCheck", "test", parent);
+        assertTrue("Could not delete temp file " + dir, dir.delete());
+        assertTrue("Could not create temp dir " + dir, dir.mkdir());
+        // deleteOnExit runs in reverse registration order, so anything created
+        // inside this dir later is removed before the dir itself.
+        dir.deleteOnExit();
+        return dir;
+    }
+
+    /**
+     * Check that a disk is reported full when the threshold is set just
+     * below its current usage.
+     */
+    @Test(expected = DiskOutOfSpaceException.class)
+    public void testCheckDiskFull() throws IOException {
+        File file = File.createTempFile("DiskCheck", "test");
+        file.deleteOnExit();
+        long usableSpace = file.getUsableSpace();
+        long totalSpace = file.getTotalSpace();
+        // Threshold = current used fraction - 5%, so the disk counts as full.
+        diskChecker
+                .setDiskSpaceThreshold((1f - ((float) usableSpace / (float) totalSpace)) - 0.05f);
+        diskChecker.checkDiskFull(file);
+    }
+
+    /**
+     * Check disk full on a non-existent file. In this case the check should
+     * fall back to the parent file.
+     */
+    @Test(expected = DiskOutOfSpaceException.class)
+    public void testCheckDiskFullOnNonExistFile() throws IOException {
+        File file = File.createTempFile("DiskCheck", "test");
+        long usableSpace = file.getUsableSpace();
+        long totalSpace = file.getTotalSpace();
+        diskChecker
+                .setDiskSpaceThreshold((1f - ((float) usableSpace / (float) totalSpace)) - 0.05f);
+        assertTrue(file.delete());
+        diskChecker.checkDiskFull(file);
+    }
+
+    /**
+     * Check that checkDir reports an error when given a regular file.
+     */
+    @Test(expected = DiskErrorException.class)
+    public void testCheckDiskErrorForFile() throws Exception {
+        File parent = createTempDir(null);
+        File child = File.createTempFile("DiskCheck", "test", parent);
+        child.deleteOnExit();
+        diskChecker.checkDir(child);
+    }
+
+    /**
+     * Check that checkDir accepts a valid directory.
+     */
+    @Test
+    public void testCheckDiskErrorForDir() throws Exception {
+        File parent = createTempDir(null);
+        File child = createTempDir(parent);
+        diskChecker.checkDir(child);
+    }
+}