You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2017/10/27 23:13:18 UTC

[bookkeeper] branch master updated: ISSUE #265: Add persistable bookie status

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 3a20fe0  ISSUE #265: Add persistable bookie status
3a20fe0 is described below

commit 3a20fe0ab6c0259ae9143641105c46e48f625f7c
Author: Sijie Guo <si...@apache.org>
AuthorDate: Fri Oct 27 16:13:10 2017 -0700

    ISSUE #265: Add persistable bookie status
    
    Descriptions of the changes in this PR:
    
    - Add support for persisting bookie status
    - Add configuration to enable/disable this feature
    - Add test cases
    - Improve bookie status stat
    
    It also includes changes for BOOKKEEPER-754 (flush ledger storage after replaying journal)
    
    Author: Sijie Guo <si...@apache.org>
    Author: Yiming Zang <yz...@twitter.com>
    
    Reviewers: Yiming Zang <yz...@gmail.com>, Enrico Olivelli <eo...@gmail.com>
    
    This closes #266 from sijie/server_side_crc32, closes #265
---
 bookkeeper-server/conf/bk_server.conf              |   4 +
 .../java/org/apache/bookkeeper/bookie/Bookie.java  |  60 ++++-
 .../org/apache/bookkeeper/bookie/BookieStatus.java | 242 +++++++++++++++++++++
 .../apache/bookkeeper/bookie/ReadOnlyBookie.java   |   4 +-
 .../org/apache/bookkeeper/bookie/SyncThread.java   |  73 ++++---
 .../bookkeeper/conf/ServerConfiguration.java       |  25 +++
 .../org/apache/bookkeeper/proto/BookieServer.java  |  21 --
 .../java/org/apache/bookkeeper/server/Main.java    |   4 +-
 .../bookkeeper/util/BookKeeperConstants.java       |   1 +
 .../bookie/BookieInitializationTest.java           | 187 +++++++++++++++-
 site/_data/config/bk_server.yaml                   |   3 +-
 11 files changed, 561 insertions(+), 63 deletions(-)

diff --git a/bookkeeper-server/conf/bk_server.conf b/bookkeeper-server/conf/bk_server.conf
index 98dcbcd..11cbff2 100755
--- a/bookkeeper-server/conf/bk_server.conf
+++ b/bookkeeper-server/conf/bk_server.conf
@@ -485,6 +485,10 @@ ledgerManagerFactoryClass=org.apache.bookkeeper.meta.HierarchicalLedgerManagerFa
 # Whether the bookie is force started in read only mode or not
 # forceReadOnlyBookie=false
 
+# Persiste the bookie status locally on the disks. So the bookies can keep their status upon restarts
+# @Since 4.6
+# persistBookieStatusEnabled=false
+
 #############################################################################
 ## Disk utilization
 #############################################################################
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
index 953fb17..83888c1 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
@@ -58,6 +58,7 @@ import java.util.Observable;
 import java.util.Observer;
 import java.util.Set;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -134,6 +135,8 @@ public class Bookie extends BookieCriticalThread {
     private volatile boolean running = false;
     // Flag identify whether it is in shutting down progress
     private volatile boolean shuttingdown = false;
+    // Bookie status
+    private final BookieStatus bookieStatus = new BookieStatus();
 
     private int exitCode = ExitCode.OK;
 
@@ -142,7 +145,7 @@ public class Bookie extends BookieCriticalThread {
     protected final String bookieId;
 
     private final AtomicBoolean rmRegistered = new AtomicBoolean(false);
-    protected final AtomicBoolean readOnly = new AtomicBoolean(false);
+    protected final AtomicBoolean forceReadOnly = new AtomicBoolean(false);
     // executor to manage the state changes for a bookie.
     final ExecutorService stateService = Executors.newSingleThreadExecutor(
             new ThreadFactoryBuilder().setNameFormat("BookieStateService-%d").build());
@@ -728,7 +731,13 @@ public class Bookie extends BookieCriticalThread {
 
             @Override
             public Number getSample() {
-                return rmRegistered.get() ? (readOnly.get() ? 0 : 1) : -1;
+                if (!rmRegistered.get()){
+                    return -1;
+                } else if (forceReadOnly.get() || bookieStatus.isInReadOnlyMode()) {
+                    return 0;
+                } else {
+                    return 1;
+                }
             }
         });
     }
