You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2016/11/29 23:02:32 UTC

bookkeeper git commit: BOOKKEEPER-948: Provide an option to add more ledger/index directorie…

Repository: bookkeeper
Updated Branches:
  refs/heads/master 95ea4815b -> 66515e74d


BOOKKEEPER-948: Provide an option to add more ledger/index directorie\u2026

\u2026s to a bookie

This change allows the addition of new ledger and index directories to a bookie. Thus
increasing ts storage capacity. The option is exposed via 'allowStorageExpansion'
boolean configuration option. Also, the newly added directories need to be empty to be
accepted. Two new test cases have been added to test this functionality.

Author: Rithin <ri...@salesforce.com>

Reviewers: Enrico Olivelli <eo...@gmail.com>, Sijie Guo <si...@apache.org>

Closes #59 from rithin-shetty/storage_expansion


Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/66515e74
Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/66515e74
Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/66515e74

Branch: refs/heads/master
Commit: 66515e74d3fb03d21cad4eccb446a90ab7e94597
Parents: 95ea481
Author: Rithin <ri...@salesforce.com>
Authored: Tue Nov 29 15:02:27 2016 -0800
Committer: Sijie Guo <si...@apache.org>
Committed: Tue Nov 29 15:02:27 2016 -0800

----------------------------------------------------------------------
 bookkeeper-server/conf/bk_server.conf           |   4 +
 .../org/apache/bookkeeper/bookie/Bookie.java    | 126 +++++++++++++++-
 .../apache/bookkeeper/bookie/BookieShell.java   |  61 ++++++++
 .../org/apache/bookkeeper/bookie/Cookie.java    |  66 +++++++--
 .../bookkeeper/conf/ServerConfiguration.java    |  23 +++
 .../apache/bookkeeper/bookie/CookieTest.java    | 145 ++++++++++++++++++-
 6 files changed, 406 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/66515e74/bookkeeper-server/conf/bk_server.conf
----------------------------------------------------------------------
diff --git a/bookkeeper-server/conf/bk_server.conf b/bookkeeper-server/conf/bk_server.conf
index 678018c..14c6068 100644
--- a/bookkeeper-server/conf/bk_server.conf
+++ b/bookkeeper-server/conf/bk_server.conf
@@ -57,6 +57,10 @@ ledgerDirectories=/tmp/bk-data
 # Directories to store index files. If not specified, will use ledgerDirectories to store.
 # indexDirectories=/tmp/bk-data
 
+# Allow the expansion of bookie storage capacity. Newly added ledger
+# and index dirs must be empty.
+# allowStorageExpansion=false
+
 # Ledger Manager Class
 # What kind of ledger manager is used to manage how ledgers are stored, managed
 # and garbage collected. Try to read 'BookKeeper Internals' for detail info.

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/66515e74/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
index 6200d21..82db3b0 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
@@ -84,6 +84,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Sets;
 
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_ADD_ENTRY;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_ADD_ENTRY_BYTES;
@@ -323,10 +324,14 @@ public class Bookie extends BookieCriticalThread {
         if (zk == null) { // exists only for testing, just make sure directories are correct
             checkDirectoryStructure(journalDirectory);
             for (File dir : allLedgerDirs) {
-                    checkDirectoryStructure(dir);
+                checkDirectoryStructure(dir);
             }
             return;
         }
+        if (conf.getAllowStorageExpansion()) {
+            checkEnvironmentWithStorageExpansion(conf, zk, journalDirectory, allLedgerDirs);
+            return;
+        }
         try {
             boolean newEnv = false;
             List<File> missedCookieDirs = new ArrayList<File>();
@@ -343,7 +348,7 @@ public class Bookie extends BookieCriticalThread {
                 newEnv = true;
                 missedCookieDirs.add(journalDirectory);
             }
-            String instanceId = getInstanceId(zk);
+            String instanceId = getInstanceId(conf, zk);
             Cookie.Builder builder = Cookie.generateCookie(conf);
             if (null != instanceId) {
                 builder.setInstanceId(instanceId);
@@ -360,26 +365,27 @@ public class Bookie extends BookieCriticalThread {
             checkDirectoryStructure(journalDirectory);
 
             if(!newEnv){
-                journalCookie.verify(masterCookie);
+                masterCookie.verify(journalCookie);
             }
             for (File dir : allLedgerDirs) {
                 checkDirectoryStructure(dir);
                 try {
                     Cookie c = Cookie.readFromDirectory(dir);
-                    c.verify(masterCookie);
+                    masterCookie.verify(c);
                 } catch (FileNotFoundException fnf) {
                     missedCookieDirs.add(dir);
                 }
             }
 
-            if (!newEnv && missedCookieDirs.size() > 0){
+            if (!newEnv && missedCookieDirs.size() > 0) {
                 LOG.error("Cookie exists in zookeeper, but not in all local directories. "
-                        + " Directories missing cookie file are " + missedCookieDirs);
+                          + " Directories missing cookie file are " + missedCookieDirs);
                 throw new BookieException.InvalidCookieException();
             }
+
             if (newEnv) {
                 if (missedCookieDirs.size() > 0) {
-                    LOG.debug("Directories missing cookie file are {}", missedCookieDirs);
+                    LOG.info("Directories missing cookie file are {}", missedCookieDirs);
                     masterCookie.writeToDirectory(journalDirectory);
                     for (File dir : allLedgerDirs) {
                         masterCookie.writeToDirectory(dir);
@@ -402,6 +408,110 @@ public class Bookie extends BookieCriticalThread {
         }
     }
 
+    public static void checkEnvironmentWithStorageExpansion(ServerConfiguration conf,
+            ZooKeeper zk, File journalDirectory, List<File> allLedgerDirs) throws BookieException, IOException {
+        try {
+            boolean newEnv = false;
+            List<File> missedCookieDirs = new ArrayList<File>();
+            Cookie journalCookie = null;
+            // try to read cookie from journal directory.
+            try {
+                journalCookie = Cookie.readFromDirectory(journalDirectory);
+                if (journalCookie.isBookieHostCreatedFromIp()) {
+                    conf.setUseHostNameAsBookieID(false);
+                } else {
+                    conf.setUseHostNameAsBookieID(true);
+                }
+            } catch (FileNotFoundException fnf) {
+                newEnv = true;
+                missedCookieDirs.add(journalDirectory);
+            }
+            String instanceId = getInstanceId(conf, zk);
+            Cookie.Builder builder = Cookie.generateCookie(conf);
+            if (null != instanceId) {
+                builder.setInstanceId(instanceId);
+            }
+            Cookie masterCookie = builder.build();
+            Versioned<Cookie> zkCookie = null;
+            try {
+                zkCookie = Cookie.readFromZooKeeper(zk, conf);
+                // If allowStorageExpansion option is set, we should 
+                // make sure that the new set of ledger/index dirs
+                // is a super set of the old; else, we fail the cookie check
+                masterCookie.verifyIsSuperSet(zkCookie.getValue());
+            } catch (KeeperException.NoNodeException nne) {
+                // can occur in cases:
+                // 1) new environment or
+                // 2) done only metadata format and started bookie server.
+            }
+            checkDirectoryStructure(journalDirectory);
+
+            if(!newEnv){
+                masterCookie.verifyIsSuperSet(journalCookie);
+            }
+
+            for (File dir : allLedgerDirs) {
+                checkDirectoryStructure(dir);
+                try {
+                    Cookie c = Cookie.readFromDirectory(dir);
+                    masterCookie.verifyIsSuperSet(c);
+                } catch (FileNotFoundException fnf) {
+                    missedCookieDirs.add(dir);
+                }
+            }
+
+            if (!newEnv && missedCookieDirs.size() > 0) {
+                // If we find that any of the dirs in missedCookieDirs, existed
+                // previously, we stop because we could be missing data
+                // Also, if a new ledger dir is being added, we make sure that
+                // that dir is empty. Else, we reject the request
+                Set<String> existingLedgerDirs = Sets.newHashSet(journalCookie.getLedgerDirPathsFromCookie());
+                List<File> dirsMissingData = new ArrayList<File>();
+                List<File> nonEmptyDirs = new ArrayList<File>();
+                for (File dir : missedCookieDirs) {
+                    if (existingLedgerDirs.contains(dir.getParent())) {
+                        // if one of the existing ledger dirs doesn't have cookie,
+                        // let us not proceed further
+                        dirsMissingData.add(dir);
+                        continue;
+                    }
+                    String[] content = dir.list();
+                    if (content != null && content.length != 0) {
+                        nonEmptyDirs.add(dir);
+                    }
+                }
+                if (dirsMissingData.size() > 0 || nonEmptyDirs.size() > 0) {
+                    LOG.error("Either not all local directories have cookies or directories being added "
+                            + " newly are not empty. "
+                            + "Directories missing cookie file are: " + dirsMissingData
+                            + " New directories that are not empty are: " + nonEmptyDirs);
+                    throw new BookieException.InvalidCookieException();
+                }
+            }
+
+            if (missedCookieDirs.size() > 0) {
+                LOG.info("Stamping new cookies on all dirs {}", missedCookieDirs);
+                masterCookie.writeToDirectory(journalDirectory);
+                for (File dir : allLedgerDirs) {
+                    masterCookie.writeToDirectory(dir);
+                }
+                masterCookie.writeToZooKeeper(zk, conf, zkCookie != null ? zkCookie.getVersion() : Version.NEW);
+            }
+        } catch (KeeperException ke) {
+            LOG.error("Couldn't access cookie in zookeeper", ke);
+            throw new BookieException.InvalidCookieException(ke);
+        } catch (UnknownHostException uhe) {
+            LOG.error("Couldn't check cookies, networking is broken", uhe);
+            throw new BookieException.InvalidCookieException(uhe);
+        } catch (IOException ioe) {
+            LOG.error("Error accessing cookie on disks", ioe);
+            throw new BookieException.InvalidCookieException(ioe);
+        } catch (InterruptedException ie) {
+            LOG.error("Thread interrupted while checking cookies, exiting", ie);
+            throw new BookieException.InvalidCookieException(ie);
+        }
+    }
+
     /**
      * Return the configured address of the bookie.
      */
@@ -427,7 +537,7 @@ public class Bookie extends BookieCriticalThread {
         return addr;
     }
 
-    private String getInstanceId(ZooKeeper zk) throws KeeperException,
+    private static String getInstanceId(ServerConfiguration conf, ZooKeeper zk) throws KeeperException,
             InterruptedException {
         String instanceId = null;
         if (zk.exists(conf.getZkLedgersRootPath(), null) == null) {

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/66515e74/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
index df31bf2..7d63b62 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
@@ -28,6 +28,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Enumeration;
+import java.util.Arrays;
 import java.util.Formatter;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -79,6 +80,7 @@ import org.apache.zookeeper.ZooKeeper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.AbstractFuture;
 
 /**
@@ -107,6 +109,7 @@ public class BookieShell implements Tool {
     static final String CMD_AUTORECOVERY = "autorecovery";
     static final String CMD_LISTBOOKIES = "listbookies";
     static final String CMD_UPDATECOOKIE = "updatecookie";
+    static final String CMD_EXPANDSTORAGE = "expandstorage";
     static final String CMD_UPDATELEDGER = "updateledgers";
     static final String CMD_HELP = "help";
 
@@ -1386,6 +1389,63 @@ public class BookieShell implements Tool {
     }
 
     /**
+     * Expand the storage directories owned by a bookie
+     */
+    class ExpandStorageCmd extends MyCommand {
+        Options opts = new Options();
+
+        ExpandStorageCmd() {
+            super(CMD_EXPANDSTORAGE);
+        }
+
+        @Override
+        Options getOptions() {
+            return opts;
+        }
+
+        @Override
+        String getDescription() {
+            return "Add new empty ledger/index directories. Update the directories"
+                   + "info in the conf file before running the command.";
+        }
+
+        @Override
+        String getUsage() {
+            return "expandstorage";
+        }
+
+        @Override
+        int runCmd(CommandLine cmdLine) {
+            ServerConfiguration conf = new ServerConfiguration(bkConf);
+            ZooKeeper zk;
+            try {
+                zk = ZooKeeperClient.newBuilder()
+                        .connectString(bkConf.getZkServers())
+                        .sessionTimeoutMs(bkConf.getZkTimeout()).build();
+            } catch (KeeperException | InterruptedException | IOException e) {
+                LOG.error("Exception while establishing zookeeper connection.", e);
+                return -1;
+            }
+
+            List<File> allLedgerDirs = Lists.newArrayList();
+            allLedgerDirs.addAll(Arrays.asList(ledgerDirectories));
+            if (indexDirectories != ledgerDirectories) {
+                allLedgerDirs.addAll(Arrays.asList(indexDirectories));
+            }
+
+            try {
+                Bookie.checkEnvironmentWithStorageExpansion(conf, zk,
+                        journalDirectory, allLedgerDirs);
+            } catch (BookieException | IOException e) {
+                LOG.error(
+                        "Exception while updating cookie for storage expansion", e);
+                return -1;
+            }
+            return 0;
+        }
+    }
+
+    /**
      * Update ledger command
      */
     class UpdateLedgerCmd extends MyCommand {
@@ -1519,6 +1579,7 @@ public class BookieShell implements Tool {
         commands.put(CMD_AUTORECOVERY, new AutoRecoveryCmd());
         commands.put(CMD_LISTBOOKIES, new ListBookiesCmd());
         commands.put(CMD_UPDATECOOKIE, new UpdateCookieCmd());
+        commands.put(CMD_EXPANDSTORAGE, new ExpandStorageCmd());
         commands.put(CMD_UPDATELEDGER, new UpdateLedgerCmd());
         commands.put(CMD_HELP, new HelpCmd());
     }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/66515e74/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Cookie.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Cookie.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Cookie.java
index 349eee5..1730cd9 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Cookie.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Cookie.java
@@ -34,6 +34,7 @@ import java.io.OutputStreamWriter;
 import java.io.StringReader;
 import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
+import java.util.Set;
 
 import org.apache.bookkeeper.conf.AbstractConfiguration;
 import org.apache.bookkeeper.conf.ServerConfiguration;
@@ -51,6 +52,7 @@ import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Sets;
 import com.google.protobuf.TextFormat;
 
 /**
@@ -75,6 +77,7 @@ class Cookie {
     private final String journalDir;
     private final String ledgerDirs;
     private final String instanceId;
+    private static final String SEPARATOR = "\t";
 
     private Cookie(int layoutVersion, String bookieHost, String journalDir, String ledgerDirs, String instanceId) {
         this.layoutVersion = layoutVersion;
@@ -84,15 +87,56 @@ class Cookie {
         this.instanceId = instanceId;
     }
 
-    public void verify(Cookie c) throws BookieException.InvalidCookieException {
+    private static String encodeDirPaths(String[] dirs) {
+        StringBuilder b = new StringBuilder();
+        b.append(dirs.length);
+        for (String d : dirs) {
+            b.append(SEPARATOR).append(d);
+        }
+        return b.toString();
+    }
+
+    private static String[] decodeDirPathFromCookie(String s) {
+        // the first part of the string contains a count of how many
+        // directories are present; to skip it, we look for subString
+        // from the first '/'
+        return s.substring(s.indexOf(SEPARATOR)+SEPARATOR.length()).split(SEPARATOR);
+    }
+
+    String[] getLedgerDirPathsFromCookie() {
+        return decodeDirPathFromCookie(ledgerDirs);
+    }
+
+    /**
+     * Receives 2 String arrays, that each contain a list of directory paths,
+     * and checks if first is a super set of the second.
+     *
+     * @param superSet
+     * @param subSet
+     * @return true if s1 is a superSet of s2; false otherwise
+     */
+    private boolean isSuperSet(String[] s1, String[] s2) {
+        Set<String> superSet = Sets.newHashSet(s1);
+        Set<String> subSet = Sets.newHashSet(s2);
+        return superSet.containsAll(subSet);
+    }
+
+    private boolean verifyLedgerDirs(Cookie c, boolean checkIfSuperSet) {
+        if (checkIfSuperSet == false) {
+            return ledgerDirs.equals(c.ledgerDirs);
+        } else {
+            return isSuperSet(decodeDirPathFromCookie(ledgerDirs), decodeDirPathFromCookie(c.ledgerDirs));
+        }
+    }
+
+    private void verifyInternal(Cookie c, boolean checkIfSuperSet) throws BookieException.InvalidCookieException {
         String errMsg;
         if (c.layoutVersion < 3 && c.layoutVersion != layoutVersion) {
             errMsg = "Cookie is of too old version " + c.layoutVersion;
             LOG.error(errMsg);
             throw new BookieException.InvalidCookieException(errMsg);
         } else if (!(c.layoutVersion >= 3 && c.bookieHost.equals(bookieHost)
-                && c.journalDir.equals(journalDir) && c.ledgerDirs
-                    .equals(ledgerDirs))) {
+            && c.journalDir.equals(journalDir) && verifyLedgerDirs(c, checkIfSuperSet))) {
             errMsg = "Cookie [" + this + "] is not matching with [" + c + "]";
             throw new BookieException.InvalidCookieException(errMsg);
         } else if ((instanceId == null && c.instanceId != null)
@@ -104,6 +148,14 @@ class Cookie {
         }
     }
 
+    public void verify(Cookie c) throws BookieException.InvalidCookieException {
+        verifyInternal(c, false);
+    }
+
+    public void verifyIsSuperSet(Cookie c) throws BookieException.InvalidCookieException {
+        verifyInternal(c, true);
+    }
+
     public String toString() {
         if (layoutVersion <= 3) {
             return toStringVersion3();
@@ -275,17 +327,11 @@ class Cookie {
      */
     static Builder generateCookie(ServerConfiguration conf)
             throws UnknownHostException {
-        StringBuilder b = new StringBuilder();
-        String[] dirs = conf.getLedgerDirNames();
-        b.append(dirs.length);
-        for (String d : dirs) {
-            b.append("\t").append(d);
-        }
         Builder builder = Cookie.newBuilder();
         builder.setLayoutVersion(CURRENT_COOKIE_LAYOUT_VERSION);
         builder.setBookieHost(Bookie.getBookieAddress(conf).toString());
         builder.setJournalDir(conf.getJournalDirName());
-        builder.setLedgerDirs(b.toString());
+        builder.setLedgerDirs(encodeDirPaths(conf.getLedgerDirNames()));
         return builder;
     }
 

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/66515e74/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
index 92644da..6bc118a 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
@@ -84,6 +84,7 @@ public class ServerConfiguration extends AbstractConfiguration {
     protected final static String JOURNAL_DIR = "journalDirectory";
     protected final static String LEDGER_DIRS = "ledgerDirectories";
     protected final static String INDEX_DIRS = "indexDirectories";
+    protected final static String ALLOW_STORAGE_EXPANSION = "allowStorageExpansion";
     // NIO Parameters
     protected final static String SERVER_TCP_NODELAY = "serverTcpNoDelay";
     // Zookeeper Parameters
@@ -521,6 +522,28 @@ public class ServerConfiguration extends AbstractConfiguration {
     }
 
     /**
+     * Return whether we should allow addition of ledger/index dirs to an existing bookie.
+     *
+     * @return true if the addition is allowed; false otherwise
+     */
+    public boolean getAllowStorageExpansion() {
+        return this.getBoolean(ALLOW_STORAGE_EXPANSION, false);
+    }
+
+    /**
+     * Change the setting of whether or not we should allow ledger/index
+     * dirs to be added to the current set of dirs.
+     *
+     * @param val - true if new ledger/index dirs can be added; false otherwise.
+     *
+     * @return server configuration
+     */
+    public ServerConfiguration setAllowStorageExpansion(boolean val) {
+        this.setProperty(ALLOW_STORAGE_EXPANSION, val);
+        return this;
+    }
+
+    /**
      * Get dir name to store journal files
      *
      * @return journal dir name

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/66515e74/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CookieTest.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CookieTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CookieTest.java
index 34104d3..2288b79 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CookieTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CookieTest.java
@@ -34,9 +34,11 @@ import static org.junit.Assert.fail;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
 
 import org.junit.Assert;
-
 import org.apache.bookkeeper.client.BookKeeperAdmin;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.conf.ServerConfiguration;
@@ -47,6 +49,9 @@ import org.apache.bookkeeper.versioning.Versioned;
 import org.apache.commons.io.FileUtils;
 import org.junit.Test;
 
+
+import com.google.common.collect.Sets;
+
 public class CookieTest extends BookKeeperClusterTestCase {
     final int bookiePort = PortManager.nextFreePort();
 
@@ -195,6 +200,144 @@ public class CookieTest extends BookKeeperClusterTestCase {
     }
 
     /**
+     * Test that if a directory is added to an existing bookie, and
+     * allowStorageExpansion option is true, the bookie should come online.
+     */
+    @Test(timeout=60000)
+    public void testStorageExpansionOption() throws Exception {
+        String ledgerDir0 = newDirectory();
+        String indexDir0 = newDirectory();
+        String journalDir = newDirectory();
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration()
+            .setZkServers(zkUtil.getZooKeeperConnectString())
+            .setJournalDirName(journalDir)
+            .setLedgerDirNames(new String[] { ledgerDir0 })
+            .setIndexDirName(new String[] { indexDir0 })
+            .setBookiePort(bookiePort)
+            .setAllowStorageExpansion(true);
+
+        Bookie b = new Bookie(conf); // should work fine
+        b.start();
+        b.shutdown();
+        b = null;
+
+        // add a few additional ledger dirs
+        String[] lPaths = new String[] {ledgerDir0, newDirectory(), newDirectory()};
+        Set<String> configuredLedgerDirs =  Sets.newHashSet(lPaths);
+        conf.setLedgerDirNames(lPaths);
+
+        // add an extra index dir
+        String[] iPaths = new String[] {indexDir0, newDirectory()};
+        Set<String> configuredIndexDirs =  Sets.newHashSet(iPaths);
+        conf.setIndexDirName(iPaths);
+
+        try {
+            b = new Bookie(conf);
+        } catch (BookieException.InvalidCookieException ice) {
+            fail("Should have been able to start the bookie");
+        }
+
+        List<File> l = b.getLedgerDirsManager().getAllLedgerDirs();
+        HashSet<String> bookieLedgerDirs = Sets.newHashSet();
+        for (File f : l) {
+            // Using the parent path because the bookie creates a 'current'
+            // dir under the ledger dir user provides
+            bookieLedgerDirs.add(f.getParent());
+        }
+        assertTrue("Configured ledger dirs: " + configuredLedgerDirs + " doesn't match bookie's ledger dirs: "
+                   + bookieLedgerDirs,
+                   configuredLedgerDirs.equals(bookieLedgerDirs));
+
+        l = b.getIndexDirsManager().getAllLedgerDirs();
+        HashSet<String> bookieIndexDirs = Sets.newHashSet();
+        for (File f : l) {
+            bookieIndexDirs.add(f.getParent());
+        }
+        assertTrue("Configured Index dirs: " + configuredIndexDirs + " doesn't match bookie's index dirs: "
+                   + bookieIndexDirs,
+                   configuredIndexDirs.equals(bookieIndexDirs));
+
+        b.shutdown();
+
+        // Make sure that substituting an older ledger directory
+        // is not allowed.
+        String[] lPaths2 = new String[] { lPaths[0], lPaths[1], newDirectory() };
+        conf.setLedgerDirNames(lPaths2);
+        try {
+            b = new Bookie(conf);
+            fail("Should not have been able to start the bookie");
+        } catch (BookieException.InvalidCookieException ice) {
+            // correct behavior
+        }
+
+        // Finally make sure that not including the older ledger directories
+        // is not allowed. Remove one of the older ledger dirs
+        lPaths2 = new String[] { lPaths[0], lPaths[1] };
+        conf.setLedgerDirNames(lPaths2);
+        try {
+            b = new Bookie(conf);
+            fail("Should not have been able to start the bookie");
+        } catch (BookieException.InvalidCookieException ice) {
+            // correct behavior
+        }
+    }
+
+    /**
+     * Test that adding of a non-empty directory is not allowed
+     * even when allowStorageExpansion option is true
+     */
+    @Test(timeout=60000)
+    public void testNonEmptyDirAddWithStorageExpansionOption() throws Exception {
+        String ledgerDir0 = newDirectory();
+        String indexDir0 = newDirectory();
+        String journalDir = newDirectory();
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration()
+            .setZkServers(zkUtil.getZooKeeperConnectString())
+            .setJournalDirName(journalDir)
+            .setLedgerDirNames(new String[] { ledgerDir0 })
+            .setIndexDirName(new String[] { indexDir0 })
+            .setBookiePort(bookiePort)
+            .setAllowStorageExpansion(true);
+
+        Bookie b = new Bookie(conf); // should work fine
+        b.start();
+        b.shutdown();
+        b = null;
+
+        // add an additional ledger dir
+        String[] lPaths = new String[] {ledgerDir0, newDirectory()};
+        conf.setLedgerDirNames(lPaths);
+
+        // create a file to make the dir non-empty
+        File currentDir = Bookie.getCurrentDirectory(new File(lPaths[1]));
+        new File(currentDir, "foo").createNewFile();
+        assertTrue(currentDir.list().length == 1);
+
+        try {
+            b = new Bookie(conf);
+            fail("Shouldn't have been able to start");
+        } catch (BookieException.InvalidCookieException ice) {
+            // correct behavior
+        }
+
+        // Now test with a non-empty index dir
+        String[] iPaths = new String[] {indexDir0, newDirectory()};
+        conf.setIndexDirName(iPaths);
+
+        // create a dir to make it non-empty
+        currentDir = Bookie.getCurrentDirectory(new File(iPaths[1]));
+        new File(currentDir, "bar").mkdirs();
+        assertTrue(currentDir.list().length == 1);
+
+        try {
+            b = new Bookie(conf);
+            fail("Shouldn't have been able to start");
+        } catch (BookieException.InvalidCookieException ice) {
+            // correct behavior
+        }
+    }
+
+    /**
      * Test that if a directory's contents
      * are emptied, the bookie will fail to start
      */