@@ -812,6 +821,12 @@ public class Bookie extends BookieCriticalThread {
         if (indexDirsManager != ledgerDirsManager) {
             idxMonitor.start();
         }
+
+        // start sync thread first, so during replaying journals, we could do checkpoint
+        // which reduce the chance that we need to replay journals again if bookie restarted
+        // again before finished journal replays.
+        syncThread.start();
+
         // replay journals
         try {
             readJournal();
@@ -824,7 +839,19 @@ public class Bookie extends BookieCriticalThread {
             shutdown(ExitCode.BOOKIE_EXCEPTION);
             return;
         }
+
+        // Do a fully flush after journal replay
+        try {
+            syncThread.requestFlush().get();
+        } catch (InterruptedException e) {
+            LOG.warn("Interrupting the fully flush after replaying journals : ", e);
+            Thread.currentThread().interrupt();
+        } catch (ExecutionException e) {
+            LOG.error("Error on executing a fully flush after replaying journals.");
+            shutdown(ExitCode.BOOKIE_EXCEPTION);
+        }
         LOG.info("Finished reading journal, starting bookie");
+
         // start bookie thread
         super.start();
 
@@ -837,7 +864,13 @@ public class Bookie extends BookieCriticalThread {
 
         ledgerStorage.start();
 
-        syncThread.start();
+        // check the bookie status to start with
+        if (forceReadOnly.get()) {
+            this.bookieStatus.setToReadOnlyMode();
+        } else if (conf.isPersistBookieStatusEnabled()) {
+            this.bookieStatus.readFromDirectories(ledgerDirsManager.getAllLedgerDirs());
+        }
+
         // set running here.
         // since bookie server use running as a flag to tell bookie server whether it is alive
         // if setting it in bookie thread, the watcher might run before bookie thread.
@@ -942,7 +975,7 @@ public class Bookie extends BookieCriticalThread {
     }
 
     protected void doRegisterBookie() throws IOException {
-        doRegisterBookie(readOnly.get());
+        doRegisterBookie(forceReadOnly.get() || bookieStatus.isInReadOnlyMode());
     }
 
     private void doRegisterBookie(boolean isReadOnly) throws IOException {
@@ -979,13 +1012,18 @@ public class Bookie extends BookieCriticalThread {
 
     @VisibleForTesting
     public void doTransitionToWritableMode() {
-        if (shuttingdown) {
+        if (shuttingdown || forceReadOnly.get()) {
             return;
         }
-        if (!readOnly.compareAndSet(true, false)) {
+
+        if (!bookieStatus.setToWritableMode()) {
+            // do nothing if already in writable mode
             return;
         }
         LOG.info("Transitioning Bookie to Writable mode and will serve read/write requests.");
+        if (conf.isPersistBookieStatusEnabled()) {
+            bookieStatus.writeToDirectories(ledgerDirsManager.getAllLedgerDirs());
+        }
         // change zookeeper state only when using zookeeper
         if (null == registrationManager) {
             return;
@@ -1026,7 +1064,7 @@ public class Bookie extends BookieCriticalThread {
         if (shuttingdown) {
             return;
         }
-        if (!readOnly.compareAndSet(false, true)) {
+        if (!bookieStatus.setToReadOnlyMode()) {
             return;
         }
         if (!conf.isReadOnlyModeEnabled()) {
@@ -1039,6 +1077,10 @@ public class Bookie extends BookieCriticalThread {
         }
         LOG.info("Transitioning Bookie to ReadOnly mode,"
                 + " and will serve only read requests from clients!");
+        // persist the bookie status if we enable this
+        if (conf.isPersistBookieStatusEnabled()) {
+            this.bookieStatus.writeToDirectories(ledgerDirsManager.getAllLedgerDirs());
+        }
         // change zookeeper state only when using zookeeper
         if (null == registrationManager) {
             return;
@@ -1057,7 +1099,7 @@ public class Bookie extends BookieCriticalThread {
      * Check whether Bookie is writable
      */
     public boolean isReadOnly() {
-        return readOnly.get();
+        return forceReadOnly.get() || bookieStatus.isInReadOnlyMode();
     }
 
     public boolean isRunning() {
@@ -1133,7 +1175,7 @@ public class Bookie extends BookieCriticalThread {
 
                 // turn bookie to read only during shutting down process
                 LOG.info("Turning bookie to read only during shut down");
-                this.readOnly.set(true);
+                this.forceReadOnly.set(true);
 
                 // Shutdown Sync thread
                 syncThread.shutdown();
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieStatus.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieStatus.java
new file mode 100644
index 0000000..560868a
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieStatus.java
@@ -0,0 +1,242 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import static com.google.common.base.Charsets.UTF_8;
+import static org.apache.bookkeeper.util.BookKeeperConstants.BOOKIE_STATUS_FILENAME;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.util.List;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The status object represents the current status of a bookie instance.
+ */
+public class BookieStatus {
+
+    private static final Logger LOG = LoggerFactory.getLogger(BookieStatus.class);
+
+    static final int CURRENT_STATUS_LAYOUT_VERSION = 1;
+
+    enum BookieMode {
+        READ_ONLY,
+        READ_WRITE;
+    }
+
+    private final static long INVALID_UPDATE_TIME = -1;
+
+    private int layoutVersion;
+    private long lastUpdateTime;
+    private BookieMode bookieMode;
+
+
+    BookieStatus() {
+        this.bookieMode = BookieMode.READ_WRITE;
+        this.layoutVersion = CURRENT_STATUS_LAYOUT_VERSION;
+        this.lastUpdateTime = INVALID_UPDATE_TIME;
+    }
+
+    private synchronized BookieMode getBookieMode() {
+        return bookieMode;
+    }
+
+    public synchronized boolean isInWritable() {
+        return bookieMode.equals(BookieMode.READ_WRITE);
+    }
+
+    synchronized boolean setToWritableMode() {
+        if (!bookieMode.equals(BookieMode.READ_WRITE)) {
+            bookieMode = BookieMode.READ_WRITE;
+            this.lastUpdateTime = System.currentTimeMillis();
+            return true;
+        }
+        return false;
+    }
+
+    synchronized boolean isInReadOnlyMode() {
+        return bookieMode.equals(BookieMode.READ_ONLY);
+    }
+
+    synchronized boolean setToReadOnlyMode() {
+        if (!bookieMode.equals(BookieMode.READ_ONLY)) {
+            bookieMode = BookieMode.READ_ONLY;
+            this.lastUpdateTime = System.currentTimeMillis();
+            return true;
+        }
+        return false;
+    }
+
+    /**
+     * Write bookie status to multiple directories in best effort
+     *
+     * @param directories list of directories to write to
+     *
+     */
+    synchronized void writeToDirectories(List<File> directories) {
+        boolean success = false;
+        for (File dir : directories) {
+            try {
+                File statusFile = new File(dir, BOOKIE_STATUS_FILENAME);
+                writeToFile(statusFile, toString());
+                success = true;
+            } catch (IOException e) {
+                LOG.warn("IOException while trying to write bookie status to directory {}." +
+                    " This is fine if not all directories are failed.", dir);
+            }
+        }
+        if(success){
+            LOG.info("Successfully persist bookie status {}", this.bookieMode);
+        } else {
+            LOG.warn("Failed to persist bookie status {}", this.bookieMode);
+        }
+    }
+
+    /**
+     * Write content to the file. If file does not exist, it will create one.
+     *
+     * @param file file that you want to write to
+     * @param body content to write
+     * @throws IOException
+     */
+    private static void writeToFile(File file, String body) throws IOException {
+        FileOutputStream fos = new FileOutputStream(file);
+        BufferedWriter bw = null;
+        try {
+            bw = new BufferedWriter(new OutputStreamWriter(fos, UTF_8));
+            bw.write(body);
+        } finally {
+            if (bw != null) {
+                bw.close();
+            }
+            fos.close();
+        }
+    }
+
+    /**
+     * Read bookie status from the status files, and update the bookie status if read succeed.
+     * If a status file is not readable or not found, it will skip and try to read from the next file.
+     *
+     * @param directories list of directories that store the status file
+     */
+    void readFromDirectories(List<File> directories) {
+        boolean success = false;
+        for (File dir : directories) {
+            File statusFile = new File(dir, BOOKIE_STATUS_FILENAME);
+            try {
+                BookieStatus status = readFromFile(statusFile);
+                if (null != status) {
+                    synchronized (status) {
+                        if (status.lastUpdateTime > this.lastUpdateTime) {
+                            this.lastUpdateTime = status.lastUpdateTime;
+                            this.layoutVersion = status.layoutVersion;
+                            this.bookieMode = status.bookieMode;
+                            success = true;
+                        }
+                    }
+                }
+            } catch (IOException e) {
+                LOG.warn("IOException while trying to read bookie status from directory {}." +
+                    " This is fine if not all directories failed.", dir);
+            } catch (IllegalArgumentException e ){
+                LOG.warn("IllegalArgumentException while trying to read bookie status from directory {}." +
+                    " This is fine if not all directories failed.", dir);
+            }
+        }
+        if (success) {
+            LOG.info("Successfully retrieve bookie status {} from disks.", getBookieMode());
+        } else {
+            LOG.warn("Failed to retrieve bookie status from disks." +
+                    " Fall back to current or default bookie status: {}", getBookieMode());
+        }
+    }
+
+
+    /**
+     * Function to read the bookie status from a single file
+     *
+     * @param file file to read from
+     * @return BookieStatus if not error, null if file not exist or any exception happens
+     * @throws IOException
+     */
+    private BookieStatus readFromFile(File file)
+            throws IOException, IllegalArgumentException {
+        if (!file.exists()) {
+            return null;
+        }
+        
+        try (BufferedReader reader = new BufferedReader(
+            new InputStreamReader(new FileInputStream(file), UTF_8))) {
+            return parse(reader);
+        }
+    }
+
+    /**
+     * Parse the bookie status object using appropriate layout version
+     *
+     * @param reader
+     * @return BookieStatus if parse succeed, otherwise return null
+     * @throws IOException
+     */
+    public BookieStatus parse(BufferedReader reader)
+            throws IOException, IllegalArgumentException {
+        BookieStatus status = new BookieStatus();
+        String line = reader.readLine();
+        if (line == null || line.trim().isEmpty()) {
+            LOG.debug("Empty line when parsing bookie status");
+            return null;
+        }
+        String[] parts = line.split(",");
+        if (parts.length == 0) {
+            LOG.debug("Error in parsing bookie status: {}", line);
+            return null;
+        }
+        synchronized (status) {
+            status.layoutVersion = Integer.parseInt(parts[0].trim());
+            if (status.layoutVersion == 1 && parts.length == 3) {
+                status.bookieMode = BookieMode.valueOf(parts[1]);
+                status.lastUpdateTime = Long.parseLong(parts[2].trim());
+                return status;
+            }
+        }
+        return null;
+
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder builder = new StringBuilder();
+        builder.append(CURRENT_STATUS_LAYOUT_VERSION);
+        builder.append(",");
+        builder.append(getBookieMode());
+        builder.append(",");
+        builder.append(System.currentTimeMillis());
+        builder.append("\n");
+        return builder.toString();
+    }
+
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyBookie.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyBookie.java
index 7536120..3970a1d 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyBookie.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyBookie.java
@@ -43,13 +43,13 @@ public class ReadOnlyBookie extends Bookie {
             throws IOException, KeeperException, InterruptedException, BookieException {
         super(conf, statsLogger);
         if (conf.isReadOnlyModeEnabled()) {
-            readOnly.set(true);
+            forceReadOnly.set(true);
         } else {
             String err = "Try to init ReadOnly Bookie, while ReadOnly mode is not enabled";
             LOG.error(err);
             throw new IOException(err);
         }
-        LOG.info("Running bookie in readonly mode.");
+        LOG.info("Running bookie in force readonly mode.");
     }
 
     @Override
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SyncThread.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SyncThread.java
index 413c487..ca001a4 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SyncThread.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SyncThread.java
@@ -24,7 +24,9 @@ package org.apache.bookkeeper.bookie;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import java.io.IOException;
+import java.util.concurrent.Callable;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
@@ -61,6 +63,10 @@ class SyncThread {
     final LedgerDirsListener dirsListener;
     final CheckpointSource checkpointSource;
 
+    private final Object suspensionLock = new Object();
+    private boolean suspended = false;
+    private boolean disableCheckpoint = false;
+
     public SyncThread(ServerConfiguration conf,
                       LedgerDirsListener dirsListener,
                       LedgerStorage ledgerStorage,
@@ -79,25 +85,41 @@ class SyncThread {
 
     void start() {
         executor.scheduleAtFixedRate(new Runnable() {
-                public void run() {
-                    try {
-                        synchronized (suspensionLock) {
-                            while (suspended) {
-                                try {
-                                    suspensionLock.wait();
-                                } catch (InterruptedException e) {
-                                    Thread.currentThread().interrupt();
-                                    continue;
-                                }
+            public void run() {
+                try {
+                    synchronized (suspensionLock) {
+                        while (suspended) {
+                            try {
+                                suspensionLock.wait();
+                            } catch (InterruptedException e) {
+                                Thread.currentThread().interrupt();
+                                continue;
                             }
                         }
+                    }
+                    if (!disableCheckpoint) {
                         checkpoint(checkpointSource.newCheckpoint());
-                    } catch (Throwable t) {
-                        LOG.error("Exception in SyncThread", t);
-                        dirsListener.fatalError();
                     }
+                } catch (Throwable t) {
+                    LOG.error("Exception in SyncThread", t);
+                    dirsListener.fatalError();
+                }
+            }
+        }, flushInterval, flushInterval, TimeUnit.MILLISECONDS);
+    }
+
+    public Future<Void> requestFlush() {
+        return executor.submit(new Callable<Void>() {
+            @Override
+            public Void call() throws Exception {
+                try {
+                    flush();
+                } catch (Throwable t) {
+                    LOG.error("Exception flushing ledgers ", t);
                 }
-            }, flushInterval, flushInterval, TimeUnit.MILLISECONDS);
+                return null;
+            }
+        });
     }
 
     private void flush() {
@@ -113,6 +135,11 @@ class SyncThread {
             return;
         }
 
+        if (disableCheckpoint) {
+            return;
+        }
+
+        LOG.info("Flush ledger storage at checkpoint {}.", checkpoint);
         try {
             checkpointSource.checkpointComplete(checkpoint, false);
         } catch (IOException e) {
@@ -142,9 +169,6 @@ class SyncThread {
         }
     }
 
-    private Object suspensionLock = new Object();
-    private boolean suspended = false;
-
     /**
      * Suspend sync thread. (for testing)
      */
@@ -166,18 +190,15 @@ class SyncThread {
         }
     }
 
+    @VisibleForTesting
+    public void disableCheckpoint() {
+        disableCheckpoint = true;
+    }
+
     // shutdown sync thread
     void shutdown() throws InterruptedException {
         LOG.info("Shutting down SyncThread");
-        executor.submit(new Runnable() {
-                public void run() {
-                    try {
-                        flush();
-                    } catch (Throwable t) {
-                        LOG.error("Exception flushing ledgers at shutdown", t);
-                    }
-                }
-            });
+        requestFlush();
         executor.shutdown();
         long start = MathUtils.now();
         while (!executor.awaitTermination(5, TimeUnit.MINUTES)) {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
index 787540e..cbf88ad 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
@@ -110,6 +110,8 @@ public class ServerConfiguration extends AbstractConfiguration {
     protected final static String READ_ONLY_MODE_ENABLED = "readOnlyModeEnabled";
     //Whether the bookie is force started in ReadOnly mode
     protected final static String FORCE_READ_ONLY_BOOKIE = "forceReadOnlyBookie";
+    //Whether to persist the bookie status
+    protected final static String PERSIST_BOOKIE_STATUS_ENABLED = "persistBookieStatusEnabled";
     //Disk utilization
     protected final static String DISK_USAGE_THRESHOLD = "diskUsageThreshold";
     protected final static String DISK_USAGE_WARN_THRESHOLD = "diskUsageWarnThreshold";
@@ -1608,6 +1610,29 @@ public class ServerConfiguration extends AbstractConfiguration {
     }
 
     /**
+     * Whether to persist the bookie status so that when bookie server restarts,
+     * it will continue using the previous status
+     *
+     * @param enabled
+     *            - true if persist the bookie status. Otherwise false.
+     * @return ServerConfiguration
+     */
+    public ServerConfiguration setPersistBookieStatusEnabled(boolean enabled) {
+        setProperty(PERSIST_BOOKIE_STATUS_ENABLED, enabled);
+        return this;
+    }
+
+    /**
+     * Get whether to persist the bookie status so that when bookie server restarts,
+     * it will continue using the previous status.
+     *
+     * @return true - if need to start a bookie in read only mode. Otherwise false.
+     */
+    public boolean isPersistBookieStatusEnabled() {
+        return getBoolean(PERSIST_BOOKIE_STATUS_ENABLED, false);
+    }
+
+    /**
      * Set the Disk free space threshold as a fraction of the total
      * after which disk will be considered as full during disk check.
      *
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java
index 5e80f15..00c41bc 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java
@@ -32,9 +32,6 @@ import org.apache.bookkeeper.bookie.BookieException;
 import org.apache.bookkeeper.bookie.ExitCode;
 import org.apache.bookkeeper.bookie.ReadOnlyBookie;
 import org.apache.bookkeeper.conf.ServerConfiguration;
-import org.apache.bookkeeper.http.BKHttpServiceProvider;
-import org.apache.bookkeeper.http.HttpServer;
-import org.apache.bookkeeper.http.HttpServerLoader;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.processor.RequestProcessor;
 import org.apache.bookkeeper.replication.ReplicationException.CompatibilityException;
@@ -63,9 +60,6 @@ public class BookieServer {
 
     int exitCode = ExitCode.OK;
 
-    // operation stats
-    HttpServer httpServer = null;
-
     // request processor
     private final RequestProcessor requestProcessor;
 
@@ -114,18 +108,6 @@ public class BookieServer {
             exitCode = bookie.getExitCode();
             return;
         }
-        if (conf.isHttpServerEnabled()) {
-            BKHttpServiceProvider serviceProvider = new BKHttpServiceProvider.Builder()
-                .setBookieServer(this)
-                .setServerConfiguration(conf)
-                .build();
-            HttpServerLoader.loadHttpServer(conf);
-            this.httpServer = HttpServerLoader.get();
-            if (this.httpServer != null) {
-                this.httpServer.initialize(serviceProvider);
-                this.httpServer.startServer(conf.getHttpServerPort());
-            }
-        }
         this.nettyServer.start();
 
         running = true;
@@ -173,9 +155,6 @@ public class BookieServer {
         }
         exitCode = bookie.shutdown();
         this.requestProcessor.close();
-        if (this.httpServer != null && this.httpServer.isRunning()) {
-            this.httpServer.stopServer();
-        }
         running = false;
     }
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/Main.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/Main.java
index 99a93ae..9ba9d71 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/Main.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/Main.java
@@ -62,7 +62,7 @@ public class Main {
         BK_OPTS.addOption("c", "conf", true, "Configuration for Bookie Server");
         BK_OPTS.addOption("withAutoRecovery", false,
                 "Start Autorecovery service Bookie server");
-        BK_OPTS.addOption("readOnly", false,
+        BK_OPTS.addOption("r", "readOnly", false,
                 "Force Start a ReadOnly Bookie server");
         BK_OPTS.addOption("z", "zkserver", true, "Zookeeper Server");
         BK_OPTS.addOption("m", "zkledgerpath", true, "Zookeeper ledgers root path");
@@ -129,7 +129,7 @@ public class Main {
                 conf.setAutoRecoveryDaemonEnabled(true);
             }
 
-            if (cmdLine.hasOption("readOnly")) {
+            if (cmdLine.hasOption("r")) {
                 conf.setForceReadOnlyBookie(true);
             }
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/BookKeeperConstants.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/BookKeeperConstants.java
index 362e1e6..3bbda1c 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/BookKeeperConstants.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/BookKeeperConstants.java
@@ -28,6 +28,7 @@ public class BookKeeperConstants {
     public static final String LEDGER_NODE_PREFIX = "L";
     public static final String COLON = ":";
     public static final String VERSION_FILENAME = "VERSION";
+    public static final String BOOKIE_STATUS_FILENAME = "BOOKIE_STATUS";
     public final static String PASSWD = "passwd";
     public static final String CURRENT_DIR = "current";
     public static final String READONLY = "readonly";
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java
index 3783c57..1d1edae 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java
@@ -20,15 +20,23 @@
  */
 package org.apache.bookkeeper.bookie;
 
+import static com.google.common.base.Charsets.UTF_8;
+import static org.apache.bookkeeper.util.BookKeeperConstants.BOOKIE_STATUS_FILENAME;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import java.io.BufferedWriter;
 import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.OutputStreamWriter;
 import java.net.BindException;
 import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
 import org.apache.bookkeeper.bookie.BookieException.DiskPartitionDuplicationException;
 import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
 import org.apache.bookkeeper.client.BookKeeper;
@@ -319,10 +327,10 @@ public class BookieInitializationTest extends BookKeeperClusterTestCase {
         rm.initialize(conf, () -> {}, NullStatsLogger.INSTANCE);
         bs1.getBookie().setRegistrationManager(rm);
         bs1.start();
-
+        BookieServer bs2 = null;
         // starting bk server with same conf
         try {
-            BookieServer bs2 = new BookieServer(conf);
+            bs2 = new BookieServer(conf);
             RegistrationManager newRm = new ZKRegistrationManager();
             newRm.initialize(conf, () -> {}, NullStatsLogger.INSTANCE);
             bs2.getBookie().registrationManager = newRm;
@@ -333,6 +341,11 @@ public class BookieInitializationTest extends BookKeeperClusterTestCase {
         } catch (IOException e) {
             Assert.assertTrue("BKServer allowed duplicate Startups!",
                     e.getMessage().contains("bind"));
+        } finally {
+            bs1.shutdown();
+            if (bs2 != null) {
+                bs2.shutdown();
+            }
         }
     }
 
@@ -727,4 +740,174 @@ public class BookieInitializationTest extends BookKeeperClusterTestCase {
                 .connectString(zkUtil.getZooKeeperConnectString())
                 .build();
     }
+
+    /**
+     * Check bookie status should be able to persist on disk and retrieve when restart the bookie.
+     */
+    @Test(timeout = 10000)
+    public void testPersistBookieStatus() throws Exception {
+        // enable persistent bookie status
+        File tmpDir = createTempDir("bookie", "test");
+        final ServerConfiguration conf = TestBKConfiguration.newServerConfiguration()
+            .setZkServers(zkUtil.getZooKeeperConnectString())
+            .setJournalDirName(tmpDir.getPath())
+            .setLedgerDirNames(new String[] { tmpDir.getPath() })
+            .setReadOnlyModeEnabled(true)
+            .setPersistBookieStatusEnabled(true);
+        BookieServer bookieServer = new BookieServer(conf);
+        bookieServer.start();
+        Bookie bookie = bookieServer.getBookie();
+        assertFalse(bookie.isReadOnly());
+        // transition to readonly mode, bookie status should be persisted in ledger disks
+        bookie.doTransitionToReadOnlyMode();
+        assertTrue(bookie.isReadOnly());
+
+        // restart bookie should start in read only mode
+        bookieServer.shutdown();
+        bookieServer = new BookieServer(conf);
+        bookieServer.start();
+        bookie = bookieServer.getBookie();
+        assertTrue(bookie.isReadOnly());
+        // transition to writable mode
+        bookie.doTransitionToWritableMode();
+        // restart bookie should start in writable mode
+        bookieServer.shutdown();
+        bookieServer = new BookieServer(conf);
+        bookieServer.start();
+        bookie = bookieServer.getBookie();
+        assertFalse(bookie.isReadOnly());
+        bookieServer.shutdown();
+    }
+
+    /**
+     * Check when we start a ReadOnlyBookie, we should ignore bookie status
+     */
+    @Test(timeout = 10000)
+    public void testReadOnlyBookieShouldIgnoreBookieStatus() throws Exception {
+        File tmpDir = createTempDir("bookie", "test");
+        final ServerConfiguration conf = TestBKConfiguration.newServerConfiguration()
+            .setZkServers(zkUtil.getZooKeeperConnectString())
+            .setJournalDirName(tmpDir.getPath())
+            .setLedgerDirNames(new String[] { tmpDir.getPath() })
+            .setReadOnlyModeEnabled(true)
+            .setPersistBookieStatusEnabled(true);
+        // start new bookie
+        BookieServer bookieServer = new BookieServer(conf);
+        bookieServer.start();
+        Bookie bookie = bookieServer.getBookie();
+        // persist bookie status
+        bookie.doTransitionToReadOnlyMode();
+        bookie.doTransitionToWritableMode();
+        assertFalse(bookie.isReadOnly());
+        bookieServer.shutdown();
+        // start read only bookie
+        final ServerConfiguration readOnlyConf = TestBKConfiguration.newServerConfiguration();
+        readOnlyConf.loadConf(conf);
+        readOnlyConf.setForceReadOnlyBookie(true);
+        bookieServer = new BookieServer(readOnlyConf);
+        bookieServer.start();
+        bookie = bookieServer.getBookie();
+        assertTrue(bookie.isReadOnly());
+        // transition to writable should fail
+        bookie.doTransitionToWritableMode();
+        assertTrue(bookie.isReadOnly());
+        bookieServer.shutdown();
+    }
+
+    /**
+     * Check that if there's multiple bookie status copies, as long as not all of them are corrupted,
+     * the bookie status should be retrievable.
+     */
+    @Test(timeout = 10000)
+    public void testRetrieveBookieStatusWhenStatusFileIsCorrupted() throws Exception {
+        File[] tmpLedgerDirs = new File[3];
+        String[] filePath = new String[tmpLedgerDirs.length];
+        for (int i = 0; i < tmpLedgerDirs.length; i++) {
+            tmpLedgerDirs[i] = createTempDir("bookie", "test" + i);
+            filePath[i] = tmpLedgerDirs[i].getPath();
+        }
+        final ServerConfiguration conf = TestBKConfiguration.newServerConfiguration()
+            .setZkServers(zkUtil.getZooKeeperConnectString())
+            .setJournalDirName(filePath[0])
+            .setLedgerDirNames(filePath)
+            .setReadOnlyModeEnabled(true)
+            .setPersistBookieStatusEnabled(true);
+        // start a new bookie
+        BookieServer bookieServer = new BookieServer(conf);
+        bookieServer.start();
+        // transition in to read only and persist the status on disk
+        Bookie bookie = bookieServer.getBookie();
+        assertFalse(bookie.isReadOnly());
+        bookie.doTransitionToReadOnlyMode();
+        assertTrue(bookie.isReadOnly());
+        // corrupt status file
+        List<File> ledgerDirs = bookie.getLedgerDirsManager().getAllLedgerDirs();
+        corruptFile(new File(ledgerDirs.get(0), BOOKIE_STATUS_FILENAME));
+        corruptFile(new File(ledgerDirs.get(1), BOOKIE_STATUS_FILENAME));
+        // restart the bookie should be in read only mode
+        bookieServer.shutdown();
+        bookieServer = new BookieServer(conf);
+        bookieServer.start();
+        bookie = bookieServer.getBookie();
+        assertTrue(bookie.isReadOnly());
+        bookieServer.shutdown();
+    }
+
+    /**
+     * Check if the bookie would read the latest status if the status files are not consistent.
+     * @throws Exception
+     */
+    @Test(timeout = 10000)
+    public void testReadLatestBookieStatus() throws Exception {
+        File[] tmpLedgerDirs = new File[3];
+        String[] filePath = new String[tmpLedgerDirs.length];
+        for (int i = 0; i < tmpLedgerDirs.length; i++) {
+            tmpLedgerDirs[i] = createTempDir("bookie", "test" + i);
+            filePath[i] = tmpLedgerDirs[i].getPath();
+        }
+        final ServerConfiguration conf = TestBKConfiguration.newServerConfiguration()
+            .setZkServers(zkUtil.getZooKeeperConnectString())
+            .setJournalDirName(filePath[0])
+            .setLedgerDirNames(filePath)
+            .setReadOnlyModeEnabled(true)
+            .setPersistBookieStatusEnabled(true);
+        // start a new bookie
+        BookieServer bookieServer = new BookieServer(conf);
+        bookieServer.start();
+        // transition in to read only and persist the status on disk
+        Bookie bookie = bookieServer.getBookie();
+        assertFalse(bookie.isReadOnly());
+        bookie.doTransitionToReadOnlyMode();
+        assertTrue(bookie.isReadOnly());
+        // Manually update a status file, so it becomes the latest
+        Thread.sleep(1);
+        BookieStatus status = new BookieStatus();
+        List<File> dirs = new ArrayList<File>();
+        dirs.add(bookie.getLedgerDirsManager().getAllLedgerDirs().get(0));
+        status.writeToDirectories(dirs);
+        // restart the bookie should start in writable state
+        bookieServer.shutdown();
+        bookieServer = new BookieServer(conf);
+        bookieServer.start();
+        bookie = bookieServer.getBookie();
+        assertFalse(bookie.isReadOnly());
+        bookieServer.shutdown();
+    }
+
+    private void corruptFile(File file) throws IOException {
+        FileOutputStream fos = new FileOutputStream(file);
+        BufferedWriter bw = null;
+        try {
+            bw = new BufferedWriter(new OutputStreamWriter(fos, UTF_8));
+            byte[] bytes = new byte[64];
+            new Random().nextBytes(bytes);
+            bw.write(new String(bytes));
+        } finally {
+            if (bw != null) {
+                bw.close();
+            }
+            fos.close();
+        }
+    }
+
 }
diff --git a/site/_data/config/bk_server.yaml b/site/_data/config/bk_server.yaml
index af74b5a..54a26c4 100644
--- a/site/_data/config/bk_server.yaml
+++ b/site/_data/config/bk_server.yaml
@@ -342,7 +342,8 @@ groups:
   - param: forceReadOnlyBookie
     description: Whether the bookie is force started in read only mode or not.
     default: 'false'
-
+  - param: persistBookieStatusEnabled
+    description: Persist the bookie status locally on the disks. So the bookies can keep their status upon restarts.
 
 - name: Disk utilization
   params:

-- 
To stop receiving notification emails like this one, please contact
['"commits@bookkeeper.apache.org" <co...@bookkeeper.apache.org>'].